Hello people!

I have a DataStream, which has events with with a continuing number which signifies their belonging to a production cycle. In essence, this is what the data looks like:

value, production cycle
12.0, 2000
12.3, 2000 one production cylce
12.2, 2000

0.0, 2001
0.4, 2002 another production cycle
1.1, 2002

55.0, 2003
60.0, 2003 another production cycle
70.0, 2003

I have to do some calculations over the events of each production cycle. I want to use Flink's window API for that. This is how I'm doing it right now:

DataStream<String> test = streamExecEnv.readTextFile("C:/Projects/Python/testdata.txt")
    .map(newImaginePaperDataConverterTest()) // convert data to POJO
.assignTimestampsAndWatermarks(newImaginePaperAssigner()) // Assign timestamps for event time .keyBy((ImaginePaperData event) ->event.lunum) //<- the production cycle number
    .window(GlobalWindows.create()) // create global window
.trigger(newLunumTrigger()) // "split" the window with a custom trigger .process(newImaginePaperWindowReportFunction()); // apply a function over the aggregated events

I'm getting a "DataStream" out of a text file, just for testing purposes. The problem is that what I'm doing only aggregates one single event for a production cycle. Why is that? I thought keying the stream by the production cycle number already partitions the stream anyways. The trigger says when the production cycle number is changed, a new global window is started and the events of the current window are aggregated. What am I missing here?
Just to be safe, here is my implementation of the custom trigger:

publicclassLunumTriggerextendsTrigger<ImaginePaperData, GlobalWindow> {

privatestaticfinallongserialVersionUID = 1L;

publicLunumTrigger() {}

privatefinalValueStateDescriptor<Integer> prevLunum = newValueStateDescriptor<>("lunum", Integer.class);

@Override
publicTriggerResultonElement(ImaginePaperDataelement, longtimestamp, GlobalWindowwindow, TriggerContextctx) throwsException {

ValueState<Integer> lunumState = ctx.getPartitionedState(prevLunum);

if (lunumState.value() == null || !(element.lunum.equals(lunumState.value()))) { System.out.println("LUNUM BEFORE: " + lunumState.value() + " NEW LUNUM: " + element.lunum + " ==> FIRE!");
lunumState.update(element.lunum);
returnTriggerResult.FIRE_AND_PURGE;
}

System.out.println("LUNUM BEFORE: " + lunumState.value() + " NEW LUNUM: " + element.lunum + " ==> HOLD!");
lunumState.update(element.lunum);
returnTriggerResult.CONTINUE;
}

@Override
publicTriggerResultonProcessingTime(longtime, GlobalWindowwindow, TriggerContextctx) throwsException {
returnTriggerResult.CONTINUE;
}

@Override
publicTriggerResultonEventTime(longtime, GlobalWindowwindow, TriggerContextctx) throwsException {
returnTriggerResult.CONTINUE;
}

@Override
publicvoidclear(GlobalWindowwindow, TriggerContextctx) throwsException {
ctx.getPartitionedState(prevLunum).clear();
}
}

I'm very grateful for your help.

Regards,

Daniel

Reply via email to