Hey guys, As far as I know, the timestamp field of StreamRecord instance is the event time assgined by assignTimestampsAndWatermarks method if I have set the time characteristic of job to event time. My confusion is that the timestamp does not transfer through different operators as I expect. E.g., Map operator implemented by StreamMap class: @Override public void processElement(StreamRecord<IN> element) throws Exception { output.collect(element.replace(userFunction.map(element.getValue()))); } Flat Map operator by StreamFlatMap: @Override public void processElement(StreamRecord<IN> element) throws Exception { collector.setTimestamp(element); userFunction.flatMap(element.getValue(), collector); } Probably, Agg operator by StreamGroupedReduce: @Override public void processElement(StreamRecord<IN> element) throws Exception { IN value = element.getValue(); IN currentValue = values.value();
if (currentValue != null) { IN reduced = userFunction.reduce(currentValue, value); values.update(reduced); output.collect(element.replace(reduced)); } else { values.update(value); output.collect(element.replace(value)); } } Also, window operator by WindowOperator: private void emitWindowContents(W window, ACC contents) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); processContext.window = window; userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector); } All the operators above will deliver the timestamp to new StreamRecord instance. Then I just write a very simple SQL query, i.e., select a + 10, b, c from tb, however, when I get the result stream by toAppendStream or toRetractStream method, I find the timestamp of StreamRecord is null which is printed by Context.timestamp() in a ProcessFunction. Best regard, Dongyang Yao