Hi James,

I believe you have encountered a bug that we have already fixed [1]. The
small problem is that in order to fix this bug, we had to change some
`@PublicEvolving` interfaces and thus we were not able to backport this fix
to earlier minor releases. As such, this is only fixed starting from 1.14.x.
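
To illustrate the behavior you were expecting, here is a minimal plain-Java
sketch of a two-input operator that takes the minimum watermark over only
those inputs that are not idle. It is a simplified model, not Flink's actual
implementation, and `IdleAwareWatermarkCombiner` and its methods are
hypothetical names made up for this example.

```java
import java.util.OptionalLong;

/** Hypothetical model (not a Flink class) of idle-aware watermark combination. */
class IdleAwareWatermarkCombiner {
    // Last watermark seen on each of the two inputs.
    private final long[] watermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    // Whether each input is currently marked idle.
    private final boolean[] idle = {false, false};

    /** Records a watermark from input 0 or 1 and marks that input active again. */
    void onWatermark(int input, long watermark) {
        watermarks[input] = Math.max(watermarks[input], watermark);
        idle[input] = false;
    }

    /** Marks input 0 or 1 as idle so it no longer holds back the combined watermark. */
    void markIdle(int input) {
        idle[input] = true;
    }

    /** Minimum watermark over the non-idle inputs; empty when every input is idle. */
    OptionalLong combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        IdleAwareWatermarkCombiner combiner = new IdleAwareWatermarkCombiner();
        combiner.onWatermark(0, 1000L);                   // DB stream finished at t=1000
        System.out.println(combiner.combinedWatermark()); // still held back by input 1
        combiner.markIdle(1);                             // empty Kafka stream goes idle
        System.out.println(combiner.combinedWatermark()); // now follows input 0 only
    }
}
```

In this model, a timer registered at or before 1000 could fire only after the
second input is marked idle, which is what you expected withIdleness() to
achieve in the join operator.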

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18934

On Fri, 8 Oct 2021 at 11:55, James Sandys-Lumsdaine <jas...@hotmail.com>
wrote:

> Hi everyone,
>
> I'm putting together a Flink workflow that needs to merge historic data
> from a custom JDBC source with a Kafka flow (for the realtime data). I have
> successfully written the custom JDBC source, which emits a watermark for the
> last event time after all the DB data has been emitted, but now I face a
> problem when joining with data from the Kafka stream.
>
> I register a timer in my KeyedCoProcessFunction, which joins the DB stream
> with the live Kafka stream, so that I can emit all the "batch" data from the
> DB in one go once it has been completely read up to the watermark. However,
> the timer never fires because the Kafka stream is empty and therefore does
> not emit a watermark. My Kafka stream is allowed to be empty, since all the
> data will have been retrieved by the DB call and I only expect new events to
> appear over Kafka. Note that if I replace the Kafka input with a simple
> empty env.fromCollection(...) list, the timer triggers fine, as Flink
> seems to detect that it doesn't need to wait for any input from stream 2. So
> it seems that something related to the Kafka stream status is preventing
> the watermark from advancing in the KeyedCoProcessFunction.
>
> I have tried configuring the Kafka stream's timestamp and watermark
> strategies so that the source is marked as idle after 5 seconds, but the
> watermark in the join operator combining these 2 streams still does not
> seem to advance. (See example code below.)
>
> Maybe I have misunderstood, but I thought that if an input stream into a
> KeyedCoProcessFunction is idle, the operator would not consider it when
> forwarding the watermark, i.e. it would forward the non-idle input stream's
> watermark rather than min(stream1WM, stream2WM). With the example below I
> never see onTimer fire, and the only effect of the withIdleness() strategy
> is to stop the print statements in onPeriodicEmit() after 5 seconds (the
> env periodic emit interval is set to the default 200ms, so I see 25 rows
> before it stops).
>
> The only way I can get my KeyedCoProcessFunction timer to fire is to force
> an emit of the watermark I want in onPeriodicEmit() after a number of
> failed attempts to advance the initial watermark, i.e. if onPeriodicEmit()
> has been called 100 times and "latestWatermark" is still Long.MIN_VALUE,
> then I emit the watermark I want so the join can progress. This seems like
> a nasty hack to me, but perhaps something like this is actually necessary?
>
> I am currently using Flink 1.12.3 and Confluent Kafka client 6.1.1. Any
> pointers would be appreciated.
>
> Thanks in advance,
>
> James.
>
> FlinkKafkaConsumer<Position> positionsFlinkKafkaConsumer =
>         new FlinkKafkaConsumer<>(
>                 "poc.positions",
>                 ConfluentRegistryAvroDeserializationSchema.forSpecific(
>                         Position.class, SchemaRegistryURL),
>                 kafkaConsumerProperties);
>
> positionsFlinkKafkaConsumer.setStartFromEarliest();
>
> positionsFlinkKafkaConsumer.assignTimestampsAndWatermarks(
>         new WatermarkStrategy<Position>() {
>             @Override
>             public TimestampAssigner<Position> createTimestampAssigner(
>                     TimestampAssignerSupplier.Context context) {
>                 return (event, recordTimestamp) -> event.getPhysicalFrom();
>             }
>
>             @Override
>             public WatermarkGenerator<Position> createWatermarkGenerator(
>                     WatermarkGeneratorSupplier.Context context) {
>                 return new WatermarkGenerator<Position>() {
>                     private long latestWatermark = Long.MIN_VALUE;
>
>                     @Override
>                     public void onEvent(
>                             Position event, long timestamp, WatermarkOutput output) {
>                         // Track the highest event time seen so far.
>                         latestWatermark =
>                                 Math.max(latestWatermark, event.getPhysicalFrom());
>                     }
>
>                     @Override
>                     public void onPeriodicEmit(WatermarkOutput output) {
>                         System.out.printf("Emitting watermark %d%n", latestWatermark);
>                         output.emitWatermark(new Watermark(latestWatermark));
>                     }
>                 };
>             }
>         }.withIdleness(Duration.ofSeconds(5)));
>
> DataStream<Position> positionKafkaInputStream =
>         env.addSource(positionsFlinkKafkaConsumer, "Kafka-Source");
>
> // DataStream.assignTimestampsAndWatermarks returns a new stream and leaves
> // the original unchanged, so the returned stream is the one to keep.
> DataStream<Position> otherPositionStream =
>         env.fromCollection(
>                         Lists.newArrayList(new Position(..., timestamp.getMillis())),
>                         TypeInformation.of(Position.class))
>                 .assignTimestampsAndWatermarks(
>                         WatermarkStrategy
>                                 .<Position>forBoundedOutOfOrderness(Duration.ofSeconds(10))
>                                 .withTimestampAssigner((e, t) -> e.getPhysicalFrom()));
>
> KeyedStream<Position, String> keyedPositionKafkaInputStream =
>         positionKafkaInputStream.keyBy(p -> p.getMarketName());
>
> KeyedStream<Position, String> keyedOtherPositionStream =
>         otherPositionStream.keyBy(p -> p.getMarketName());
>
> DataStream<Position> joinedStream =
>         keyedOtherPositionStream
>                 .connect(keyedPositionKafkaInputStream)
>                 .process(new JoinProcessFunction());
>
> ...
>
>
> private static class JoinProcessFunction
>         extends KeyedCoProcessFunction<String, Position, Position, Position> {
>
>     private static final Logger logger =
>             LoggerFactory.getLogger(JoinProcessFunction.class);
>
>     @Override
>     public void processElement1(
>             Position position, Context context, Collector<Position> collector) {
>         logger.info(
>                 "processPosition1: key: {}, ts: {}, watermark: {}",
>                 context.getCurrentKey(),
>                 context.timestamp(),
>                 context.timerService().currentWatermark());
>         context.timerService().registerEventTimeTimer(position.getPhysicalFrom());
>         collector.collect(position);
>     }
>
>     @Override
>     public void processElement2(
>             Position position, Context context, Collector<Position> collector) {
>         logger.info(
>                 "processPosition2: key: {}, ts: {}, watermark: {}",
>                 context.getCurrentKey(),
>                 context.timestamp(),
>                 context.timerService().currentWatermark());
>         context.timerService().registerEventTimeTimer(position.getPhysicalFrom());
>         collector.collect(position);
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Position> out) {
>         logger.info(
>                 "Timer triggered for timestamp {} and key '{}' - current Watermark: {}.",
>                 timestamp,
>                 ctx.getCurrentKey(),
>                 ctx.timerService().currentWatermark());
>     }
> }
>
>
