[ https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786 ]
Kanthi Vaidya edited comment on FLINK-35289 at 5/16/24 2:31 AM: ---------------------------------------------------------------- {code:java} package sample; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.IngestionTimeAssigner; import org.apache.flink.api.common.eventtime.NoWatermarksGenerator; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.jmx.JMXReporterFactory; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;@Slf4j public class BatchJobTest2 { private static ParameterTool setupParams() { Map<String, String> properties = new HashMap<>(); properties.put("security.delegation.token.provider.hadoopfs.enabled", "false"); properties.put("security.delegation.token.provider.hbase.enabled", "false"); return ParameterTool.fromMap(properties); } public static void main(String[] args) throws Exception { ParameterTool paramUtils = setupParams(); Configuration config = new Configuration(paramUtils.getConfiguration()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." 
+ ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, JMXReporterFactory.class.getName()); config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue())); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); env.setRuntimeMode(RuntimeExecutionMode.BATCH); DataStream<Domain> positionData = domainStream(env); positionData.keyBy(Domain::getA1) .process(new KeyedProcessFunction<String, Domain, Domain>() { private transient MapState<String, Domain> processedInputs; @Override public void open(Configuration configuration) { MapStateDescriptor<String, Domain> mapStateDescriptor = new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class), TypeInformation.of(Domain.class)); processedInputs = getRuntimeContext().getMapState(mapStateDescriptor); } @Override public void processElement(Domain value, KeyedProcessFunction<String, Domain, Domain>.Context context, Collector<Domain> out) throws Exception { processedInputs.put(value.getUniqueId(), value); context.timerService().registerEventTimeTimer(Long.MAX_VALUE); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Domain> collector) throws Exception { processedInputs.iterator().forEachRemaining(entry -> collector.collect(entry.getValue())); processedInputs.clear(); } }).process(new ProcessFunction<Domain, Void>() { @Override public void processElement(Domain value, ProcessFunction<Domain, Void>.Context ctx, Collector<Void> out) throws Exception { log.info("Timestamp : {}, element : {}", ctx.timestamp(), value.getUniqueId()); } }); env.execute("FileReadJob"); } public static DataStream<Domain> domainStream(StreamExecutionEnvironment env) { /* Not assigning watermarks as program is being run in batch mode and watermarks are irrelevant to batch mode */ return env.fromCollection(getDataCollection()) 
.assignTimestampsAndWatermarks(getNoWatermarkStrategy()) .returns(TypeInformation.of(Domain.class)) .name("test-domain-source") .uid("test-domain-source"); } private static List<Domain> getDataCollection() { List<Domain> data = new ArrayList<>(); data.add(new Domain("A11", "123-Z-1")); data.add(new Domain("A11", "456-A-2")); data.add(new Domain("A11", "456-B-2")); data.add(new Domain("A21", "673-9Z-09")); data.add(new Domain("A21", "843-09-21")); return data; } private static WatermarkStrategy<Domain> getNoWatermarkStrategy() { return WatermarkStrategy.<Domain>forGenerator((ctx) -> new NoWatermarksGenerator<>()) .withTimestampAssigner((ctx) -> new IngestionTimeAssigner<>()); } private static WatermarkStrategy<Domain> getMonotonous() { return WatermarkStrategy.<Domain>forMonotonousTimestamps() .withTimestampAssigner((ctx) -> new IngestionTimeAssigner<>()); } private static class Domain { private String a1; private String uniqueId; public Domain() { } public Domain(String a1, String uniqueId) { this.a1 = a1; this.uniqueId = uniqueId; } public String getA1() { return a1; } public void setA1(String a1) { this.a1 = a1; } public String getUniqueId() { return uniqueId; } } } {code} Look at attached program which reproduces issue was (Author: JIRAUSER302517): {code:java} package sample; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.IngestionTimeAssigner; import org.apache.flink.api.common.eventtime.NoWatermarksGenerator; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import 
org.apache.flink.metrics.jmx.JMXReporterFactory; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map;@Slf4j public class BatchJobTest2 { private static ParameterTool setupParams() { Map<String, String> properties = new HashMap<>(); properties.put("security.delegation.token.provider.hadoopfs.enabled", "false"); properties.put("security.delegation.token.provider.hbase.enabled", "false"); return ParameterTool.fromMap(properties); } public static void main(String[] args) throws Exception { ParameterTool paramUtils = setupParams(); Configuration config = new Configuration(paramUtils.getConfiguration()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." 
+ ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, JMXReporterFactory.class.getName()); config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, paramUtils.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(), MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue())); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); env.setRuntimeMode(RuntimeExecutionMode.BATCH); DataStream<Domain> positionData = domainStream(env); positionData.keyBy(Domain::getA1) .process(new KeyedProcessFunction<String, Domain, Domain>() { private transient MapState<String, Domain> processedInputs; @Override public void open(Configuration configuration) { MapStateDescriptor<String, Domain> mapStateDescriptor = new MapStateDescriptor<>("domain2-input", TypeInformation.of(String.class), TypeInformation.of(Domain.class)); processedInputs = getRuntimeContext().getMapState(mapStateDescriptor); } @Override public void processElement(Domain value, KeyedProcessFunction<String, Domain, Domain>.Context context, Collector<Domain> out) throws Exception { processedInputs.put(value.getUniqueId(), value); context.timerService().registerEventTimeTimer(Long.MAX_VALUE); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Domain> collector) throws Exception { processedInputs.iterator().forEachRemaining(entry -> collector.collect(entry.getValue())); processedInputs.clear(); } }).process(new ProcessFunction<Domain, Void>() { @Override public void processElement(Domain value, ProcessFunction<Domain, Void>.Context ctx, Collector<Void> out) throws Exception { log.info("Timestamp : {}, element : {}", ctx.timestamp(), value.getUniqueId()); } }); env.execute("FileReadJob"); } public static DataStream<Domain> domainStream(StreamExecutionEnvironment env) { /* Not assigning watermarks as program is being run in batch mode and watermarks are irrelevant to batch mode */ return env.fromCollection(getDataCollection()) 
.assignTimestampsAndWatermarks(getNoWatermarkStrategy()) .returns(TypeInformation.of(Domain.class)) .name("test-domain-source") .uid("test-domain-source"); } private static List<Domain> getDataCollection() { List<Domain> data = new ArrayList<>(); data.add(new Domain("A11", "123-Z-1")); data.add(new Domain("A11", "456-A-2")); data.add(new Domain("A11", "456-B-2")); data.add(new Domain("A21", "673-9Z-09")); data.add(new Domain("A21", "843-09-21")); return data; } private static WatermarkStrategy<Domain> getNoWatermarkStrategy() { return WatermarkStrategy.<Domain>forGenerator((ctx) -> new NoWatermarksGenerator<>()) .withTimestampAssigner((ctx) -> new IngestionTimeAssigner<>()); } private static WatermarkStrategy<Domain> getMonotonous() { return WatermarkStrategy.<Domain>forMonotonousTimestamps() .withTimestampAssigner((ctx) -> new IngestionTimeAssigner<>()); } private static class Domain { private String a1; private String uniqueId; public Domain() { } public Domain(String a1, String uniqueId) { this.a1 = a1; this.uniqueId = uniqueId; } public String getA1() { return a1; } public void setA1(String a1) { this.a1 = a1; } public String getUniqueId() { return uniqueId; } } } {code} Look at attached program which reproduces issue > Incorrect timestamp of stream elements collected from onTimer in batch mode > --------------------------------------------------------------------------- > > Key: FLINK-35289 > URL: https://issues.apache.org/jira/browse/FLINK-35289 > Project: Flink > Issue Type: Bug > Components: API / Core > Affects Versions: 1.18.1 > Reporter: Kanthi Vaidya > Priority: Major > > In batch mode all registered timers will fire at the _end of time. Given > this, if a user registers a timer for Long.MAX_VALUE, the timestamp assigned > to the elements that are collected from the onTimer context ends up being > Long.MAX_VALUE. 
Ideally, this should be the time at which the batch actually > executed the onTimer function_. -- This message was sent by Atlassian Jira (v8.20.10#820010)