[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Kanthi Vaidya (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846786#comment-17846786 ]

Kanthi Vaidya edited comment on FLINK-35289 at 5/16/24 2:31 AM:


{code:java}
package sample;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.IngestionTimeAssigner;
import org.apache.flink.api.common.eventtime.NoWatermarksGenerator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.jmx.JMXReporterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class BatchJobTest2 {

    private static ParameterTool setupParams() {
        Map<String, String> properties = new HashMap<>();
        properties.put("security.delegation.token.provider.hadoopfs.enabled", "false");
        properties.put("security.delegation.token.provider.hbase.enabled", "false");
        return ParameterTool.fromMap(properties);
    }

    public static void main(String[] args) throws Exception {
        ParameterTool paramUtils = setupParams();
        Configuration config = new Configuration(paramUtils.getConfiguration());
        config.setString(
                ConfigConstants.METRICS_REPORTER_PREFIX + "jmx." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX,
                JMXReporterFactory.class.getName());
        config.setLong(
                MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL,
                paramUtils.getLong(
                        MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.key(),
                        MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue()));

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<Domain> positionData = domainStream(env);
        positionData.keyBy(Domain::getA1)
                .process(new KeyedProcessFunction<String, Domain, Domain>() {
                    private transient MapState<String, Domain> processedInputs;

                    @Override
                    public void open(Configuration configuration) {
                        MapStateDescriptor<String, Domain> mapStateDescriptor = new MapStateDescriptor<>(
                                "domain2-input",
                                TypeInformation.of(String.class),
                                TypeInformation.of(Domain.class));
                        processedInputs = getRuntimeContext().getMapState(mapStateDescriptor);
                    }

                    @Override
                    public void processElement(Domain value, Context context, Collector<Domain> out) throws Exception {
                        processedInputs.put(value.getUniqueId(), value);
                        // In batch mode this timer only fires at the end of input for the key.
                        context.timerService().registerEventTimeTimer(Long.MAX_VALUE);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Domain> collector) throws Exception {
                        // Re-emit the buffered elements from the timer callback.
                        processedInputs.iterator().forEachRemaining(entry -> collector.collect(entry.getValue()));
                        processedInputs.clear();
                    }
                })
                .process(new ProcessFunction<Domain, Domain>() {
                    @Override
                    public void processElement(Domain value, Context ctx, Collector<Domain> out) throws Exception {
                        // Observed: ctx.timestamp() is Long.MAX_VALUE for elements emitted from onTimer.
                        log.info("Timestamp : {}, element : {}", ctx.timestamp(), value.getUniqueId());
                    }
                });

        env.execute("FileReadJob");
    }

    public static DataStream<Domain> domainStream(StreamExecutionEnvironment env) {
        /* Not assigning watermarks, as the program runs in batch mode and watermarks are irrelevant there. */
        return env.fromCollection(getDataCollection())
                .assignTimestampsAndWatermarks(getNoWatermarkStrategy())
                .returns(TypeInformation.of(Domain.class))
                .name("test-domain-source")
                .uid("test-domain-source");
    }

    private static List<Domain> getDataCollection() {
        List<Domain> data = new ArrayList<>();
        data.add(new Domain("A11", "123-Z-1"));
        data.add(new Domain("A11", "456-A-2"));
        data.add(new Domain("A11", "456-B-2"));
        // The remaining sample records, getNoWatermarkStrategy() and the Domain POJO are cut off
        // in the archived comment; see the reconstructed sketch below the code block.
        return data;
    }
}
{code}
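The archived comment is truncated before the end of the class, so `getNoWatermarkStrategy()` and the `Domain` type are missing. The sketch below is an assumed reconstruction based on the imports in the reproducer (`NoWatermarksGenerator`, `IngestionTimeAssigner`) and the way `Domain` is used; it is not the reporter's original code.

{code:java}
// Assumed reconstruction of the pieces missing from the truncated comment above.

// Belongs in BatchJobTest2: no watermarks (batch mode), timestamps assigned at ingestion time.
private static WatermarkStrategy<Domain> getNoWatermarkStrategy() {
    return WatermarkStrategy.<Domain>forGenerator(ctx -> new NoWatermarksGenerator<>())
            .withTimestampAssigner(ctx -> new IngestionTimeAssigner<>());
}

// Minimal stand-in for the Domain type referenced by the job (separate top-level class).
public class Domain implements java.io.Serializable {
    private final String a1;
    private final String uniqueId;

    public Domain(String a1, String uniqueId) {
        this.a1 = a1;
        this.uniqueId = uniqueId;
    }

    public String getA1() {
        return a1;
    }

    public String getUniqueId() {
        return uniqueId;
    }
}
{code}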


[jira] [Comment Edited] (FLINK-35289) Incorrect timestamp of stream elements collected from onTimer in batch mode

2024-05-15 Thread Kanthi Vaidya (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846700#comment-17846700 ]

Kanthi Vaidya edited comment on FLINK-35289 at 5/15/24 4:21 PM:


When running in batch mode, if we register:
ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);

and emit a record from the timer callback:
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
    out.collect(111);
}

(the collected record is new and has no pre-existing timestamp), then the collected value carries the timestamp Long.MAX_VALUE. The timestamp assigned to the collected element appears to be the timestamp for which the timer was registered to fire, i.e. Long.MAX_VALUE.
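
Below is a minimal, self-contained sketch of the pattern described above, assembled from the fragments in this comment; the class name, key selector and data values are illustrative, not taken from the reporter's job.

{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class OnTimerTimestampRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<Integer> fromTimer = env.fromElements(1, 2, 3)
                .keyBy(i -> i % 2)
                .process(new KeyedProcessFunction<Integer, Integer, Integer>() {
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                        // In batch mode this timer fires once the input for the key is exhausted.
                        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) {
                        out.collect(111); // new record, no pre-existing timestamp
                    }
                });

        fromTimer.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                // Reported behaviour: ctx.timestamp() == Long.MAX_VALUE for records emitted from onTimer.
                System.out.println("timestamp=" + ctx.timestamp() + ", value=" + value);
            }
        });

        env.execute("onTimer-timestamp-repro");
    }
}
{code}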


was (Author: JIRAUSER302517):
When running in batch mode, if we register:
ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);

and emit a record from the timer callback:
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
    out.collect(111);
}

then the collected value carries the timestamp Long.MAX_VALUE. The timestamp assigned to the collected element appears to be the timestamp for which the timer was registered to fire, i.e. Long.MAX_VALUE.

> Incorrect timestamp of stream elements collected from onTimer in batch mode
> ---
>
> Key: FLINK-35289
> URL: https://issues.apache.org/jira/browse/FLINK-35289
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.18.1
>Reporter: Kanthi Vaidya
>Priority: Major
>
> In batch mode all registered timers will fire at the _end of time_. Given
> this, if a user registers a timer for Long.MAX_VALUE, the timestamp assigned
> to the elements that are collected from the onTimer context ends up being
> Long.MAX_VALUE. Ideally this should be the time at which the batch runtime
> actually executed the onTimer function.
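
Until the behaviour described above is addressed, one possible workaround is to overwrite the timestamps of records emitted from onTimer with the wall-clock time at which they were produced. This is an untested sketch, not an agreed-upon fix; `processedStream` stands for the output of the keyed process function in the reproducer above.

{code:java}
// Hypothetical workaround sketch: re-assign timestamps downstream so records emitted
// from onTimer carry the wall-clock time at which they were produced, instead of the
// Long.MAX_VALUE timer timestamp set by the batch runtime.
DataStream<Domain> restamped = processedStream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Domain>noWatermarks()
                .withTimestampAssigner((element, previousTimestamp) -> System.currentTimeMillis()));
{code}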



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

