Ah, thanks for the feedback. I can work around it for now but will upgrade to
the latest version as soon as I can.

Thanks very much,

James.
________________________________
From: Piotr Nowojski <pnowoj...@apache.org>
Sent: 08 October 2021 13:17
To: James Sandys-Lumsdaine <jas...@hotmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Empty Kafka topics and watermarks

Hi James,

I believe you have encountered a bug that we have already fixed [1]. The small 
problem is that in order to fix this bug, we had to change some 
`@PublicEvolving` interfaces and thus we were not able to backport this fix to 
earlier minor releases. As such, this is only fixed starting from 1.14.x.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18934

On Fri, 8 Oct 2021 at 11:55, James Sandys-Lumsdaine <jas...@hotmail.com> wrote:
Hi everyone,

I'm putting together a Flink workflow that needs to merge historic data from a 
custom JDBC source with a Kafka flow (for the realtime data). I have 
successfully written the custom JDBC source that emits a watermark for the last 
event time after all the DB data has been emitted but now I face a problem when 
joining with data from the Kafka stream.

I register a timer in my KeyedCoProcessFunction joining the DB stream with live 
Kafka stream so I can emit all the "batch" data from the DB in one go when 
completely read up to the watermark but the timer never fires as the Kafka 
stream is empty and therefore doesn't emit a watermark. My Kafka stream is 
allowed to be empty since all the data will have been retrieved from the DB 
call so I only expect new events to appear over Kafka. Note that if I replace 
the Kafka input with a simple env.fromCollection(...) empty list then the timer 
triggers fine as Flink seems to detect it doesn't need to wait for any input 
from stream 2. So it seems to be something related to the Kafka stream status 
that is preventing the watermark from advancing in the KeyedCoProcessFunction.
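
For readers less familiar with event-time timers, the rule at play can be sketched in plain Java without any Flink dependency. This is only a simplified model, not Flink's timer service: the class name `EventTimeTimers` and its methods are made up for illustration. It shows that a registered timer fires only once the operator's watermark reaches the timer's timestamp, which is why a watermark stuck at Long.MIN_VALUE never triggers onTimer(). As far as I understand, a bounded source such as env.fromCollection(...) emits a final maximum watermark when it finishes, which would explain why the timer fires in that case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical, simplified model of event-time timers (not Flink's implementation). */
final class EventTimeTimers {
    // Pending timer timestamps, kept sorted so the earliest is checked first.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;

    void register(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the watermark and returns the timers that fire as a result. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // watermarks never move backwards; nothing new can fire
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }
}
```

In this model, a timer registered for timestamp 1000 stays pending for as long as the watermark passed to advanceWatermark() remains below 1000.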

I have tried configuring the Kafka stream timestamp and watermark strategies 
so the source is marked as idle after 5 seconds, but the watermark in the join 
operator combining these 2 streams still does not seem to advance. (See 
example code below).

Maybe this is my bad understanding but I thought if an input stream into a 
KeyedCoProcessFunction is idle then it wouldn't be considered by the operator 
for forwarding the watermark i.e. it would forward the non-idle input stream's 
watermark and not do a min(stream1WM, stream2WM). With the below example I 
never see the onTimer fire and the only effect the withIdleness() strategy has 
is to stop the print statements in onPeriodicEmit() happening after 5 seconds 
(env periodic emit is set to the default 200ms so I see 25 rows before it 
stops).
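
The expectation described above (take the minimum over the non-idle inputs only) can be written down as a small plain-Java model. This is a sketch of the expected semantics under my own assumptions, not Flink's actual code: `CombinedWatermark`, `onWatermark`, `markIdle` and `advance` are hypothetical names, and the model assumes that an input becomes active again as soon as it reports a watermark.

```java
import java.util.Arrays;

/** Hypothetical model of a multi-input operator's watermark (not Flink's implementation). */
final class CombinedWatermark {
    private final long[] watermarks; // last watermark seen per input
    private final boolean[] idle;    // true if the input is currently marked idle
    private long current = Long.MIN_VALUE;

    CombinedWatermark(int inputs) {
        watermarks = new long[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        idle = new boolean[inputs];
    }

    void onWatermark(int input, long watermark) {
        watermarks[input] = Math.max(watermarks[input], watermark);
        idle[input] = false; // assumption: a new watermark makes the input active again
    }

    void markIdle(int input) {
        idle[input] = true;
    }

    /** Returns min(watermark) over the active inputs; the result never decreases. */
    long advance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive && min > current) {
            current = min;
        }
        return current;
    }
}
```

With two inputs, where input 0 has reported watermark 1000 and input 1 has reported nothing, advance() stays at Long.MIN_VALUE until markIdle(1) is called, after which it returns 1000.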

The only way I can get my KeyedCoProcessFunction timer to fire is to force an 
emit of the watermark I want in onPeriodicEmit() after x 
attempts to advance an initial watermark i.e. if onPeriodicEmit() is called 100 
times and the "latestWatermark" is still Long.MIN_VALUE then I emit the 
watermark I want so the join can progress. This seems like a nasty hack to me 
but perhaps something like this is actually necessary?
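
The workaround described above boils down to the following logic, shown here as a plain-Java sketch with no Flink dependency so it can be read in isolation. The class `FallbackWatermarkTracker`, its constructor parameters and the choice of fallback value are all hypothetical. In a real job the same counters would live inside the WatermarkGenerator, and the returned value would be passed to output.emitWatermark(...).

```java
/** Hypothetical sketch of the "forced watermark" workaround (names are illustrative). */
final class FallbackWatermarkTracker {
    private final int maxEmptyEmits;      // periodic emits to wait before giving up
    private final long fallbackWatermark; // watermark to force if no event ever arrived
    private long latestWatermark = Long.MIN_VALUE;
    private int emptyEmits = 0;

    FallbackWatermarkTracker(int maxEmptyEmits, long fallbackWatermark) {
        this.maxEmptyEmits = maxEmptyEmits;
        this.fallbackWatermark = fallbackWatermark;
    }

    /** Mirrors onEvent(): track the highest event timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        latestWatermark = Math.max(latestWatermark, eventTimestamp);
    }

    /** Mirrors onPeriodicEmit(): returns the watermark that would be emitted. */
    long onPeriodicEmit() {
        // Only count emits while no event has advanced the initial watermark.
        if (latestWatermark == Long.MIN_VALUE && ++emptyEmits >= maxEmptyEmits) {
            latestWatermark = fallbackWatermark;
        }
        return latestWatermark;
    }
}
```

With the default 200ms periodic emit interval mentioned earlier, a maxEmptyEmits of 100 would force the fallback watermark after roughly 20 seconds of silence.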

I am currently using Flink 1.12.3 and Confluent Kafka client 6.1.1. Any pointers 
would be appreciated.

Thanks in advance,

James.


FlinkKafkaConsumer<Position> positionsFlinkKafkaConsumer = new FlinkKafkaConsumer<>(
        "poc.positions",
        ConfluentRegistryAvroDeserializationSchema.forSpecific(Position.class, SchemaRegistryURL),
        kafkaConsumerProperties);

positionsFlinkKafkaConsumer.setStartFromEarliest();

positionsFlinkKafkaConsumer.assignTimestampsAndWatermarks(
        new WatermarkStrategy<Position>() {
            @Override
            public TimestampAssigner<Position> createTimestampAssigner(
                    TimestampAssignerSupplier.Context context) {
                return (event, recordTimestamp) -> {
                    return event.getPhysicalFrom();
                };
            }

            @Override
            public WatermarkGenerator<Position> createWatermarkGenerator(
                    WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Position>() {
                    public long latestWatermark = Long.MIN_VALUE;

                    @Override
                    public void onEvent(Position event, long timestamp, WatermarkOutput output) {
                        long eventWatermark = event.getPhysicalFrom();
                        if (eventWatermark > latestWatermark)
                            latestWatermark = eventWatermark;
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        System.out.printf("Emitting watermark %d\n", latestWatermark);
                        output.emitWatermark(new Watermark(latestWatermark));
                    }
                };
            }
        }.withIdleness(Duration.ofSeconds(5)));



DataStream<Position> positionKafkaInputStream =
        env.addSource(positionsFlinkKafkaConsumer, "Kafka-Source");

DataStream<Position> otherPositionStream = env.fromCollection(
        Lists.newArrayList(new Position(..., timestamp.getMillis())),
        TypeInformation.of(Position.class));
otherPositionStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Position>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((e, t) -> e.getPhysicalFrom()));

KeyedStream<Position, String> keyedPositionKafkaInputStream =
        positionKafkaInputStream.keyBy(p -> p.getMarketName());
KeyedStream<Position, String> keyedOtherPositionStream =
        otherPositionStream.keyBy(p -> p.getMarketName());

DataStream<Position> joinedStream = keyedOtherPositionStream
        .connect(keyedPositionKafkaInputStream)
        .process(new JoinProcessFunction());

...


private static class JoinProcessFunction
        extends KeyedCoProcessFunction<String, Position, Position, Position> {

    private static Logger logger = LoggerFactory.getLogger(JoinProcessFunction.class);

    @Override
    public void processElement1(Position position, Context context,
            Collector<Position> collector) {
        logger.info("processPosition1: key: {}, ts: {}, watermark: {}",
                context.getCurrentKey(), context.timestamp(),
                context.timerService().currentWatermark());
        context.timerService().registerEventTimeTimer(position.getPhysicalFrom());
        collector.collect(position);
    }

    @Override
    public void processElement2(Position position, Context context,
            Collector<Position> collector) {
        logger.info("processPosition2: key: {}, ts: {}, watermark: {}",
                context.getCurrentKey(), context.timestamp(),
                context.timerService().currentWatermark());
        context.timerService().registerEventTimeTimer(position.getPhysicalFrom());
        collector.collect(position);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Position> out) {
        logger.info("Timer triggered for timestamp {} and key '{}' - current Watermark: {}.",
                timestamp, ctx.getCurrentKey(), ctx.timerService().currentWatermark());
    }
}
