Re: Empty Kafka topics and watermarks
Great, thanks!
Re: Empty Kafka topics and watermarks
Ah thanks for the feedback. I can work around for now but will upgrade as soon as I can to the latest version.

Thanks very much,

James.
Re: Empty Kafka topics and watermarks
Hi James,

I believe you have encountered a bug that we have already fixed [1]. The small problem is that in order to fix this bug, we had to change some `@PublicEvolving` interfaces and thus we were not able to backport this fix to earlier minor releases. As such, this is only fixed starting from 1.14.x.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18934
Empty Kafka topics and watermarks
Hi everyone,

I'm putting together a Flink workflow that needs to merge historic data from a custom JDBC source with a Kafka flow (for the realtime data). I have successfully written the custom JDBC source that emits a watermark for the last event time after all the DB data has been emitted, but now I face a problem when joining with data from the Kafka stream.

I register a timer in my KeyedCoProcessFunction joining the DB stream with the live Kafka stream so I can emit all the "batch" data from the DB in one go once it has been completely read up to the watermark, but the timer never fires as the Kafka stream is empty and therefore doesn't emit a watermark. My Kafka stream is allowed to be empty since all the data will have been retrieved from the DB call, so I only expect new events to appear over Kafka. Note that if I replace the Kafka input with a simple env.fromCollection(...) empty list then the timer triggers fine, as Flink seems to detect it doesn't need to wait for any input from stream 2. So it seems to be something related to the Kafka stream status that is preventing the watermark from advancing in the KeyedCoProcessFunction.

I have tried configuring the Kafka stream timestamp and watermark strategies so the source is marked as idle after 5 seconds, but still it seems the watermark in the join operator combining these 2 streams is not advancing. (See example code below).

Maybe this is my bad understanding, but I thought that if an input stream into a KeyedCoProcessFunction is idle then it wouldn't be considered by the operator for forwarding the watermark, i.e. it would forward the non-idle input stream's watermark and not do a min(stream1WM, stream2WM). With the below example I never see the onTimer fire, and the only effect the withIdleness() strategy has is to stop the print statements in onPeriodicEmit() happening after 5 seconds (env periodic emit is set to the default 200ms so I see 25 rows before it stops).
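To make the expectation in the paragraph above concrete, here is a minimal sketch in plain Java of "take the minimum over the non-idle inputs only". It is a toy model and does not use or reproduce Flink's classes; the names IdleAwareWatermarkModel, Input and combinedWatermark are invented for illustration, and returning Long.MIN_VALUE when every input is idle is an assumption of this sketch rather than documented Flink behaviour.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of how a two-input operator could combine watermarks; not Flink code. */
final class IdleAwareWatermarkModel {

    /** One input channel: its last seen watermark and whether it is marked idle. */
    static final class Input {
        long watermark = Long.MIN_VALUE;
        boolean idle = false;
    }

    /**
     * Combined watermark = minimum over the inputs that are NOT idle.
     * Assumption of this sketch: if every input is idle, nothing can advance,
     * so Long.MIN_VALUE is returned.
     */
    static long combinedWatermark(List<Input> inputs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Input in : inputs) {
            if (in.idle) {
                continue; // idle inputs do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, in.watermark);
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        Input db = new Input();    // stands in for the JDBC stream
        Input kafka = new Input(); // stands in for the empty Kafka stream
        List<Input> inputs = Arrays.asList(db, kafka);

        db.watermark = 1_000L;
        // Kafka is empty but still counted: the join is stuck at Long.MIN_VALUE.
        System.out.println(combinedWatermark(inputs));

        kafka.idle = true;
        // Kafka is ignored once idle: the DB watermark (1000) is forwarded.
        System.out.println(combinedWatermark(inputs));
    }
}
```

In this model, a timer registered for any time at or below 1000 could only fire after the Kafka input is flagged idle, which matches the behaviour the message expects from withIdleness().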
The only way I can get my KeyedCoProcessFunction timer to fire is to force an emit of the watermark I want in onPeriodicEmit() after x attempts to advance the initial watermark, i.e. if onPeriodicEmit() is called 100 times and "latestWatermark" is still Long.MIN_VALUE then I emit the watermark I want so the join can progress. This seems like a nasty hack to me but perhaps something like this is actually necessary?

I am currently using Flink 1.12.3 and Confluent Kafka client 6.1.1. Any pointers would be appreciated.

Thanks in advance,

James.

    FlinkKafkaConsumer<Position> positionsFlinkKafkaConsumer = new FlinkKafkaConsumer<>("poc.positions",
        ConfluentRegistryAvroDeserializationSchema.forSpecific(Position.class, SchemaRegistryURL),
        kafkaConsumerProperties);

    positionsFlinkKafkaConsumer.setStartFromEarliest();

    positionsFlinkKafkaConsumer.assignTimestampsAndWatermarks(
        new WatermarkStrategy<Position>() {
            @Override
            public TimestampAssigner<Position> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return (event, recordTimestamp) -> {
                    return event.getPhysicalFrom();
                };
            }

            @Override
            public WatermarkGenerator<Position> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Position>() {
                    public long latestWatermark = Long.MIN_VALUE;

                    @Override
                    public void onEvent(Position event, long timestamp, WatermarkOutput output) {
                        long eventWatermark = event.getPhysicalFrom();
                        if (eventWatermark > latestWatermark)
                            latestWatermark = eventWatermark;
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        System.out.printf("Emitting watermark %d\n", latestWatermark);
                        output.emitWatermark(new Watermark(latestWatermark));
                    }
                };
            }
        }.withIdleness(Duration.ofSeconds(5)));

    DataStream<Position> positionKafkaInputStream = env.addSource(positionsFlinkKafkaConsumer, "Kafka-Source");

    DataStream<Position> otherPositionStream = env.fromCollection(
        Lists.newArrayList(new Position(..., timestamp.getMillis())),
        TypeInformation.of(Position.class));

    otherPositionStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
            .forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((e, t) -> e.getPhysicalFrom()));

    KeyedStream keyedPositionKafkaInputStream = positionKafkaInputStream.keyBy(p -> p.getMarketName());

    KeyedStream keyedOtherPositionStream = otherPositionStream.keyBy(p -> p
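
The workaround described in the message (emit a chosen watermark once onPeriodicEmit() has been called a number of times without any event arriving) could look roughly like the following. This is a sketch in plain Java that only models the counting logic; it does not implement Flink's WatermarkGenerator interface, and the class name FallbackWatermarkModel, the fallbackWatermark value and the maxEmptyEmits threshold are all hypothetical.

```java
/** Toy model of the "fallback watermark after N empty periodic emits" workaround; not Flink code. */
final class FallbackWatermarkModel {
    private final long fallbackWatermark; // hypothetical: the watermark to force when no events arrive
    private final int maxEmptyEmits;      // hypothetical: e.g. 100 in the message above
    private long latestWatermark = Long.MIN_VALUE;
    private int emptyEmits = 0;

    FallbackWatermarkModel(long fallbackWatermark, int maxEmptyEmits) {
        this.fallbackWatermark = fallbackWatermark;
        this.maxEmptyEmits = maxEmptyEmits;
    }

    /** Mirrors onEvent(): track the highest event timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        latestWatermark = Math.max(latestWatermark, eventTimestamp);
    }

    /** Mirrors onPeriodicEmit(): returns the watermark that would be emitted on this call. */
    long onPeriodicEmit() {
        if (latestWatermark == Long.MIN_VALUE) {
            emptyEmits++;
            if (emptyEmits >= maxEmptyEmits) {
                // No event has arrived after maxEmptyEmits calls: force the fallback.
                latestWatermark = fallbackWatermark;
            }
        }
        return latestWatermark;
    }

    public static void main(String[] args) {
        FallbackWatermarkModel generator = new FallbackWatermarkModel(5_000L, 3);
        for (int call = 1; call <= 4; call++) {
            System.out.printf("call %d -> %d%n", call, generator.onPeriodicEmit());
        }
    }
}
```

With a threshold of 100 and the default 200ms periodic interval mentioned above, the fallback would be emitted after roughly 20 seconds of an empty topic. Per the reply in this thread, the underlying issue is fixed from Flink 1.14.x, so a workaround of this kind should only be needed on earlier versions.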