This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 1370e4b189de5b059caf0193ad033db6195b2767 Author: Igal Shilman <igalshil...@gmail.com> AuthorDate: Wed Oct 21 14:32:03 2020 +0200 [FLINK-19692][core] Expose the list of assigned key groups This commit exposes the list of key groups that can be written into the raw keyed stream. --- .../core/logger/CheckpointedStreamOperations.java | 2 ++ .../flink/statefun/flink/core/logger/Loggers.java | 5 +++++ .../core/logger/UnboundedFeedbackLoggerTest.java | 20 +++++++++++++++++--- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/CheckpointedStreamOperations.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/CheckpointedStreamOperations.java index 2540934..82f79c6 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/CheckpointedStreamOperations.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/CheckpointedStreamOperations.java @@ -25,6 +25,8 @@ public interface CheckpointedStreamOperations { void requireKeyedStateCheckpointed(OutputStream keyedStateCheckpointOutputStream); + Iterable<Integer> keyGroupList(OutputStream stream); + void startNewKeyGroup(OutputStream stream, int keyGroup) throws IOException; Closeable acquireLease(OutputStream keyedStateCheckpointOutputStream); diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java index aee7536..d0c8db6 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java @@ -87,6 +87,11 @@ public final class Loggers { } @Override + public Iterable<Integer> keyGroupList(OutputStream stream) { + return cast(stream).getKeyGroupList(); + } + + @Override public void startNewKeyGroup(OutputStream stream, int keyGroup) throws IOException { cast(stream).startNewKeyGroup(keyGroup); } diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java index dd7088f..08600ef 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat; import java.io.*; import java.util.ArrayList; import java.util.function.Function; +import java.util.stream.IntStream; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.statefun.flink.core.di.ObjectContainer; @@ -114,12 +115,19 @@ public class UnboundedFeedbackLoggerTest { Loggers.unboundedSpillableLoggerContainer( IO_MANAGER, maxParallelism, totalMemory, IntSerializer.INSTANCE, Function.identity()); - container.add("checkpoint-stream-ops", CheckpointedStreamOperations.class, NOOP.INSTANCE); + container.add( + "checkpoint-stream-ops", + CheckpointedStreamOperations.class, + new NoopStreamOps(maxParallelism)); return container.get(UnboundedFeedbackLoggerFactory.class).create(); } - enum NOOP implements CheckpointedStreamOperations { - INSTANCE; + static final class NoopStreamOps implements CheckpointedStreamOperations { + private final int maxParallelism; + + NoopStreamOps(int maxParallelism) { + this.maxParallelism = maxParallelism; + } @Override public void requireKeyedStateCheckpointed(OutputStream keyedStateCheckpointOutputStream) { @@ -127,6 +135,12 @@ public class UnboundedFeedbackLoggerTest { } @Override + public Iterable<Integer> keyGroupList(OutputStream stream) { + IntStream range = IntStream.range(0, maxParallelism); + return range::iterator; + } + + @Override public void startNewKeyGroup(OutputStream stream, int keyGroup) {} @Override