[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20147: [FLINK-27524][datastream] Introduce cache API to DataStream

2022-08-02 Thread GitBox


gaoyunhaii commented on code in PR #20147:
URL: https://github.com/apache/flink/pull/20147#discussion_r935436250


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java:
##
@@ -880,6 +889,125 @@ public void 
testAutoParallelismForExpandedTransformations() {
 });
 }
 
+@Test(expected = RuntimeException.class)
+public void testCacheInStreamModeThrowException() {

Review Comment:
   nit: Throw -> Throws



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SideOutputDataStream.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
+
+/**
+ * A {@link SideOutputDataStream} represents a {@link DataStream} that 
contains elements that are
+ * emitted from upstream into a side output with some tag.
+ *
+ * @param  The type of the elements in this stream.
+ */
+@Public
+public class SideOutputDataStream extends DataStream {
+/**
+ * Create a new {@link SideOutputDataStream} in the given execution 
environment.

Review Comment:
   nit: Create -> Creates



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java:
##
@@ -437,4 +437,22 @@ public SingleOutputStreamOperator setDescription(String 
description) {
 transformation.setDescription(description);
 return this;
 }
+
+/**
+ * Cache the intermediate result of the transformation. Only support 
bounded streams and
+ * currently only block mode is supported. The cache is generated lazily 
at the first time the
+ * intermediate result is computed. The cache will be clear when {@link
+ * CachedDataStream#invalidate()} called or the {@link 
StreamExecutionEnvironment} close.
+ *
+ * @return CachedDataStream that can use in later job to reuse the cached 
intermediate result.
+ */
+@PublicEvolving
+public CachedDataStream cache() {
+if (!(this.transformation instanceof PhysicalTransformation)) {
+throw new IllegalStateException(
+"Cache can only be called with physical transformation");

Review Comment:
   The message here should probably still read "... with physical or side output 
transformation".



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.util.AbstractID;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * When in batch mode, the {@link CacheTransformation} represents the 
intermediate result of the
+ * upper stream should be cached when it is computed at the first time. And it 
consumes the cached
+ * intermediate result in later jobs. In st

[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20147: [FLINK-27524][datastream] Introduce cache API to DataStream

2022-07-26 Thread GitBox


gaoyunhaii commented on code in PR #20147:
URL: https://github.com/apache/flink/pull/20147#discussion_r929639303


##
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java:
##
@@ -184,4 +188,24 @@ CompletableFuture triggerSavepoint(
  */
 CompletableFuture sendCoordinationRequest(
 JobID jobId, OperatorID operatorId, CoordinationRequest request);
+
+/**
+ * Return a set of ids of the completely cached intermediate dataset.
+ *
+ * @return A set of ids of the completely cached intermediate dataset.
+ */
+default CompletableFuture> listCachedIntermediateDataset() 
{

Review Comment:
   Might be renamed to `listClusterDatasets`.



##
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java:
##
@@ -184,4 +188,24 @@ CompletableFuture triggerSavepoint(
  */
 CompletableFuture sendCoordinationRequest(
 JobID jobId, OperatorID operatorId, CoordinationRequest request);
+
+/**
+ * Return a set of ids of the completely cached intermediate dataset.
+ *
+ * @return A set of ids of the completely cached intermediate dataset.
+ */
+default CompletableFuture> listCachedIntermediateDataset() 
{
+return CompletableFuture.completedFuture(Collections.emptySet());
+}
+
+/**
+ * Invalidate the cached intermediate dataset with the given id.
+ *
+ * @param intermediateDataSetId id of the cached intermediate dataset to 
be invalidated.
+ * @return Future which will be completed when the cached dataset is 
invalidated.
+ */
+default CompletableFuture invalidateCachedIntermediateDataset(

Review Comment:
   Might be renamed to `invalidateClusterDataset(AbstractID datasetId)`.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##
@@ -2203,13 +2197,24 @@ public StreamGraph getStreamGraph() {
  */
 @Internal
 public StreamGraph getStreamGraph(boolean clearTransformations) {
+updateCacheTransformation();

Review Comment:
   Might be renamed to `synchronizeClusterDatasetStatus`. 
   
   We should skip this check at least if `cachedTransformation.size() == 0`. If 
possible, I think we should also skip the check if all the transformations have 
been marked as `cached`.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##
@@ -2591,4 +2596,70 @@ private > T 
getTypeInfo(
 public List> getTransformations() {
 return transformations;
 }
+
+@Internal
+public  void addCache(AbstractID intermediateDataSetID, 
CacheTransformation t) {

Review Comment:
   Might be renamed to `registerCachedTransformation`?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##
@@ -611,14 +612,25 @@ public String getSlotSharingGroup(Integer id) {
 }
 
 public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, 
int typeNumber) {
+addEdge(upStreamVertexID, downStreamVertexID, typeNumber, null, false);
+}
+
+public void addEdge(
+Integer upStreamVertexID,
+Integer downStreamVertexID,
+int typeNumber,
+IntermediateDataSetID intermediateDataSetId,
+boolean shouldCache) {

Review Comment:
   `shouldCache` might be removed for all the methods regarding addEdge in 
`StreamGraph` and `StreamEdge`?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CachedDataStream.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.CacheTransformation;
+
+/**
+ * {@link CachedDataStream} represents a {@link DataStream} whose intermediate 
result will be cached
+ * at the first time when it is computed. And the cached i

[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20147: [FLINK-27524][datastream] Introduce cache API to DataStream

2022-07-20 Thread GitBox


gaoyunhaii commented on code in PR #20147:
URL: https://github.com/apache/flink/pull/20147#discussion_r925041477


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * When in batch mode, the {@link CacheTransformation} represents the 
intermediate result of the
+ * upper stream should be cached when it is computed at the first time. And it 
consumes the cached
+ * intermediate result in later jobs. In stream mode, it has no affect.
+ *
+ * @param 
+ */
+@Internal
+public class CacheTransformation extends Transformation {
+private final Transformation transformationToCache;
+private final IntermediateDataSetID intermediateDataSetID;
+private boolean isCached;
+/**
+ * Creates a new {@code Transformation} with the given name, output type 
and parallelism.
+ *
+ * @param name The name of the {@code Transformation}, this will be shown 
in Visualizations and
+ * the Log
+ * @param outputType The output type of this {@code Transformation}
+ * @param parallelism The parallelism of this {@code Transformation}
+ */
+public CacheTransformation(
+Transformation transformationToCache,
+String name,
+TypeInformation outputType,

Review Comment:
   Does the `CacheTransformation` always have the same outputType and 
parallelism as the given transformation? If so, we probably should not allow 
users to set these two parameters.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * When in batch mode, the {@link CacheTransformation} represents the 
intermediate result of the
+ * upper stream should be cached when it is computed at the first time. And it 
consumes the cached
+ * intermediate result in later jobs. In stream mode, it has no affect.
+ *
+ * @param 

Review Comment:
   nit: complete the type param comment or remove this line.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java:
##
@@ -21,22 +21,26 @@
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.util.Preconditions;
 
+import java.io.Serializable;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.OptionalInt;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.function.ToIntFunction;
 
 /** Container for meta-data of a data s