Hey guys,
As far as I know, the timestamp field of StreamRecord instance is the event 
time assgined by assignTimestampsAndWatermarks method if I have set the time 
characteristic of job to event time. My confusion is that the timestamp does 
not transfer through different operators as I expect. 
E.g., Map operator implemented by StreamMap class:
@Override public void processElement(StreamRecord<IN> element) throws Exception 
{ output.collect(element.replace(userFunction.map(element.getValue()))); }
Flat Map operator by StreamFlatMap:
@Override public void processElement(StreamRecord<IN> element) throws Exception 
{ collector.setTimestamp(element); userFunction.flatMap(element.getValue(), 
collector); }
Probably, Agg operator by StreamGroupedReduce:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
IN value = element.getValue();
IN currentValue = values.value();


if (currentValue != null) {
IN reduced = userFunction.reduce(currentValue, value);
values.update(reduced);
output.collect(element.replace(reduced));
} else {
values.update(value);
output.collect(element.replace(value));
}
}
Also, window operator by WindowOperator:
private void emitWindowContents(W window, ACC contents) throws Exception { 
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); 
processContext.window = window; userFunction.process(triggerContext.key, 
window, processContext, contents, timestampedCollector); }
All the operators above will deliver the timestamp to new StreamRecord 
instance. Then I just write a very simple SQL query, i.e., select a + 10, b, c 
from tb, however, when I get the result stream by toAppendStream or 
toRetractStream method, I find the timestamp of StreamRecord is null which is 
printed by Context.timestamp() in a ProcessFunction.


Best regard, 
Dongyang Yao

Reply via email to