Can issues with Kafka consumption influence the watermark?
Hi! We have a fairly simple Flink job whose main purpose is concatenating events read from Kafka and outputting them in session windows with a 1-minute gap, 1 minute of allowed out-of-orderness, and a 1-hour idleness setting:

[image: Screenshot 2023-08-07 at 13.44.09.png]

The problem we are facing is that the job sometimes starts to struggle, and it all seems to start with the following messages appearing in the source operator:

  [Consumer clientId=sessionizer, groupId=sessionizer] Error sending fetch request (sessionId=1914084170, epoch=8) to node 2:
  [Consumer clientId=sessionizer, groupId=sessionizer] Cancelled in-flight FETCH request with correlation id 105760966 due to node 2 being disconnected (elapsed time since creation: 31311ms, elapsed time since send: 31311ms, request timeout: 3ms)
  [Consumer clientId=sessionizer, groupId=sessionizer] Disconnecting from node 2 due to request timeout.

What happens right after this is mind-puzzling. In theory, we understand that issues consuming from some partitions (as the error above seems to suggest) should not influence how watermarks are handled, i.e. the watermark will not move forward, so this should not cause any sessions to be closed prematurely. However, what we see is *the opposite*: shortly after the error (within 1-2 minutes) we see a spike in session windows being closed, just as if Flink had moved time forward (as tracked by the watermark) and, because some events were missing (not consumed), closed a lot of the windows since those sessions were no longer receiving events. But how is this possible, if at all? Can anyone think of any (even remotely) possible explanation?

When this happens, we then see the expected chain of follow-up events: a lot of data gets produced to the sink, we run into the Kafka producer quota limits we have defined, this creates backpressure, and we see lag. The Kafka errors disappear after 15-20 minutes and the situation goes back to normal.
However, some corrupted data gets produced whenever this happens.

This is Flink 1.15.2 running in AWS as Kinesis Data Analytics. There are 144 partitions on the input and a parallelism of 72; we use the Kafka message event timestamps (as set by the producer). We've also seen this happen with 72 partitions and a parallelism of 72. -- Piotr Domagalski
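One way to reason about the question above: the combined watermark an operator sees is the minimum over the inputs that are not currently marked idle, so any mechanism that declares a stalled partition idle lets the watermark jump forward and close sessions. Below is a simplified, self-contained model of that combination logic; the class and method names are ours, not Flink's (the real logic lives in Flink's watermark multiplexing), so treat it as an illustration of the semantics rather than the actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model: the combined watermark is the minimum over inputs
 *  that are NOT idle. Names are ours, not Flink's. */
class CombinedWatermark {
    private final List<Long> watermarks = new ArrayList<>();
    private final List<Boolean> idle = new ArrayList<>();

    int addInput() {
        watermarks.add(Long.MIN_VALUE);
        idle.add(false);
        return watermarks.size() - 1;
    }

    void advance(int input, long wm) {
        watermarks.set(input, wm);
        idle.set(input, false); // receiving data re-activates an input
    }

    void markIdle(int input) {
        idle.set(input, true);
    }

    long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.size(); i++) {
            if (!idle.get(i)) {
                anyActive = true;
                min = Math.min(min, watermarks.get(i));
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}

public class IdlenessDemo {
    public static void main(String[] args) {
        CombinedWatermark cw = new CombinedWatermark();
        int p0 = cw.addInput();
        int p1 = cw.addInput();
        cw.advance(p0, 1000);
        cw.advance(p1, 500);
        System.out.println(cw.combined()); // 500: held back by the slower partition
        cw.markIdle(p1);                   // stalled partition declared idle
        System.out.println(cw.combined()); // 1000: watermark jumps forward
    }
}
```

In this model, the stalled input holds the watermark at 500 only as long as it stays active; the moment it is considered idle, the combined watermark jumps to 1000, which would be consistent with a burst of windows firing. Whether something marks partitions idle prematurely in this job is a hypothesis to verify, not an established diagnosis.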
Re-interpreting Kafka source as a keyed stream?
Hi, I was wondering if it would be safe for me to make use of reinterpretAsKeyedStream on a Kafka source in order to have an "embarrassingly parallel" job without any .keyBy(). My Kafka topic is partitioned by the same id I then key the session window operator by, so in theory there is no need for data to be transferred between subtasks (between the Kafka source and the windowing operator). Is it possible to avoid the shuffle by using reinterpretAsKeyedStream on the source?

I'm worried about the warning from the docs saying:

  WARNING: The re-interpreted data stream MUST already be pre-partitioned in EXACTLY the same way Flink's keyBy would partition the data in a shuffle w.r.t. key-group assignment.

Of course, the partitioning in Kafka will not be *exactly* the same... What problems might this cause? I tried it on a very small subset of data and didn't notice any issues. -- Piotr Domagalski
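For intuition on why that warning matters: Kafka's producer-side partitioner and Flink's key-group assignment are different hash functions, so the same key will often land on a different Kafka partition than the subtask keyBy would choose. The sketch below uses simplified stand-in hashes for both sides (the real ones are murmur2 over the key bytes in Kafka's default partitioner, and a MurmurHash of key.hashCode() in Flink's KeyGroupRangeAssignment), so the exact numbers are illustrative only; the structural point is that the two assignments disagree:

```java
/** Illustration: a Kafka-style partition assignment vs. a Flink-style
 *  key-group assignment disagree for many keys. Both hashes here are
 *  simplified stand-ins, NOT the real murmur2 / MurmurHash implementations. */
public class PartitioningMismatch {

    // Stand-in for Kafka's default partitioner (really: murmur2 over key bytes).
    static int kafkaPartitionStandIn(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Stand-in for Flink: keyGroup = hash(key.hashCode()) % maxParallelism,
    // operator index = keyGroup * parallelism / maxParallelism.
    static int flinkOperatorStandIn(String key, int parallelism, int maxParallelism) {
        int h = key.hashCode();
        int scrambled = (h ^ (h >>> 16)) & 0x7fffffff; // stand-in for MurmurHash
        int keyGroup = scrambled % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int mismatches = 0;
        int n = 1000;
        for (int i = 0; i < n; i++) {
            String key = "key-" + i;
            if (kafkaPartitionStandIn(key, 4) != flinkOperatorStandIn(key, 4, 128)) {
                mismatches++;
            }
        }
        System.out.println(mismatches + " of " + n + " keys would land on a different subtask");
    }
}
```

Because reinterpretAsKeyedStream trusts that the incoming partitioning already matches the key-group assignment, a mismatch like this means keyed state (e.g. window contents) can end up stored under key groups that do not own the key. That can appear to work on a small dataset and then break when rescaling or restoring from a savepoint, since state redistribution is done by key group.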
Re: Watermark idleness and alignment - are they exclusive?
That looks exactly like what we hit, thank you! On Thu, Jun 15, 2023 at 10:57 PM Ken Krugler wrote: > I think you’re hitting this issue: > > https://issues.apache.org/jira/browse/FLINK-31632 > > Fixed in 1.16.2, 1.17.1. > > — Ken > > > On Jun 15, 2023, at 1:39 PM, Piotr Domagalski > wrote: > > Hi all! > > We've been experimenting with watermark alignment in Flink 1.15 and > observed an odd behaviour that I couldn't find any mention of in the > documentation. > > With the following strategy: > > WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60)) > .withTimestampAssigner((e, t) -> e.timestamp) > .withIdleness(Duration.ofSeconds(3600)) > .withWatermarkAlignment("group-1", Duration.ofSeconds(15)); > > Kafka sources stop consuming completely after 3600s (even when the data is > flowing into all the partitions). Is this an expected behaviour? Where > could I find more information on this? > > -- > Piotr Domagalski > > > -- > Ken Krugler > http://www.scaleunlimited.com > Custom big data solutions > Flink, Pinot, Solr, Elasticsearch > > > > -- Piotr Domagalski
Watermark idleness and alignment - are they exclusive?
Hi all! We've been experimenting with watermark alignment in Flink 1.15 and observed an odd behaviour that I couldn't find any mention of in the documentation. With the following strategy:

  WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
      .withTimestampAssigner((e, t) -> e.timestamp)
      .withIdleness(Duration.ofSeconds(3600))
      .withWatermarkAlignment("group-1", Duration.ofSeconds(15));

Kafka sources stop consuming completely after 3600s (even when the data is flowing into all the partitions). Is this expected behaviour? Where could I find more information on this? -- Piotr Domagalski
Re: Source vs SourceFunction and testing
Thank you Qingsheng, this context helps a lot! And once again thank you all for being such a helpful community! P.S. I actually struggled for a bit trying to understand why my refactored solution which accepts DataStream<> wouldn't work ("no operators defined in the streaming topology"). Turns out, my assumption that I can call StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get the same environment, was wrong. I had env.addSource and env.fromSource calls using one instance of the environment, but then called env.execute() on another instance :facepalm: On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren wrote: > Hi Piotr, > > I’d like to share my understanding about this. Source and SourceFunction > are both interfaces to data sources. SourceFunction was designed and > introduced earlier and as the project evolved, many shortcomings emerged. > Therefore, the community re-designed the source interface and introduced > the new Source API in FLIP-27 [1]. > > Finally we will deprecate the SourceFunction and use Source as the only > interface for all data sources, but considering the huge cost of migration > you’ll see SourceFunction and Source co-exist for some time, like the > ParallelTestSource you mentioned is still on SourceFunction, and > KafkaSource as a pioneer has already migrated to the new Source API. > > I think the API to end users didn't change a lot: both > env.addSource(SourceFunction) and env.fromSource(Source) return a > DataStream, and you could apply downstream transformations onto it. > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface > > Cheers, > > Qingsheng > > > On May 25, 2022, at 03:19, Piotr Domagalski > wrote: > > > > Hi Ken, > > > > Thanks Ken. I guess the problem I had was, as a complete newbie to > Flink, navigating the type system and being still confused about > differences between Source, SourceFunction, DataStream, DataStreamOperator, > etc. 
> > > > I think the DataStream<> type is what I'm looking for? That is, then I > can use: > > > > DataStream source = env.fromSource(getKafkaSource(params), > watermarkStrategy, "Kafka"); > > when using KafkaSource in the normal setup > > > > and > > DataStream s = env.addSource(new ParallelTestSource<>(...)); > > when using the testing source [1] > > > > Does that sound right? > > > > [1] > https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26 > > > > On Tue, May 24, 2022 at 7:57 PM Ken Krugler > wrote: > > Hi Piotr, > > > > The way I handle this is via a workflow class that uses a builder > approach to specifying inputs, outputs, and any other configuration > settings. > > > > The inputs are typically DataStream. > > > > This way I can separate out the Kafka inputs, and use testing sources > that give me very precise control over the inputs (e.g. I can hold up on > right side data to ensure my stateful left join junction is handling > deferred joins properly). I can also use Kafka unit test support (either > kafka-junit or Spring embedded Kafka) if needed. > > > > Then in the actual tool class (with a main method) I’ll wire up the real > Kafka sources, with whatever logic is required to convert the consumer > records to what the workflow is expecting. > > > > — Ken > > > >> On May 24, 2022, at 8:34 AM, Piotr Domagalski > wrote: > >> > >> Hi, > >> > >> I'm wondering: what is the recommended way to structure the job which one > would like to test later on with `MiniCluster`. > >> > >> I've looked at the flink-training repository examples [1] and they tend > to expose the main job as a class that accepts a `SourceFunction` and a > `SinkFunction`, which makes sense. But then, my job is normally constructed > with `KafkaSource` which is then passed to `env.fromSource(...`. > >> > >> Is there any recommended way of handling these discrepancies, i.e. 
> having to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`? > >> > >> [1] > https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61 > >> > >> -- > >> Piotr Domagalski > > > > -- > > Ken Krugler > > http://www.scaleunlimited.com > > Custom big data solutions > > Flink, Pinot, Solr, Elasticsearch > > > > > > > > > > > > -- > > Piotr Domagalski > > -- Piotr Domagalski
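The pitfall from the P.S. above can be sketched as follows (a hypothetical fragment, not the original code; as observed in the thread, each call to getExecutionEnvironment() can hand back a different environment instance, so the topology and execute() end up on different environments):

```java
// Broken: two separate environments; the source is registered on envA,
// but execute() is called on envB, which has no operators.
StreamExecutionEnvironment envA = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment envB = StreamExecutionEnvironment.getExecutionEnvironment();
envA.fromElements(1, 2, 3).print();
envB.execute("job"); // fails: "No operators defined in streaming topology"

// Fixed: create the environment once and use the SAME instance
// for sources, sinks, and execute().
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3).print();
env.execute("job");
```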
Re: Source vs SourceFunction and testing
Hi Ken, Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, navigating the type system and being still confused about differences between Source, SourceFunction, DataStream, DataStreamOperator, etc. I think the DataStream<> type is what I'm looking for? That is, then I can use: DataStream source = env.fromSource(getKafkaSource(params), watermarkStrategy, "Kafka"); when using KafkaSource in the normal setup and DataStream s = env.addSource(new ParallelTestSource<>(...)); when using the testing source [1] Does that sound right? [1] https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26 On Tue, May 24, 2022 at 7:57 PM Ken Krugler wrote: > Hi Piotr, > > The way I handle this is via a workflow class that uses a builder approach > to specifying inputs, outputs, and any other configuration settings. > > The inputs are typically DataStream. > > This way I can separate out the Kafka inputs, and use testing sources that > give me very precise control over the inputs (e.g. I can hold up on right > side data to ensure my stateful left join junction is handling deferred > joins properly). I can also use Kafka unit test support (either kafka-junit > or Spring embedded Kafka) if needed. > > Then in the actual tool class (with a main method) I’ll wire up the real > Kafka sources, with whatever logic is required to convert the consumer > records to what the workflow is expecting. > > — Ken > > On May 24, 2022, at 8:34 AM, Piotr Domagalski > wrote: > > Hi, > > I'm wondering: what is the recommended way to structure the job which one > would like to test later on with `MiniCluster`. > > I've looked at the flink-training repository examples [1] and they tend to > expose the main job as a class that accepts a `SourceFunction` and a > `SinkFunction`, which makes sense. But then, my job is normally constructed > with `KafkaSource` which is then passed to `env.fromSource(...`. 
> > Is there any recommended way of handling these discrepancies, i.e. having > to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`? > > [1] > https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61 > > -- > Piotr Domagalski > > > ------ > Ken Krugler > http://www.scaleunlimited.com > Custom big data solutions > Flink, Pinot, Solr, Elasticsearch > > > > -- Piotr Domagalski
Source vs SourceFunction and testing
Hi, I'm wondering: what is the recommended way to structure a job which one would like to test later on with `MiniCluster`?

I've looked at the flink-training repository examples [1] and they tend to expose the main job as a class that accepts a `SourceFunction` and a `SinkFunction`, which makes sense. But then, my job is normally constructed with `KafkaSource`, which is then passed to `env.fromSource(...`. Is there any recommended way of handling these discrepancies, i.e. having to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?

[1] https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61 -- Piotr Domagalski
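The workflow-class approach discussed in this thread could look roughly like the sketch below. This is a hypothetical fragment, not a complete program: Event, ConcatenateEvents, buildKafkaSource/buildKafkaSink, and CollectSink are assumed names, and ParallelTestSource is the flink-training class linked above. The idea is that everything downstream of the source takes a DataStream, so production and tests differ only in how that stream is created:

```java
public class SessionizerJob {

    // Pure topology: everything downstream of the source lives here,
    // so tests can inject any source they like.
    public static DataStream<String> buildTopology(DataStream<Event> events) {
        return events
                .keyBy(e -> e.id)
                .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
                .process(new ConcatenateEvents());
    }

    // Production entry point wires up the real Kafka source and sink.
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Event> events =
                env.fromSource(buildKafkaSource(args), watermarkStrategy(), "Kafka");
        buildTopology(events).sinkTo(buildKafkaSink(args));
        env.execute("sessionizer");
    }
}

// In a MiniCluster test, the same topology runs against a test source
// (note: source, sink, and execute() all use the SAME env instance):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> events = env.addSource(new ParallelTestSource<>(testEvents));
SessionizerJob.buildTopology(events).addSink(new CollectSink());
env.execute();
```

This keeps the `env.addSource(...)` vs `env.fromSource(...)` discrepancy confined to the two call sites that create the stream, while the logic under test stays identical.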
At-least once sinks and their behaviour in a non-failure scenario
Hi, I'm planning to build a pipeline that uses a Kafka source, some stateful transformation, and a RabbitMQ sink. What I don't yet fully understand is how often I should expect the "at-least-once" scenario (i.e. seeing duplicates) on the sink side.

The case when things start failing is clear to me, but what happens when I want to gracefully stop the Flink job? Am I right in thinking that when I gracefully stop a job with a final savepoint [1], the Kafka source stops consuming, a checkpoint barrier is sent through the pipeline, and this flushes the sink completely? So my understanding is that if nothing fails and the Kafka offset is committed, then when the job is started again from that savepoint, it will not result in any duplicates being sent to RabbitMQ. Is that correct? Thanks!

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint -- Piotr Domagalski
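For reference, the graceful stop described in [1] is triggered like this from the Flink CLI (the job id and savepoint path below are placeholders; on Kinesis Data Analytics the equivalent operation goes through the AWS API rather than the CLI):

```shell
# Stop with a final savepoint: the source stops consuming, and a final
# checkpoint barrier flows through and flushes the pipeline.
flink stop --savepointPath s3://my-bucket/savepoints <job-id>

# Optionally also emit a MAX_WATERMARK before stopping, which fires all
# remaining event-time timers (e.g. open windows) before the savepoint:
flink stop --drain --savepointPath s3://my-bucket/savepoints <job-id>
```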