This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4ada9199f19220ebc4b72eb2227ffa0a8ac8842a
Author: sxnan <suxuanna...@gmail.com>
AuthorDate: Tue Aug 9 18:08:19 2022 +0800

    [FLINK-28860][datastream] Cache consumption in stream mode recompute result in case of cache miss
---
 .../translators/CacheTransformationTranslator.java |   6 +-
 .../api/graph/StreamGraphGeneratorTest.java        |  17 ---
 .../flink/test/streaming/runtime/CacheITCase.java  | 141 ++++++++-------------
 3 files changed, 54 insertions(+), 110 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/CacheTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/CacheTransformationTranslator.java
index c55998ab7fd..cfd958d5ec9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/CacheTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/CacheTransformationTranslator.java
@@ -72,8 +72,10 @@ public class CacheTransformationTranslator<OUT, T extends CacheTransformation<OU
         if (transformation.isCached()) {
             return consumeCache(transformation, context);
         } else {
-            throw new RuntimeException(
-                    "Producing cache IntermediateResult is not supported in streaming mode");
+            final List<Transformation<?>> inputs = transformation.getInputs();
+            Preconditions.checkState(
+                    inputs.size() == 1, "There could be only one transformation input to cache");
+            return context.getStreamNodeIds(inputs.get(0));
         }
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 3e52c036f7a..13495a0d933 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -907,23 +907,6 @@ public class StreamGraphGeneratorTest extends TestLogger { }); } - @Test - public void testCacheInStreamModeThrowsException() { - final TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment(); - env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - - DataStream<Integer> source = env.fromElements(1, 2, 3); - final int upstreamParallelism = 3; - CachedDataStream<Integer> cachedStream = - source.keyBy(i -> i) - .reduce(Integer::sum) - .setParallelism(upstreamParallelism) - .cache(); - cachedStream.print(); - - Assertions.assertThatThrownBy(env::getStreamGraph).isInstanceOf(RuntimeException.class); - } - @Test public void testCacheTransformation() { final TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java index 935c715aabd..9daffc20b35 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.minicluster.RpcServiceSharing; import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.CachedDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -40,15 +41,12 @@ import org.apache.flink.streaming.util.TestStreamEnvironment; import 
org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.AbstractID; -import org.apache.flink.util.CloseableIterator; -import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import org.apache.commons.io.FileUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -66,9 +64,7 @@ import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; /** Test datastream cache. */ -@Disabled public class CacheITCase extends AbstractTestBase { - private StreamExecutionEnvironment env; private MiniClusterWithClientResource miniClusterWithClientResource; @@ -110,17 +106,8 @@ public class CacheITCase extends AbstractTestBase { .map(i -> Integer.parseInt(i) + 1) .cache(); - try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) { - List<Integer> results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).containsExactlyInAnyOrder(2, 3, 4); - } - - assertThat(file.delete()).isTrue(); - - try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) { - List<Integer> results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).containsExactlyInAnyOrder(2, 3, 4); - } + executeAndVerifyResult(tmpDir, cachedDataStream, "2", "3", "4"); + executeAndVerifyResult(tmpDir, cachedDataStream, "2", "3", "4"); } @Test @@ -138,30 +125,17 @@ public class CacheITCase extends AbstractTestBase { .map(i -> Integer.parseInt(i) + 1) .cache(); - try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) { - List<Integer> results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).containsExactlyInAnyOrder(2, 3, 4); - } - - 
assertThat(file.delete()).isTrue(); - - try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) { - List<Integer> results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).containsExactlyInAnyOrder(2, 3, 4); - } - + executeAndVerifyResult(tmpDir, cachedDataStream, "2", "3", "4"); cachedDataStream.invalidate(); // overwrite the content of the source file + assertThat(file.delete()).isTrue(); try (FileWriter writer = new FileWriter(file)) { writer.write("4\n5\n6\n"); } // after cache is invalidated it should re-read from source file with the updated content - try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) { - List<Integer> results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).containsExactlyInAnyOrder(5, 6, 7); - } + executeAndVerifyResult(tmpDir, cachedDataStream, "5", "6", "7"); } @Test @@ -180,23 +154,16 @@ public class CacheITCase extends AbstractTestBase { .map(i -> i + 1) .cache(); - try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) { - List<Integer> results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).containsExactlyInAnyOrder(2, 3, 4); - } - - assertThat(file.delete()).isTrue(); + executeAndVerifyResult(tmpDir, cachedDataStream, "2", "3", "4"); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - try (CloseableIterator<Integer> resultIterator = - cachedDataStream.map(i -> i + 1).executeAndCollect()) { - List<Integer> results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).containsExactlyInAnyOrder(3, 4, 5); - } + final SingleOutputStreamOperator<Integer> dataStream = cachedDataStream.map(i -> i + 1); + executeAndVerifyResult(tmpDir, dataStream, "3", "4", "5"); } @Test - void testCacheProduceAndConsumeWithDifferentPartitioner() throws Exception { + void testCacheProduceAndConsumeWithDifferentPartitioner(@TempDir java.nio.file.Path tmpDir) + throws Exception { 
final DataStreamSource<Tuple2<Integer, Integer>> ds = env.fromElements(new Tuple2<>(1, 1), new Tuple2<>(2, 1), new Tuple2<>(2, 1)); @@ -205,24 +172,16 @@ public class CacheITCase extends AbstractTestBase { SingleOutputStreamOperator<Tuple2<Integer, Integer>> result = cacheSource.keyBy(v -> v.f0).reduce((v1, v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1)); - try (CloseableIterator<Tuple2<Integer, Integer>> resultIterator = - result.executeAndCollect()) { - List<Tuple2<Integer, Integer>> results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).containsExactlyInAnyOrder(new Tuple2<>(1, 1), new Tuple2<>(2, 2)); - } + executeAndVerifyResult(tmpDir, result, "(1,1)", "(2,2)"); result = cacheSource.keyBy(t -> t.f1).reduce((v1, v2) -> new Tuple2<>(v1.f0 + v2.f0, v1.f1)); - try (CloseableIterator<Tuple2<Integer, Integer>> resultIterator = - result.executeAndCollect()) { - List<Tuple2<Integer, Integer>> results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).containsExactlyInAnyOrder(new Tuple2<>(5, 1)); - } + executeAndVerifyResult(tmpDir, result, "(5,1)"); } @Test - void testCacheSideOutput() throws Exception { + void testCacheSideOutput(@TempDir java.nio.file.Path tmpDir) throws Exception { OutputTag<Integer> tag = new OutputTag<Integer>("2") {}; final DataStreamSource<Tuple2<Integer, Integer>> ds = env.fromElements(new Tuple2<>(1, 1), new Tuple2<>(2, 1), new Tuple2<>(2, 2)); @@ -244,16 +203,9 @@ public class CacheITCase extends AbstractTestBase { }); final CachedDataStream<Integer> cachedSideOutput = processed.getSideOutput(tag).cache(); + executeAndVerifyResult(tmpDir, cachedSideOutput, "1", "2"); - try (CloseableIterator<Integer> resultIterator = cachedSideOutput.executeAndCollect()) { - List<Integer> results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).containsExactlyInAnyOrder(1, 2); - } - - try (CloseableIterator<Integer> resultIterator = cachedSideOutput.executeAndCollect()) { - List<Integer> results 
= CollectionUtil.iteratorToList(resultIterator); - assertThat(results).containsExactlyInAnyOrder(1, 2); - } + executeAndVerifyResult(tmpDir, cachedSideOutput, "1", "2"); } @Test @@ -270,46 +222,53 @@ public class CacheITCase extends AbstractTestBase { .map(i -> Integer.parseInt(i) + 1) .cache(); - try (CloseableIterator<Integer> resultIterator = cachedDataStream.executeAndCollect()) { - List<Integer> results = CollectionUtil.iteratorToList(resultIterator); - assertThat(results).containsExactlyInAnyOrder(2, 3, 4); - } + executeAndVerifyResult(tmpDir, cachedDataStream, "2", "3", "4"); final AbstractID datasetId = ((CacheTransformation<Integer>) cachedDataStream.getTransformation()) .getDatasetId(); - assertThat(file.delete()).isTrue(); // overwrite the content of the source file + assertThat(file.delete()).isTrue(); try (FileWriter writer = new FileWriter(file)) { writer.write("4\n5\n6\n"); } - final File outputFile = new File(tmpDir.toFile(), UUID.randomUUID().toString()); - cachedDataStream - .flatMap( - (FlatMapFunction<Integer, Integer>) - (value, out) -> { - if (value < 5) { - // Simulate ClusterDatasetCorruptedException. - throw new ClusterDatasetCorruptedException( - null, - Collections.singletonList( - new IntermediateDataSetID(datasetId))); - } - out.collect(value); - }) - .returns(Integer.class) - .sinkTo( - FileSink.forRowFormat( - new org.apache.flink.core.fs.Path(outputFile.getPath()), - new SimpleStringEncoder<Integer>()) - .build()); + final SingleOutputStreamOperator<Integer> dataStream = + cachedDataStream + .flatMap( + (FlatMapFunction<Integer, Integer>) + (value, out) -> { + if (value < 5) { + // Simulate ClusterDatasetCorruptedException. 
+ throw new ClusterDatasetCorruptedException( + null, + Collections.singletonList( + new IntermediateDataSetID( + datasetId))); + } + out.collect(value); + }) + .returns(Integer.class); + executeAndVerifyResult(tmpDir, dataStream, "5", "6", "7"); + } + + private <T> void executeAndVerifyResult( + Path tmpDir, DataStream<T> dataStream, String... expectedResult) throws Exception { + File outputFile = new File(tmpDir.toFile(), UUID.randomUUID().toString()); + dataStream.sinkTo(getFileSink(outputFile)); env.execute(); - assertThat(getFileContext(outputFile)).containsExactlyInAnyOrder("5", "6", "7"); + assertThat(getFileContent(outputFile)).containsExactlyInAnyOrder(expectedResult); + } + + private <T> FileSink<T> getFileSink(File outputFile) { + return FileSink.forRowFormat( + new org.apache.flink.core.fs.Path(outputFile.getPath()), + new SimpleStringEncoder<T>()) + .build(); } - private static List<String> getFileContext(File directory) throws IOException { + private static List<String> getFileContent(File directory) throws IOException { List<String> res = new ArrayList<>(); final Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true);