stream<rstring IIN, rstring STARTDATE, rstring CHANNEL, rstring PHONE_NUMBER, rstring DEVICE_TOKEN, rstring DEVICE_OS, rstring MESSAGE> cond = Custom(selectStreamsOutput; interactResponse) {
logic
state: {
timestamp ts = getTimestamp();
mutable rstring msgType = "";
mutable boolean existsSO = false;
mutable boolean statement = false ;
}
onTuple selectStreamsOutput: {
println(TOTAL);
if ((int32)TOTAL >= (int32)1) {
existsSO = true;
} else {
existsSO = false;
}
}
onTuple interactResponse: {
println("Exists row(interact): "+(rstring)existsSO);
if (!existsSO) {
if(row.DEVICE_TOKEN == "" || row.DEVICE_OS == "" && row.PHONE_NUMBER != "") {
msgType = "SMS_SEND";
} else if (row.DEVICE_TOKEN != "" && row.DEVICE_OS != "" && row.PHONE_NUMBER != "") {
msgType = "PUSH_SEND";
}
submit({
IIN = row.IIN,
STARTDATE = getLocalString(ts, "%Y-%m-%dT%H:%M:%S.") + (rstring)(getNanoseconds(ts) / 1000u),
CHANNEL = msgType,
PHONE_NUMBER = row.PHONE_NUMBER,
DEVICE_TOKEN = row.DEVICE_TOKEN,
DEVICE_OS = row.DEVICE_OS,
MESSAGE = row.TITLE+" "+row.FIRSTNAME+"! "+row.MESSAGE
}, cond) ;
}
}
}