This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ada9199f19220ebc4b72eb2227ffa0a8ac8842a
Author: sxnan <suxuanna...@gmail.com>
AuthorDate: Tue Aug 9 18:08:19 2022 +0800

    [FLINK-28860][datastream] Cache consumption in stream mode recomputes the result in case of cache miss
---
 .../translators/CacheTransformationTranslator.java |   6 +-
 .../api/graph/StreamGraphGeneratorTest.java        |  17 ---
 .../flink/test/streaming/runtime/CacheITCase.java  | 141 ++++++++-------------
 3 files changed, 54 insertions(+), 110 deletions(-)
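The gist of the change: before this commit, translating a CacheTransformation in STREAMING mode unconditionally threw a RuntimeException; now a cache miss falls back to the transformation's single input, so the original result is recomputed instead of failing the job. A minimal sketch of the user-facing flow, modeled on the reworked CacheITCase below (the class name CacheMissExample is illustrative and not part of this commit):

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.datastream.CachedDataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CacheMissExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // A first run in BATCH mode produces the cached intermediate result.
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
            CachedDataStream<Integer> cached =
                    env.fromElements(1, 2, 3).map(i -> i + 1).cache();
            cached.print();
            env.execute();

            // A later run in STREAMING mode consumes the cache. With this
            // commit, a cache miss no longer throws; the original
            // transformation is re-executed and the result recomputed.
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
            cached.map(i -> i + 1).print();
            env.execute();
        }
    }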

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/CacheTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/CacheTransformationTranslator.java
index c55998ab7fd..cfd958d5ec9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/CacheTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/CacheTransformationTranslator.java
@@ -72,8 +72,10 @@ public class CacheTransformationTranslator<OUT, T extends CacheTransformation<OU
         if (transformation.isCached()) {
             return consumeCache(transformation, context);
         } else {
-            throw new RuntimeException(
-                    "Producing cache IntermediateResult is not supported in streaming mode");
+            final List<Transformation<?>> inputs = transformation.getInputs();
+            Preconditions.checkState(
+                    inputs.size() == 1, "There could be only one transformation input to cache");
+            return context.getStreamNodeIds(inputs.get(0));
         }
     }
 
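A note on the hunk above: consumeCache(...) still handles the case where the intermediate result was already produced (transformation.isCached()); otherwise the translator now returns the stream node IDs of the transformation's single input, wiring consumers directly to the original producer so the result is recomputed on a cache miss. The Preconditions.checkState guard encodes the existing invariant that a CacheTransformation has exactly one input.
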
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 3e52c036f7a..13495a0d933 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -907,23 +907,6 @@ public class StreamGraphGeneratorTest extends TestLogger {
                         });
     }
 
-    @Test
-    public void testCacheInStreamModeThrowsException() {
-        final TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-
-        DataStream<Integer> source = env.fromElements(1, 2, 3);
-        final int upstreamParallelism = 3;
-        CachedDataStream<Integer> cachedStream =
-                source.keyBy(i -> i)
-                        .reduce(Integer::sum)
-                        .setParallelism(upstreamParallelism)
-                        .cache();
-        cachedStream.print();
-
-        Assertions.assertThatThrownBy(env::getStreamGraph).isInstanceOf(RuntimeException.class);
-    }
-
     @Test
     public void testCacheTransformation() {
         final TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java
index 935c715aabd..9daffc20b35 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.CachedDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -40,15 +41,12 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.util.CloseableIterator;
-import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -66,9 +64,7 @@ import java.util.UUID;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test datastream cache. */
-@Disabled
 public class CacheITCase extends AbstractTestBase {
-
     private StreamExecutionEnvironment env;
     private MiniClusterWithClientResource miniClusterWithClientResource;
 
@@ -110,17 +106,8 @@ public class CacheITCase extends AbstractTestBase {
                         .map(i -> Integer.parseInt(i) + 1)
                         .cache();
 
-        try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) {
-            List<Integer> results = CollectionUtil.iteratorToList(resultIterator);
-            assertThat(results).containsExactlyInAnyOrder(2, 3, 4);
-        }
-
-        assertThat(file.delete()).isTrue();
-
-        try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) {
-            List<Integer> results = CollectionUtil.iteratorToList(resultIterator);
-            assertThat(results).containsExactlyInAnyOrder(2, 3, 4);
-        }
+        executeAndVerifyResult(tmpDir, cachedDataStream, "2", "3", "4");
+        executeAndVerifyResult(tmpDir, cachedDataStream, "2", "3", "4");
     }
 
     @Test
@@ -138,30 +125,17 @@ public class CacheITCase extends AbstractTestBase {
                         .map(i -> Integer.parseInt(i) + 1)
                         .cache();
 
-        try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) {
-            List<Integer> results = CollectionUtil.iteratorToList(resultIterator);
-            assertThat(results).containsExactlyInAnyOrder(2, 3, 4);
-        }
-
-        assertThat(file.delete()).isTrue();
-
-        try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) {
-            List<Integer> results = CollectionUtil.iteratorToList(resultIterator);
-            assertThat(results).containsExactlyInAnyOrder(2, 3, 4);
-        }
-
+        executeAndVerifyResult(tmpDir, cachedDataStream, "2", "3", "4");
         cachedDataStream.invalidate();
 
         // overwrite the content of the source file
+        assertThat(file.delete()).isTrue();
         try (FileWriter writer = new FileWriter(file)) {
             writer.write("4\n5\n6\n");
         }
 
         // after cache is invalidated it should re-read from source file with the updated content
-        try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) {
-            List<Integer> results = CollectionUtil.iteratorToList(resultIterator);
-            assertThat(results).containsExactlyInAnyOrder(5, 6, 7);
-        }
+        executeAndVerifyResult(tmpDir, cachedDataStream, "5", "6", "7");
     }
 
     @Test
@@ -180,23 +154,16 @@ public class CacheITCase extends AbstractTestBase {
                         .map(i -> i + 1)
                         .cache();
 
-        try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) {
-            List<Integer> results = CollectionUtil.iteratorToList(resultIterator);
-            assertThat(results).containsExactlyInAnyOrder(2, 3, 4);
-        }
-
-        assertThat(file.delete()).isTrue();
+        executeAndVerifyResult(tmpDir, cachedDataStream, "2", "3", "4");
 
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-        try (CloseableIterator<Integer> resultIterator =
-                cachedDataStream.map(i -> i + 1).executeAndCollect()) {
-            List<Integer> results = CollectionUtil.iteratorToList(resultIterator);
-            assertThat(results).containsExactlyInAnyOrder(3, 4, 5);
-        }
+        final SingleOutputStreamOperator<Integer> dataStream = cachedDataStream.map(i -> i + 1);
+        executeAndVerifyResult(tmpDir, dataStream, "3", "4", "5");
     }
 
     @Test
-    void testCacheProduceAndConsumeWithDifferentPartitioner() throws Exception {
+    void testCacheProduceAndConsumeWithDifferentPartitioner(@TempDir java.nio.file.Path tmpDir)
+            throws Exception {
 
         final DataStreamSource<Tuple2<Integer, Integer>> ds =
                 env.fromElements(new Tuple2<>(1, 1), new Tuple2<>(2, 1), new Tuple2<>(2, 1));
@@ -205,24 +172,16 @@ public class CacheITCase extends AbstractTestBase {
         SingleOutputStreamOperator<Tuple2<Integer, Integer>> result =
                 cacheSource.keyBy(v -> v.f0).reduce((v1, v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1));
 
-        try (CloseableIterator<Tuple2<Integer, Integer>> resultIterator =
-                result.executeAndCollect()) {
-            List<Tuple2<Integer, Integer>> results = CollectionUtil.iteratorToList(resultIterator);
-            assertThat(results).containsExactlyInAnyOrder(new Tuple2<>(1, 1), new Tuple2<>(2, 2));
-        }
+        executeAndVerifyResult(tmpDir, result, "(1,1)", "(2,2)");
 
         result =
                 cacheSource.keyBy(t -> t.f1).reduce((v1, v2) -> new Tuple2<>(v1.f0 + v2.f0, v1.f1));
 
-        try (CloseableIterator<Tuple2<Integer, Integer>> resultIterator =
-                result.executeAndCollect()) {
-            List<Tuple2<Integer, Integer>> results = CollectionUtil.iteratorToList(resultIterator);
-            assertThat(results).containsExactlyInAnyOrder(new Tuple2<>(5, 1));
-        }
+        executeAndVerifyResult(tmpDir, result, "(5,1)");
     }
 
     @Test
-    void testCacheSideOutput() throws Exception {
+    void testCacheSideOutput(@TempDir java.nio.file.Path tmpDir) throws Exception {
         OutputTag<Integer> tag = new OutputTag<Integer>("2") {};
         final DataStreamSource<Tuple2<Integer, Integer>> ds =
                 env.fromElements(new Tuple2<>(1, 1), new Tuple2<>(2, 1), new Tuple2<>(2, 2));
@@ -244,16 +203,9 @@ public class CacheITCase extends AbstractTestBase {
                         });
 
         final CachedDataStream<Integer> cachedSideOutput = processed.getSideOutput(tag).cache();
+        executeAndVerifyResult(tmpDir, cachedSideOutput, "1", "2");
 
-        try (CloseableIterator<Integer> resultIterator = cachedSideOutput.executeAndCollect()) {
-            List<Integer> results = CollectionUtil.iteratorToList(resultIterator);
-            assertThat(results).containsExactlyInAnyOrder(1, 2);
-        }
-
-        try (CloseableIterator<Integer> resultIterator = cachedSideOutput.executeAndCollect()) {
-            List<Integer> results = CollectionUtil.iteratorToList(resultIterator);
-            assertThat(results).containsExactlyInAnyOrder(1, 2);
-        }
+        executeAndVerifyResult(tmpDir, cachedSideOutput, "1", "2");
     }
 
     @Test
@@ -270,46 +222,53 @@ public class CacheITCase extends AbstractTestBase {
                         .map(i -> Integer.parseInt(i) + 1)
                         .cache();
 
-        try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) {
-            List<Integer> results = CollectionUtil.iteratorToList(resultIterator);
-            assertThat(results).containsExactlyInAnyOrder(2, 3, 4);
-        }
+        executeAndVerifyResult(tmpDir, cachedDataStream, "2", "3", "4");
 
         final AbstractID datasetId =
                 ((CacheTransformation<Integer>) cachedDataStream.getTransformation())
                         .getDatasetId();
 
-        assertThat(file.delete()).isTrue();
         // overwrite the content of the source file
+        assertThat(file.delete()).isTrue();
         try (FileWriter writer = new FileWriter(file)) {
             writer.write("4\n5\n6\n");
         }
 
-        final File outputFile = new File(tmpDir.toFile(), UUID.randomUUID().toString());
-        cachedDataStream
-                .flatMap(
-                        (FlatMapFunction<Integer, Integer>)
-                                (value, out) -> {
-                                    if (value < 5) {
-                                        // Simulate ClusterDatasetCorruptedException.
-                                        throw new ClusterDatasetCorruptedException(
-                                                null,
-                                                Collections.singletonList(
-                                                        new IntermediateDataSetID(datasetId)));
-                                    }
-                                    out.collect(value);
-                                })
-                .returns(Integer.class)
-                .sinkTo(
-                        FileSink.forRowFormat(
-                                        new org.apache.flink.core.fs.Path(outputFile.getPath()),
-                                        new SimpleStringEncoder<Integer>())
-                                .build());
+        final SingleOutputStreamOperator<Integer> dataStream =
+                cachedDataStream
+                        .flatMap(
+                                (FlatMapFunction<Integer, Integer>)
+                                        (value, out) -> {
+                                            if (value < 5) {
+                                                // Simulate ClusterDatasetCorruptedException.
+                                                throw new ClusterDatasetCorruptedException(
+                                                        null,
+                                                        Collections.singletonList(
+                                                                new IntermediateDataSetID(
+                                                                        datasetId)));
+                                            }
+                                            out.collect(value);
+                                        })
+                        .returns(Integer.class);
+        executeAndVerifyResult(tmpDir, dataStream, "5", "6", "7");
+    }
+
+    private <T> void executeAndVerifyResult(
+            Path tmpDir, DataStream<T> dataStream, String... expectedResult) throws Exception {
+        File outputFile = new File(tmpDir.toFile(), UUID.randomUUID().toString());
+        dataStream.sinkTo(getFileSink(outputFile));
         env.execute();
-        assertThat(getFileContext(outputFile)).containsExactlyInAnyOrder("5", "6", "7");
+        assertThat(getFileContent(outputFile)).containsExactlyInAnyOrder(expectedResult);
+    }
+
+    private <T> FileSink<T> getFileSink(File outputFile) {
+        return FileSink.forRowFormat(
+                        new org.apache.flink.core.fs.Path(outputFile.getPath()),
+                        new SimpleStringEncoder<T>())
+                .build();
     }
 
-    private static List<String> getFileContext(File directory) throws IOException {
+    private static List<String> getFileContent(File directory) throws IOException {
         List<String> res = new ArrayList<>();
 
         final Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true);
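
Taken together, the reworked tests exercise the fallback end to end: the corrupted-dataset test above first produces the cache (verifying "2", "3", "4"), rewrites the source file, then throws ClusterDatasetCorruptedException for the stale cached values; with the translator change the job recomputes from the updated file, so the verified output becomes "5", "6", "7". The diff also removes the @Disabled annotation, re-enabling CacheITCase, and folds the repeated executeAndCollect/CollectionUtil checks into the new executeAndVerifyResult and getFileSink helpers.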
