This is an automated email from the ASF dual-hosted git repository.

danderson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a7283d  Fix ClickEventCount for parallelism higher than 1
4a7283d is described below

commit 4a7283d0d5f3a2c125888803a4f0ec1c00374735
Author: Tudor Pavel <tudorv.pa...@gmail.com>
AuthorDate: Fri Oct 20 16:47:07 2023 +0300

    Fix ClickEventCount for parallelism higher than 1
    
    Following the [Operations Tutorial](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/try-flink/flink-operations-playground/#upgrading--rescaling-a-job), you're asked to start the job with parallelism 3, but I wasn't getting any output when I did that.
    
    The reason, I realized, is [idle partitions](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources) on the Kafka input: all input goes into partition 0, so the source subtasks reading the other partitions never advance their watermarks and the event-time windows never fire.
    
    Adding `withIdleness` to the WatermarkStrategy marks a partition as idle after 5 seconds without records, which ensures the job can make progress with higher parallelism even when some partitions are idle.
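To illustrate why idle partitions stall the job, here is a minimal, stdlib-only model of watermark combination. It is a sketch under stated assumptions, not Flink's implementation: the class `IdleWatermarkDemo` and its methods are hypothetical, processing time and event time are conflated for brevity, and each partition is assumed to feed one input of a downstream operator. It relies on two documented behaviours: a downstream operator's watermark is the minimum over its inputs, and inputs marked idle are excluded from that minimum. The per-partition watermark mirrors the bounded-out-of-orderness rule `maxTimestamp - outOfOrderness - 1`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of combining per-partition watermarks (not Flink code). */
public class IdleWatermarkDemo {

    // Latest watermark per partition; Long.MIN_VALUE means "no watermark yet".
    private final Map<Integer, Long> watermarks = new HashMap<>();
    // Time (ms) at which each partition last produced a record.
    private final Map<Integer, Long> lastActivity = new HashMap<>();
    // Idle timeout in ms; a value <= 0 disables idleness detection.
    private final long idleTimeoutMs;

    public IdleWatermarkDemo(int partitions, long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        for (int p = 0; p < partitions; p++) {
            watermarks.put(p, Long.MIN_VALUE);
            lastActivity.put(p, 0L);
        }
    }

    /** A record with the given event time arrives on partition p at time now. */
    public void onRecord(int p, long eventTime, long now, long maxOutOfOrdernessMs) {
        // Bounded out-of-orderness: watermark trails the max timestamp by the delay, minus 1.
        watermarks.merge(p, eventTime - maxOutOfOrdernessMs - 1, Math::max);
        lastActivity.put(p, now);
    }

    /** Combined watermark: the minimum over partitions that are not idle. */
    public long combinedWatermark(long now) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> e : watermarks.entrySet()) {
            boolean idle = idleTimeoutMs > 0
                    && now - lastActivity.get(e.getKey()) >= idleTimeoutMs;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, e.getValue());
            }
        }
        // If every partition is idle, report "no progress" in this simplified model.
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long delay = 200; // matches forBoundedOutOfOrderness(Duration.ofMillis(200))
        // Parallelism 3, but every record lands in partition 0.
        IdleWatermarkDemo noIdleness = new IdleWatermarkDemo(3, 0);
        IdleWatermarkDemo withIdleness = new IdleWatermarkDemo(3, 5_000);
        for (long t = 0; t <= 10_000; t += 1_000) {
            noIdleness.onRecord(0, t, t, delay);
            withIdleness.onRecord(0, t, t, delay);
        }
        // Without idleness the empty partitions pin the minimum at Long.MIN_VALUE.
        System.out.println("without idleness: " + noIdleness.combinedWatermark(10_000));
        // With a 5 s idle timeout, partitions 1 and 2 are skipped and the watermark advances.
        System.out.println("with idleness:    " + withIdleness.combinedWatermark(10_000));
    }
}
```

Running `main` prints `Long.MIN_VALUE` for the model without idleness and `9799` for the model with a 5-second idle timeout: only the second one would ever let an event-time window close.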
    
    Bonus: this also fixes recovery in the [normal case](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/try-flink/flink-operations-playground/#step-2-introducing-a-fault). Without it, I was seeing fewer than 1k clicks per window after resuming the job; now it's the expected 1k every time.
---
 .../org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java     | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 359ef2e..b78ffb7 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -94,6 +94,7 @@ public class ClickEventCount {
 
                WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy
                                .<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(200))
+                               .withIdleness(Duration.ofSeconds(5))
                                .withTimestampAssigner((clickEvent, l) -> clickEvent.getTimestamp().getTime());
 
                DataStream<ClickEvent> clicks = env.fromSource(source, watermarkStrategy, "ClickEvent Source");