Hi Salva,

Thanks for the pointers. They helped me better understand what
happened.

Both outages occurred at the time of day with the lowest traffic. For
business-logic reasons, we use a partition key that may not distribute
events evenly across all partitions. It seems conceivable to me that
during periods of low traffic some partitions may not see any events
for some time.

Now, with no watermarking strategy, I believe we are running into the
problem described in
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
where the watermark cannot move forward. The docs recommend using a
watermark strategy with `withIdleness`, which detects idle inputs so
that they do not hold the watermark back.
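
To check my understanding of the idle-source problem, here is a
minimal sketch in plain Java with no Flink dependency. The class,
method, and field names are hypothetical, and this is a simplified
model of the idea (the combined watermark is the minimum over inputs
that are not idle), not Flink's actual implementation:

```java
import java.util.OptionalLong;

public class IdleWatermarkSketch {

    /** Hypothetical per-partition state: last watermark and arrival time of the last event. */
    static final class PartitionState {
        final long watermark;
        final long lastEventAtMillis;

        PartitionState(long watermark, long lastEventAtMillis) {
            this.watermark = watermark;
            this.lastEventAtMillis = lastEventAtMillis;
        }
    }

    /**
     * Combines per-partition watermarks by taking the minimum over all
     * partitions that are not idle. A partition counts as idle when it has
     * seen no event for at least idleTimeoutMillis. Returns empty when every
     * partition is idle, i.e. there is nothing to advance the watermark.
     */
    static OptionalLong combinedWatermark(PartitionState[] partitions,
                                          long nowMillis,
                                          long idleTimeoutMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (PartitionState p : partitions) {
            boolean idle = nowMillis - p.lastEventAtMillis >= idleTimeoutMillis;
            if (idle) {
                continue; // idle partitions are ignored and cannot hold the minimum back
            }
            anyActive = true;
            min = Math.min(min, p.watermark);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        long now = 100_000L;
        PartitionState[] partitions = {
            new PartitionState(10_000L, 99_000L), // busy partition
            new PartitionState(9_500L, 98_500L),  // busy partition
            new PartitionState(1_000L, 20_000L)   // quiet for 80 seconds
        };

        // Without idleness detection (timeout so large that nothing is ever idle),
        // the quiet partition pins the combined watermark at its old value.
        System.out.println(combinedWatermark(partitions, now, Long.MAX_VALUE).getAsLong());

        // With a 60-second idle timeout the quiet partition is skipped,
        // so the combined watermark follows the busy partitions.
        System.out.println(combinedWatermark(partitions, now, 60_000L).getAsLong());
    }
}
```

If this model is right, a single quiet partition is enough to stall
the watermark for the whole source unless idleness detection is
enabled.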

However, https://issues.apache.org/jira/browse/FLINK-28975 suggests
there is an issue with `withIdleness`. Am I right in understanding
that we cannot safely use `withIdleness` in the watermark strategy
right now? Does this affect me? I am not doing any joins on the
streams; I just need messages to be sent to the sink without lag
building up.

Possible plans of action:

A. Use a watermark strategy with `withIdleness` on 1.15.2 (which is
affected by FLINK-28975).

OR

B. Upgrade to 1.15.3 (where FLINK-28975 is fixed) and use a watermark
strategy with `withIdleness`.

On Mon, Nov 7, 2022 at 1:05 PM Salva Alcántara <salcantara...@gmail.com> wrote:
>
> Hi Samuel,
>
> Maybe related to this https://issues.apache.org/jira/browse/FLINK-28975? See also:
> - https://stackoverflow.com/questions/72654182/flink-interval-join-datastream-with-kafkasource-drops-all-records
>
> I left a similar comment in your SO post.
>
> Regards,
>
> Salva
>
> On Mon, Nov 7, 2022 at 7:27 AM Samuel Chase <samebch...@gmail.com> wrote:
>>
>> Hello,
>>
>> At work we are using Flink to store timers and notify us when they are
>> triggered. It's been working great over several versions over the
>> years. Flink 1.5 -> Flink 1.9 -> Flink 1.15.2.
>>
>> A few months ago we upgraded from Flink 1.9 to Flink 1.15.2. In the
>> process we had to upgrade all the Flink API code in our job to use the
>> new APIs.
>>
>> Our job code has a Kafka Source and a Kafka Sink. For our Source, we
>> are currently using `WatermarkStrategy.noWatermarks()`. It has been
>> running fine ever since we upgraded, but in the last few weeks we have
>> faced two outages.
>>
>> Configuration:
>>
>> 2 JobManager nodes
>> 5 TaskManager nodes (4 slots each)
>> Parallelism: 16
>> Source topic: 30 partitions
>> Using `setStartingOffsets(OffsetsInitializer.latest())` while
>> initializing the source.
>>
>> Outage #1
>>
>> Our monitoring system alerted us that lag was building up on one
>> partition (out of 30). We did not know of anything we could do to
>> jumpstart consumption on that partition other than forcing a
>> reassignment. When the TaskManager service on the node to which the
>> partition was assigned was restarted, the lag reduced soon after.
>>
>> Outage #2
>>
>> Something similar happened again, but this time, lag was building up
>> on 9 (out of 30) partitions. Once again, we restarted the TaskManager
>> services on all the nodes, and it started consuming once again.
>>
>> We asked a question on SO,
>> https://stackoverflow.com/q/74272277/2165719, and were directed to ask
>> on the mailing list as well.
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
>>
>> In another post, https://stackoverflow.com/a/70101290/2165719 there is
>> a suggestion to use `WatermarkStrategy.withIdleness(...)`. Could this
>> help us?
>>
>> Any help/guidance here would be much appreciated.
>>
>> Thanks,
