Can issues with Kafka consumption influence the watermark?

2023-08-07 Thread Piotr Domagalski
Hi!

We have a rather simple Flink job whose main purpose is concatenating events
read from Kafka and outputting them in session windows, with a gap of 1
minute, 1 minute of allowed out-of-orderness and a 1h idleness setting:
[image: Screenshot 2023-08-07 at 13.44.09.png]
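For reference, here is a minimal sketch of the setup described above (1 minute
session gap, 1 minute out-of-orderness, 1 hour idleness). The event type, field
names and the concatenation function are hypothetical, not the actual job code:

// Hypothetical sketch of the described pipeline.
WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofMinutes(1))
    .withTimestampAssigner((event, recordTimestamp) -> event.timestamp)
    .withIdleness(Duration.ofHours(1));

DataStream<Event> events = env.fromSource(kafkaSource, watermarkStrategy, "Kafka");

events.keyBy(event -> event.sessionId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
    .process(new ConcatenateEventsFunction()) // placeholder for the concatenation logic
    .sinkTo(kafkaSink);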
The problem we are facing is that we sometimes see it starts to struggle
and it seems it all starts with the following message appearing in the
source operator:

[Consumer clientId=sessionizer, groupId=sessionizer] Error sending fetch
request (sessionId=1914084170, epoch=8) to node 2:

[Consumer clientId=sessionizer, groupId=sessionizer] Cancelled in-flight FETCH
request with correlation id 105760966 due to node 2 being disconnected
(elapsed time since creation: 31311ms, elapsed time since send: 31311ms,
request timeout: 3ms)

[Consumer clientId=sessionizer, groupId=sessionizer] Disconnecting from node 2
due to request timeout.

What happens right after this is puzzling. In theory, we understand that
issues consuming from some partitions (as the error above seems to suggest)
should not influence how watermarks are handled - i.e. the watermark will not
move forward, so this should not cause any sessions to be closed prematurely.

However, what we see is *the opposite*: shortly after the error (within a
minute or two) we see a spike in session windows being closed, as if Flink had
moved time forward (as tracked by the watermark) and, because some events were
missing (not consumed), decided to close a lot of the windows since those
sessions were no longer receiving events. But how is this possible, if at all?
Can anyone think of any (even remotely) possible explanation?

When this happens, we then see an expected sequence of events: a lot of data
gets produced to the sink, we hit the Kafka producer quota limits we have
defined, this creates backpressure and we see consumer lag. The Kafka errors
disappear after 15-20 minutes and the situation goes back to normal. However,
some corrupted data gets produced whenever this happens.

This is Flink 1.15.2 running in AWS as Kinesis Data Analytics. There are 144
partitions on the input, parallelism is 72, and we use the Kafka message event
timestamps (as set by the producer). We've seen it before with 72 partitions
and a parallelism of 72.

-- 
Piotr Domagalski


Re-interpreting Kafka source as a keyed stream?

2023-06-18 Thread Piotr Domagalski
Hi,

I was wondering if it would be safe for me to make use of
reinterpretAsKeyedStream on a Kafka source in order to have an
"embarrassingly parallel" job without any .keyBy().

My Kafka topic is partitioned by the same id that I then key the session
window operator by. Therefore, in theory, there is no need for data to be
transferred between subtasks (between the Kafka source and the windowing
operator). Is it possible to avoid this shuffle by using
reinterpretAsKeyedStream on the source?
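For context, a minimal sketch of what I have in mind (the event type, key
selector and window logic are placeholders, not the actual job code):

// Hypothetical sketch: reinterpret the Kafka source as already keyed by the
// partitioning id, so the session window runs without a network shuffle.
DataStream<Event> source = env.fromSource(kafkaSource, watermarkStrategy, "Kafka");

KeyedStream<Event, String> keyed =
    DataStreamUtils.reinterpretAsKeyedStream(source, event -> event.sessionId);

keyed.window(EventTimeSessionWindows.withGap(Time.minutes(1)))
    .process(new ConcatenateEventsFunction());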

I'm worried about the warning from the docs saying:

WARNING: The re-interpreted data stream MUST already be pre-partitioned in
EXACTLY the same way Flink’s keyBy would partition the data in a shuffle
w.r.t. key-group assignment.

Of course, the partitioning in Kafka will not be *exactly* the same... What
problems might this cause? I did it on a very small subset of data and
didn't notice any issues.

-- 
Piotr Domagalski


Re: Watermark idleness and alignment - are they exclusive?

2023-06-16 Thread Piotr Domagalski
That looks exactly like what we hit, thank you!

On Thu, Jun 15, 2023 at 10:57 PM Ken Krugler 
wrote:

> I think you’re hitting this issue:
>
> https://issues.apache.org/jira/browse/FLINK-31632
>
> Fixed in 1.16.2, 1.17.1.
>
> — Ken
>
>
> On Jun 15, 2023, at 1:39 PM, Piotr Domagalski 
> wrote:
>
> Hi all!
>
> We've been experimenting with watermark alignment in Flink 1.15 and
> observed an odd behaviour that I couldn't find any mention of in the
> documentation.
>
> With the following strategy:
>
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
>     .withTimestampAssigner((e, t) -> e.timestamp)
>     .withIdleness(Duration.ofSeconds(3600))
>     .withWatermarkAlignment("group-1", Duration.ofSeconds(15));
>
> Kafka sources stop consuming completely after 3600s (even when the data is
> flowing into all the partitions). Is this an expected behaviour? Where
> could I find more information on this?
>
> --
> Piotr Domagalski
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>

-- 
Piotr Domagalski


Watermark idleness and alignment - are they exclusive?

2023-06-15 Thread Piotr Domagalski
Hi all!

We've been experimenting with watermark alignment in Flink 1.15 and
observed an odd behaviour that I couldn't find any mention of in the
documentation.

With the following strategy:

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
    .withTimestampAssigner((e, t) -> e.timestamp)
    .withIdleness(Duration.ofSeconds(3600))
    .withWatermarkAlignment("group-1", Duration.ofSeconds(15));

Kafka sources stop consuming completely after 3600s (even when the data is
flowing into all the partitions). Is this an expected behaviour? Where
could I find more information on this?

-- 
Piotr Domagalski


Re: Source vs SourceFunction and testing

2022-05-25 Thread Piotr Domagalski
Thank you Qingsheng, this context helps a lot!

And once again thank you all for being such a helpful community!

P.S. I actually struggled for a bit trying to understand why my refactored
solution which accepts a DataStream<> wouldn't work ("no operators defined in
the streaming topology"). It turns out my assumption that I can
call StreamExecutionEnvironment.getExecutionEnvironment() multiple times
and get the same environment back was wrong. I had env.addSource and
env.fromSource calls using one instance of the environment, but then called
env.execute() on another instance :facepalm:
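For anyone hitting the same error, a minimal sketch of the mistake (the source,
sink and transformation are placeholders; the point is only the second
getExecutionEnvironment() call):

// Buggy version: the topology is built on one environment...
StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
env1.fromSource(kafkaSource, watermarkStrategy, "Kafka")
    .map(event -> transform(event))
    .sinkTo(sink);

// ...but another one gets executed. In local execution each call creates a new
// environment, so env2 has no operators and execute() fails with
// "no operators defined in the streaming topology".
StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
env2.execute("my-job");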

On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren  wrote:

> Hi Piotr,
>
> I’d like to share my understanding about this. Source and SourceFunction
> are both interfaces to data sources. SourceFunction was designed and
> introduced earlier and as the project evolved, many shortcomings emerged.
> Therefore, the community re-designed the source interface and introduced
> the new Source API in FLIP-27 [1].
>
> Finally we will deprecate the SourceFunction and use Source as the only
> interface for all data sources, but considering the huge cost of migration
> you’ll see SourceFunction and Source co-exist for some time, like the
> ParallelTestSource you mentioned is still on SourceFunction, and
> KafkaSource as a pioneer has already migrated to the new Source API.
>
> I think the API to end users didn't change a lot: both
> env.addSource(SourceFunction) and env.fromSource(Source) return a
> DataStream, and you could apply downstream transformations onto it.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
> Cheers,
>
> Qingsheng
>
> > On May 25, 2022, at 03:19, Piotr Domagalski 
> wrote:
> >
> > Hi Ken,
> >
> > Thanks Ken. I guess the problem I had was, as a complete newbie to
> Flink, navigating the type system and being still confused about
> differences between Source, SourceFunction, DataStream, DataStreamOperator,
> etc.
> >
> > I think the DataStream<> type is what I'm looking for? That is, then I
> can use:
> >
> > DataStream source = env.fromSource(getKafkaSource(params),
> watermarkStrategy, "Kafka");
> > when using KafkaSource in the normal setup
> >
> > and
> > DataStream s = env.addSource(new ParallelTestSource<>(...));
> > when using the testing source [1]
> >
> > Does that sound right?
> >
> > [1]
> https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> >
> > On Tue, May 24, 2022 at 7:57 PM Ken Krugler 
> wrote:
> > Hi Piotr,
> >
> > The way I handle this is via a workflow class that uses a builder
> approach to specifying inputs, outputs, and any other configuration
> settings.
> >
> > The inputs are typically DataStream.
> >
> > This way I can separate out the Kafka inputs, and use testing sources
> that give me very precise control over the inputs (e.g. I can hold up on
> right side data to ensure my stateful left join junction is handling
> deferred joins properly). I can also use Kafka unit test support (either
> kafka-junit or Spring embedded Kafka) if needed.
> >
> > Then in the actual tool class (with a main method) I’ll wire up the real
> Kafka sources, with whatever logic is required to convert the consumer
> records to what the workflow is expecting.
> >
> > — Ken
> >
> >> On May 24, 2022, at 8:34 AM, Piotr Domagalski 
> wrote:
> >>
> >> Hi,
> >>
> >> I'm wondering: what is the recommended way to structure a job which one
> would like to test later on with `MiniCluster`.
> >>
> >> I've looked at the flink-training repository examples [1] and they tend
> to expose the main job as a class that accepts a `SourceFunction` and a
> `SinkFunction`, which makes sense. But then, my job is normally constructed
> with `KafkaSource` which is then passed to `env.fromSource(...`.
> >>
> >> Is there any recommended way of handling these discrepancies, ie.
> having to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> >>
> >> [1]
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
> >>
> >> --
> >> Piotr Domagalski
> >
> > --
> > Ken Krugler
> > http://www.scaleunlimited.com
> > Custom big data solutions
> > Flink, Pinot, Solr, Elasticsearch
> >
> >
> >
> >
> >
> > --
> > Piotr Domagalski
>
>

-- 
Piotr Domagalski


Re: Source vs SourceFunction and testing

2022-05-24 Thread Piotr Domagalski
Hi Ken,

Thanks Ken. I guess the problem I had was, as a complete newbie to Flink,
navigating the type system and still being confused about the differences
between Source, SourceFunction, DataStream, DataStreamOperator, etc.

I think the DataStream<> type is what I'm looking for? That is, then I can
use:

DataStream source = env.fromSource(getKafkaSource(params),
watermarkStrategy, "Kafka");
when using KafkaSource in the normal setup

and
DataStream s = env.addSource(new ParallelTestSource<>(...));
when using the testing source [1]

Does that sound right?

[1]
https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
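To make that concrete, here is a rough sketch of how I understand the suggested
structure (class and method names are made up, not from the training repo):

// Hypothetical workflow class: the pipeline only sees DataStream/Sink, so
// production code and tests can plug in different sources and sinks.
public class MyWorkflow {
    private DataStream<Event> input;
    private Sink<Event> sink;

    public MyWorkflow setInput(DataStream<Event> input) { this.input = input; return this; }
    public MyWorkflow setSink(Sink<Event> sink) { this.sink = sink; return this; }

    public void build() {
        input.keyBy(event -> event.key)
            .process(new MyBusinessLogic()) // placeholder for the real transformation
            .sinkTo(sink);
    }
}

// Production wiring:
new MyWorkflow()
    .setInput(env.fromSource(getKafkaSource(params), watermarkStrategy, "Kafka"))
    .setSink(kafkaSink)
    .build();

// MiniCluster test wiring:
new MyWorkflow()
    .setInput(env.addSource(new ParallelTestSource<>(testEvents)))
    .setSink(testSink)
    .build();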

On Tue, May 24, 2022 at 7:57 PM Ken Krugler 
wrote:

> Hi Piotr,
>
> The way I handle this is via a workflow class that uses a builder approach
> to specifying inputs, outputs, and any other configuration settings.
>
> The inputs are typically DataStream.
>
> This way I can separate out the Kafka inputs, and use testing sources that
> give me very precise control over the inputs (e.g. I can hold up on right
> side data to ensure my stateful left join junction is handling deferred
> joins properly). I can also use Kafka unit test support (either kafka-junit
> or Spring embedded Kafka) if needed.
>
> Then in the actual tool class (with a main method) I’ll wire up the real
> Kafka sources, with whatever logic is required to convert the consumer
> records to what the workflow is expecting.
>
> — Ken
>
> On May 24, 2022, at 8:34 AM, Piotr Domagalski 
> wrote:
>
> Hi,
>
> I'm wondering: what is the recommended way to structure a job which one
> would like to test later on with `MiniCluster`.
>
> I've looked at the flink-training repository examples [1] and they tend to
> expose the main job as a class that accepts a `SourceFunction` and a
> `SinkFunction`, which makes sense. But then, my job is normally constructed
> with `KafkaSource` which is then passed to `env.fromSource(...`.
>
> Is there any recommended way of handling these discrepancies, ie. having
> to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>
> [1]
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>
> --
> Piotr Domagalski
>
>
> ------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>

-- 
Piotr Domagalski


Source vs SourceFunction and testing

2022-05-24 Thread Piotr Domagalski
Hi,

I'm wondering: what is the recommended way to structure a job which one
would like to test later on with `MiniCluster`.

I've looked at the flink-training repository examples [1] and they tend to
expose the main job as a class that accepts a `SourceFunction` and a
`SinkFunction`, which makes sense. But then, my job is normally constructed
with `KafkaSource` which is then passed to `env.fromSource(...`.

Is there any recommended way of handling these discrepancies, ie. having to
use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?

[1]
https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61

-- 
Piotr Domagalski


At-least once sinks and their behaviour in a non-failure scenario

2022-05-12 Thread Piotr Domagalski
Hi,

I'm planning to build a pipeline that uses a Kafka source, some stateful
transformation and a RabbitMQ sink. What I don't yet fully understand is how
often I should expect the "at-least-once" scenario (i.e. seeing duplicates) on
the sink side. The case when things start failing is clear to me, but what
happens when I want to gracefully stop the Flink job?

Am I right in thinking that when I gracefully stop a job with a final
savepoint [1], the Kafka source stops consuming, a checkpoint barrier is sent
through the pipeline and this flushes the sink completely? So my understanding
is that if nothing fails and the Kafka offsets are committed, starting the job
again from that savepoint will not result in any duplicates being sent to
RabbitMQ. Is that correct?

Thanks!

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint

-- 
Piotr Domagalski