[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20147: [FLINK-27524][datastream] Introduce cache API to DataStream
gaoyunhaii commented on code in PR #20147: URL: https://github.com/apache/flink/pull/20147#discussion_r935436250 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java: ## @@ -880,6 +889,125 @@ public void testAutoParallelismForExpandedTransformations() { }); } +@Test(expected = RuntimeException.class) +public void testCacheInStreamModeThrowException() { Review Comment: nit: Throw -> Throws ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SideOutputDataStream.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.SideOutputTransformation; + +/** + * A {@link SideOutputDataStream} represents a {@link DataStream} that contains elements that are + * emitted from upstream into a side output with some tag. + * + * @param The type of the elements in this stream. 
+ */ +@Public +public class SideOutputDataStream<T> extends DataStream<T> { +/** + * Create a new {@link SideOutputDataStream} in the given execution environment. Review Comment: nit: Create -> Creates ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java: ## @@ -437,4 +437,22 @@ public SingleOutputStreamOperator<T> setDescription(String description) { transformation.setDescription(description); return this; } + +/** + * Cache the intermediate result of the transformation. Only support bounded streams and + * currently only block mode is supported. The cache is generated lazily at the first time the + * intermediate result is computed. The cache will be clear when {@link + * CachedDataStream#invalidate()} called or the {@link StreamExecutionEnvironment} close. + * + * @return CachedDataStream that can use in later job to reuse the cached intermediate result. + */ +@PublicEvolving +public CachedDataStream<T> cache() { +if (!(this.transformation instanceof PhysicalTransformation)) { +throw new IllegalStateException( +"Cache can only be called with physical transformation"); Review Comment: The message here might still be "... with physical or side output transformation" ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java: ## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.transformations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.util.AbstractID; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.List; + +/** + * When in batch mode, the {@link CacheTransformation} represents the intermediate result of the + * upper stream should be cached when it is computed at the first time. And it consumes the cached + * intermediate result in later jobs. In st
[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20147: [FLINK-27524][datastream] Introduce cache API to DataStream
gaoyunhaii commented on code in PR #20147: URL: https://github.com/apache/flink/pull/20147#discussion_r929639303 ## flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java: ## @@ -184,4 +188,24 @@ CompletableFuture<String> triggerSavepoint( */ CompletableFuture<CoordinationResponse> sendCoordinationRequest( JobID jobId, OperatorID operatorId, CoordinationRequest request); + +/** + * Return a set of ids of the completely cached intermediate dataset. + * + * @return A set of ids of the completely cached intermediate dataset. + */ +default CompletableFuture<Set<AbstractID>> listCachedIntermediateDataset() { Review Comment: Might be renamed to `listClusterDatasets`. ## flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java: ## @@ -184,4 +188,24 @@ CompletableFuture<String> triggerSavepoint( */ CompletableFuture<CoordinationResponse> sendCoordinationRequest( JobID jobId, OperatorID operatorId, CoordinationRequest request); + +/** + * Return a set of ids of the completely cached intermediate dataset. + * + * @return A set of ids of the completely cached intermediate dataset. + */ +default CompletableFuture<Set<AbstractID>> listCachedIntermediateDataset() { +return CompletableFuture.completedFuture(Collections.emptySet()); +} + +/** + * Invalidate the cached intermediate dataset with the given id. + * + * @param intermediateDataSetId id of the cached intermediate dataset to be invalidated. + * @return Future which will be completed when the cached dataset is invalidated. + */ +default CompletableFuture<Void> invalidateCachedIntermediateDataset( Review Comment: Might be renamed to `invalidateClusterDataset(AbstractID datasetId)` ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java: ## @@ -2203,13 +2197,24 @@ public StreamGraph getStreamGraph() { */ @Internal public StreamGraph getStreamGraph(boolean clearTransformations) { +updateCacheTransformation(); Review Comment: Might be renamed to `synchronizeClusterDatasetStatus`. 
We should skip this check at least if `cachedTransformation.size() == 0`. If possible, I think we should also skip the check if all the transformations have been marked as `cached`. ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java: ## @@ -2591,4 +2596,70 @@ private <OUT, T extends TypeInformation<OUT>> T getTypeInfo( public List<Transformation<?>> getTransformations() { return transformations; } + +@Internal +public void addCache(AbstractID intermediateDataSetID, CacheTransformation t) { Review Comment: might be renamed to `registerCachedTransformation`? ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java: ## @@ -611,14 +612,25 @@ public String getSlotSharingGroup(Integer id) { } public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) { +addEdge(upStreamVertexID, downStreamVertexID, typeNumber, null, false); +} + +public void addEdge( +Integer upStreamVertexID, +Integer downStreamVertexID, +int typeNumber, +IntermediateDataSetID intermediateDataSetId, +boolean shouldCache) { Review Comment: `shouldCache` might be removed for all the methods regarding addEdge in `StreamGraph` and `StreamEdge`? ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CachedDataStream.java: ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.CacheTransformation; + +/** + * {@link CachedDataStream} represents a {@link DataStream} whose intermediate result will be cached + * at the first time when it is computed. And the cached i
[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20147: [FLINK-27524][datastream] Introduce cache API to DataStream
gaoyunhaii commented on code in PR #20147: URL: https://github.com/apache/flink/pull/20147#discussion_r925041477 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.transformations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.List; + +/** + * When in batch mode, the {@link CacheTransformation} represents the intermediate result of the + * upper stream should be cached when it is computed at the first time. And it consumes the cached + * intermediate result in later jobs. In stream mode, it has no affect. 
+ * + * @param <T> + */ +@Internal +public class CacheTransformation<T> extends Transformation<T> { +private final Transformation<T> transformationToCache; +private final IntermediateDataSetID intermediateDataSetID; +private boolean isCached; +/** + * Creates a new {@code Transformation} with the given name, output type and parallelism. + * + * @param name The name of the {@code Transformation}, this will be shown in Visualizations and + * the Log + * @param outputType The output type of this {@code Transformation} + * @param parallelism The parallelism of this {@code Transformation} + */ +public CacheTransformation( +Transformation<T> transformationToCache, +String name, +TypeInformation<T> outputType, Review Comment: Does the `CacheTransformation` always have the same outputType and parallelism as the given one? If so, we might not allow users to reset the two parameters? ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.transformations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.List; + +/** + * When in batch mode, the {@link CacheTransformation} represents the intermediate result of the + * upper stream should be cached when it is computed at the first time. And it consumes the cached + * intermediate result in later jobs. In stream mode, it has no affect. + * + * @param Review Comment: nit: complete the type param comment or remove this line. ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java: ## @@ -21,22 +21,26 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.util.Preconditions; +import java.io.Serializable; import java.util.Comparator; import java.util.Map; import java.util.OptionalInt; import java.util.SortedMap; import java.util.TreeMap; +import java.util.function.ToIntFunction; /** Container for meta-data of a data s