[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode
[ https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786 ] Kanthi Vaidya edited comment on FLINK-35289 at 5/16/24 2:31 AM: {code:java} package sample; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.IngestionTimeAssigner; import org.apache.flink.api.common.eventtime.NoWatermarksGenerator; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.jmx.JMXReporterFactory; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;@Slf4j public class BatchJobTest2 { private static ParameterTool setupParams() { Map properties = new HashMap<>(); properties.put("security.delegation.token.provider.hadoopfs.enabled", "false"); properties.put("security.delegation.token.provider.hbase.enabled", "false"); return ParameterTool.fromMap(properties); } public static void main(String[] args) throws Exception { ParameterTool paramUtils = setupParams(); Configuration config = new Configuration(paramUtils.getConfiguration()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, JMXReporterFactory.class.getName()); config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue())); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); env.setRuntimeMode(RuntimeExecutionMode.BATCH); DataStream positionData = domainStream(env); positionData.keyBy(Domain::getA1) .process(new KeyedProcessFunction() { private transient MapState processedInputs; @Override public void open(Configuration configuration) { MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class), TypeInformation.of(Domain.class)); processedInputs = getRuntimeContext().getMapState(mapStateDescriptor); } @Override public void processElement(Domain value, KeyedProcessFunction.Context context, Collector out) throws Exception { processedInputs.put(value.getUniqueId(), value); context.timerService().registerEventTimeTimer(Long.MAX_VALUE); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector collector) throws Exception { processedInputs.iterator().forEachRemaining(entry -> collector.collect(entry.getValue())); processedInputs.clear(); } }).process(new ProcessFunction() { @Override public void processElement(Domain value, ProcessFunction.Context ctx, Collector out) throws Exception { log.info("Timestamp : {}, element : {}", ctx.timestamp(), value.getUniqueId()); } }); env.execute("FileReadJob"); } public static DataStream domainStream(StreamExecutionEnvironment env) { /* Not assigning watermarks as program is being run in batch mode and watermarks are irrelevant to batch mode */ return env.fromCollection(getDataCollection()) .assignTimestampsAndWatermarks(getNoWatermarkStrategy()) .returns(TypeInformation.of(Domain.class)) .name("test-domain-source") .uid("test-domain-source"); } private static List getDataCollection() { List data = new ArrayList<>(); data.add(new Domain("A11", "123-Z-1")); data.add(new Domain("A11", "456-A-2")); data.add(new Domain("A11", "456-B-2"
[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode
[ https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786 ] Kanthi Vaidya edited comment on FLINK-35289 at 5/16/24 2:29 AM: {code:java} package sample; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.IngestionTimeAssigner; import org.apache.flink.api.common.eventtime.NoWatermarksGenerator; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.jmx.JMXReporterFactory; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;@Slf4j public class BatchJobTest2 { private static ParameterTool setupParams() { Map properties = new HashMap<>(); properties.put("security.delegation.token.provider.hadoopfs.enabled", "false"); properties.put("security.delegation.token.provider.hbase.enabled", "false"); return ParameterTool.fromMap(properties); } public static void main(String[] args) throws Exception { ParameterTool paramUtils = setupParams(); Configuration config = new Configuration(paramUtils.getConfiguration()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, JMXReporterFactory.class.getName()); config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue())); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); env.setRuntimeMode(RuntimeExecutionMode.BATCH); DataStream positionData = domainStream(env); positionData.keyBy(Domain::getA1) .process(new KeyedProcessFunction() { private transient MapState processedInputs; @Override public void open(Configuration configuration) { MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class), TypeInformation.of(Domain.class)); processedInputs = getRuntimeContext().getMapState(mapStateDescriptor); } @Override public void processElement(Domain value, KeyedProcessFunction.Context context, Collector out) throws Exception { processedInputs.put(value.getUniqueId(), value); context.timerService().registerEventTimeTimer(Long.MAX_VALUE); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector collector) throws Exception { processedInputs.iterator().forEachRemaining(entry -> collector.collect(entry.getValue())); processedInputs.clear(); } }).process(new ProcessFunction() { @Override public void processElement(Domain value, ProcessFunction.Context ctx, Collector out) throws Exception { log.info("Timestamp : {}, element : {}", ctx.timestamp(), value.getUniqueId()); } }); env.execute("FileReadJob"); } public static DataStream domainStream(StreamExecutionEnvironment env) { /* Not assigning watermarks as program is being run in batch mode and watermarks are irrelevant to batch mode */ return env.fromCollection(getDataCollection()) .assignTimestampsAndWatermarks(getNoWatermarkStrategy()) .returns(TypeInformation.of(Domain.class)) .name("test-domain-source") .uid("test-domain-source"); } private static List getDataCollection() { List data = new ArrayList<>(); data.add(new Domain("A11", "123-Z-1")); data.add(new Domain("A11", "456-A-2")); data.add(new Domain("A11", "456-B-2"))
[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode
[ https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786 ] Kanthi Vaidya edited comment on FLINK-35289 at 5/16/24 2:28 AM: {code:java} package sample; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.IngestionTimeAssigner; import org.apache.flink.api.common.eventtime.NoWatermarksGenerator; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.jmx.JMXReporterFactory; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;@Slf4j public class BatchJobTest2 { private static ParameterTool setupParams() { Map properties = new HashMap<>(); properties.put("security.delegation.token.provider.hadoopfs.enabled", "false"); properties.put("security.delegation.token.provider.hbase.enabled", "false"); return ParameterTool.fromMap(properties); } public static void main(String[] args) throws Exception { ParameterTool paramUtils = setupParams(); Configuration config = new Configuration(paramUtils.getConfiguration()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, JMXReporterFactory.class.getName()); config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue())); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); env.setRuntimeMode(RuntimeExecutionMode.BATCH); DataStream positionData = domainStream(env); positionData.keyBy(Domain::getA1) .process(new KeyedProcessFunction() { private transient MapState processedInputs; @Override public void open(Configuration configuration) { MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class), TypeInformation.of(Domain.class)); processedInputs = getRuntimeContext().getMapState(mapStateDescriptor); } @Override public void processElement(Domain value, KeyedProcessFunction.Context context, Collector out) throws Exception { processedInputs.put(value.getUniqueId(), value); context.timerService().registerEventTimeTimer(Long.MAX_VALUE); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector collector) throws Exception { processedInputs.iterator().forEachRemaining(entry -> collector.collect(entry.getValue())); processedInputs.clear(); } }).process(new ProcessFunction() { @Override public void processElement(Domain value, ProcessFunction.Context ctx, Collector out) throws Exception { log.info("Timestamp : {}, element : {}", ctx.timestamp(), value.getUniqueId()); } }); env.execute("FileReadJob"); } public static DataStream domainStream(StreamExecutionEnvironment env) { /* Not assigning watermarks as program is being run in batch mode and watermarks are irrelevant to batch mode */ return env.fromCollection(getDataCollection()) .assignTimestampsAndWatermarks(getNoWatermarkStrategy()) .returns(TypeInformation.of(Domain.class)) .name("test-domain-source") .uid("test-domain-source"); } private static List getDataCollection() { List data = new ArrayList<>(); data.add(new Domain("A11", "123-Z-1")); data.add(new Domain("A11", "456-A-2")); data.add(new Domain("A11", "456-B-2"));
[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode
[ https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786 ] Kanthi Vaidya edited comment on FLINK-35289 at 5/16/24 2:27 AM: {code:java} package sample; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.IngestionTimeAssigner; import org.apache.flink.api.common.eventtime.NoWatermarksGenerator; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.jmx.JMXReporterFactory; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;@Slf4j public class BatchJobTest2 { private static ParameterTool setupParams() { Map properties = new HashMap<>(); properties.put("security.delegation.token.provider.hadoopfs.enabled", "false"); properties.put("security.delegation.token.provider.hbase.enabled", "false"); return ParameterTool.fromMap(properties); } public static void main(String[] args) throws Exception { ParameterTool paramUtils = setupParams(); Configuration config = new Configuration(paramUtils.getConfiguration()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, JMXReporterFactory.class.getName()); config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue())); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); env.setRuntimeMode(RuntimeExecutionMode.BATCH); DataStream positionData = domainStream(env); positionData.keyBy(Domain::getA1) .process(new KeyedProcessFunction() { private transient MapState processedInputs; @Override public void open(Configuration configuration) { MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class), TypeInformation.of(Domain.class)); processedInputs = getRuntimeContext().getMapState(mapStateDescriptor); } @Override public void processElement(Domain value, KeyedProcessFunction.Context context, Collector out) throws Exception { processedInputs.put(value.getUniqueId(), value); context.timerService().registerEventTimeTimer(Long.MAX_VALUE); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector collector) throws Exception { processedInputs.iterator().forEachRemaining(entry -> collector.collect(entry.getValue())); processedInputs.clear(); } }).process(new ProcessFunction() { @Override public void processElement(Domain value, ProcessFunction.Context ctx, Collector out) throws Exception { log.info("Timestamp : {}, element : {}", ctx.timestamp(), value.getUniqueId()); } }); env.execute("FileReadJob"); } public static DataStream domainStream(StreamExecutionEnvironment env) { /* Not assigning watermarks as program is being run in batch mode and watermarks are irrelevant to batch mode */ return env.fromCollection(getDataCollection()) .assignTimestampsAndWatermarks(getNoWatermarkStrategy()) .returns(TypeInformation.of(Domain.class)) .name("test-domain-source") .uid("test-domain-source"); } private static List getDataCollection() { List data = new ArrayList<>(); data.add(new Domain("A11", "123-Z-1")); data.add(new Domain("A11", "456-A-2")); data.add(new Domain("A11", "456-B-2"));
[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode
[ https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786 ] Kanthi Vaidya edited comment on FLINK-35289 at 5/16/24 2:22 AM: {code:java} package sample; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.IngestionTimeAssigner; import org.apache.flink.api.common.eventtime.NoWatermarksGenerator; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.jmx.JMXReporterFactory; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;@Slf4j public class BatchJobTest2 { private static ParameterTool setupParams() { Map properties = new HashMap<>(); properties.put("security.delegation.token.provider.hadoopfs.enabled", "false"); properties.put("security.delegation.token.provider.hbase.enabled", "false"); return ParameterTool.fromMap(properties); } public static void main(String[] args) throws Exception { ParameterTool paramUtils = setupParams(); Configuration config = new Configuration(paramUtils.getConfiguration()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, JMXReporterFactory.class.getName()); config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue())); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); env.setRuntimeMode(RuntimeExecutionMode.BATCH); DataStream positionData = domainStream(env); positionData.keyBy(Domain::getA1) .process(new KeyedProcessFunction() { private transient MapState processedInputs; @Override public void open(Configuration configuration) { MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class), TypeInformation.of(Domain.class)); processedInputs = getRuntimeContext().getMapState(mapStateDescriptor); } @Override public void processElement(Domain value, KeyedProcessFunction.Context context, Collector out) throws Exception { processedInputs.put(value.getUniqueId(), value); context.timerService().registerEventTimeTimer(Long.MAX_VALUE); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector collector) throws Exception { processedInputs.iterator().forEachRemaining(entry -> collector.collect(entry.getValue())); processedInputs.clear(); } }).process(new ProcessFunction() { @Override public void processElement(Domain value, ProcessFunction.Context ctx, Collector out) throws Exception { log.info("Timestamp : {}, element : {}", ctx.timestamp(), value.getUniqueId()); } }); env.execute("FileReadJob"); } public static DataStream domainStream(StreamExecutionEnvironment env) { /* Not assigning watermarks as program is being run in batch mode and watermarks are irrelevant to batch mode */ return env.fromCollection(getDataCollection()) .assignTimestampsAndWatermarks(getNoWatermarkStrategy()) .returns(TypeInformation.of(Domain.class)) .name("test-domain-source") .uid("test-domain-source"); } private static List getDataCollection() { List data = new ArrayList<>(); data.add(new Domain("A11", "123-Z-1")); data.add(new Domain("A11", "456-A-2")); data.add(new Domain("A11", "456-B-2"));
[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode
[ https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846700#comment-17846700 ] Kanthi Vaidya edited comment on FLINK-35289 at 5/15/24 4:21 PM: When running in Batch Mode If we do: ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE); and @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { out.collect(111); } This is a new record being emitted and has no existing timestamp. Then collected value has timestamp Long.MAX_VALUE. The timestamp registered on the collected element seems to be timestamp when the timer was registered to be fired, which is Long.MAX_VALUE was (Author: JIRAUSER302517): When running in Batch Mode If we do: ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE); and @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { out.collect(111); } Then collected value has timestamp Long.MAX_VALUE. The timestamp registered on the collected element seems to be timestamp when the timer was registered to be fired, which is Long.MAX_VALUE > Incorrect timestamp of stream elements collected from onTimer in batch mode > --- > > Key: FLINK-35289 > URL: https://issues.apache.org/jira/browse/FLINK-35289 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.18.1 >Reporter: Kanthi Vaidya >Priority: Major > > In batch mode all registered timers will fire at the _end of time. Given > this, if a user registers a timer for Long.MAX_VALUE, the timestamp assigned > to the elements that are collected from the onTimer context ends up being > Long.MAX_VALUE. Ideally this should be the time when the batch actually > executed the onTimer function._ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode
[ https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846700#comment-17846700 ] Kanthi Vaidya edited comment on FLINK-35289 at 5/15/24 4:16 PM: When running in Batch Mode If we do: ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE); and @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { out.collect(111); } Then collected value has timestamp Long.MAX_VALUE. The timestamp registered on the collected element seems to be timestamp when the timer was registered to be fired, which is Long.MAX_VALUE was (Author: JIRAUSER302517): If we do: ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE); and @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception \{ out.collect(111); } Then collected value has timestamp Long.MAX_VALUE. The timestamp registered on the collected element seems to be timestamp when the timer was registered to be fired, which is Long.MAX_VALUE > Incorrect timestamp of stream elements collected from onTimer in batch mode > --- > > Key: FLINK-35289 > URL: https://issues.apache.org/jira/browse/FLINK-35289 > Project: Flink > Issue Type: Bug > Components: API / Core >Affects Versions: 1.18.1 >Reporter: Kanthi Vaidya >Priority: Major > > In batch mode all registered timers will fire at the _end of time. Given > this, if a user registers a timer for Long.MAX_VALUE, the timestamp assigned > to the elements that are collected from the onTimer context ends up being > Long.MAX_VALUE. Ideally this should be the time when the batch actually > executed the onTimer function._ -- This message was sent by Atlassian Jira (v8.20.10#820010)