reswqa commented on code in PR #25276:
URL: https://github.com/apache/flink/pull/25276#discussion_r1740311042


##########
flink-core-api/src/main/java/org/apache/flink/api/common/attribute/Attribute.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.attribute;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+
+/** {@link Attribute} contains the information about the process logic of a 
process function. */
+@Internal
+public class Attribute implements Serializable {

Review Comment:
   This is not a `Public` API, so we should move it to `flink-core`.



##########
flink-core-api/src/main/java/org/apache/flink/api/common/attribute/Attribute.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.attribute;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+
+/** {@link Attribute} contains the information about the process logic of a 
process function. */
+@Internal
+public class Attribute implements Serializable {
+
+    private boolean isNoOutputUntilEndOfInput;
+
+    private Attribute(Builder builder) {

Review Comment:
   I don't think passing a builder to the constructor is a good pattern.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithOperatorAttributesTest.java:
##########
@@ -97,10 +98,46 @@ void testOutputOnlyAfterEndOfStream() {
         assertThat(vertexMap.get("Source: 
source").isAnyOutputBlocking()).isFalse();
         assertThat(vertexMap.get("transform -> 
Map").isAnyOutputBlocking()).isTrue();
         assertThat(vertexMap.get("sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    @Test
+    void testOutputOnlyAfterEndOfStreamCase2() {

Review Comment:
   Can we merge this into the test case above and remove the `Case1` suffix?



##########
flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.attribute;
+
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link StreamingJobGraphGenerator} with different attributes. */
+class StreamingJobGraphGeneratorWithAttributeTest {
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection 
Source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1);
+        assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(
+                vertexMap.get("KeyedProcess -> Process"), 
ResultPartitionType.BLOCKING);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("KeyedProcess -> 
Process").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase2() throws Exception {

Review Comment:
   `testPropagationAlongOperatorChain`



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java:
##########
@@ -206,7 +205,7 @@ public <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> 
process(
     }
 
     @Override
-    public <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
+    public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<V, 
OUT1, OUT2> process(

Review Comment:
   We should also apply the same fix to `TwoKeyedPartitionStreams`.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -697,20 +696,20 @@ private List<StreamEdge> createChain(
             }
 
             for (StreamEdge chainable : chainableOutputs) {
-                // Mark downstream nodes in the same chain as outputBlocking
-                if (isOutputOnlyAfterEndOfStream) {
-                    outputBlockingNodesID.add(chainable.getTargetId());
+                // Only modify the attribute of downstream nodes in the same 
chain.
+                if (isNoOutputUntilEndOfInput) {
+                    StreamNode targetNode = 
streamGraph.getStreamNode(chainable.getTargetId());
+                    Attribute targetNodeAttribute = targetNode.getAttribute();
+                    if (targetNodeAttribute != null) {
+                        targetNodeAttribute.setNoOutputUntilEndOfInput(true);
+                    }
                 }
                 transitiveOutEdges.addAll(
                         createChain(
                                 chainable.getTargetId(),
                                 chainIndex + 1,
                                 chainInfo,
                                 chainEntryPoints));
-                // Mark upstream nodes in the same chain as outputBlocking
-                if (outputBlockingNodesID.contains(chainable.getTargetId())) {

Review Comment:
   We shouldn't remove it; instead, we should adapt it.
   
   We also lack tests for this case.
   
   



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableTwoNonKeyedPartitionStreamImpl.java:
##########
@@ -19,8 +19,8 @@
 package org.apache.flink.datastream.impl.stream;
 
 import org.apache.flink.api.dag.Transformation;
-import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream;
 import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream;
+import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream;

Review Comment:
   Why do we have this change in commit 2? It seems it should be in commit 1.



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java:
##########
@@ -191,17 +204,17 @@ private static class TwoGlobalStreamsImpl<OUT1, OUT2> 
implements TwoGlobalStream
 
         private final GlobalStreamImpl<OUT2> secondStream;
 
-        public static <OUT1, OUT2> TwoGlobalStreamsImpl<OUT1, OUT2> of(
-                GlobalStreamImpl<OUT1> firstStream, GlobalStreamImpl<OUT2> 
secondStream) {
-            return new TwoGlobalStreamsImpl<>(firstStream, secondStream);
-        }
-
         private TwoGlobalStreamsImpl(
                 GlobalStreamImpl<OUT1> firstStream, GlobalStreamImpl<OUT2> 
secondStream) {
             this.firstStream = firstStream;
             this.secondStream = secondStream;
         }
 
+        public static <OUT1, OUT2> TwoGlobalStreamsImpl<OUT1, OUT2> of(

Review Comment:
   What's the difference between this and Line 194 which you have removed?



##########
flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.attribute;
+
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link StreamingJobGraphGenerator} with different attributes. */
+class StreamingJobGraphGeneratorWithAttributeTest {
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))

Review Comment:
   Can we replace this with `DiscardSink`?



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java:
##########
@@ -282,7 +286,11 @@ public <T_OTHER, OUT> 
ProcessConfigurableAndKeyedPartitionStream<K, OUT> connect
                         processFunction,
                         getType(),
                         ((KeyedPartitionStreamImpl<K, T_OTHER>) 
other).getType());
-
+        other =

Review Comment:
   This is not the right line. We should move this after line 283.



##########
flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.attribute;
+
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link StreamingJobGraphGenerator} with different attributes. */
+class StreamingJobGraphGeneratorWithAttributeTest {
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception {

Review Comment:
   This can be merged with the next test case, and we can change it to test
   `A -> B -> C` where B is `NoOutputUntilEndOfInput`.



##########
flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.attribute;
+
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link StreamingJobGraphGenerator} with different attributes. */
+class StreamingJobGraphGeneratorWithAttributeTest {
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection 
Source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1);
+        assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(
+                vertexMap.get("KeyedProcess -> Process"), 
ResultPartitionType.BLOCKING);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("KeyedProcess -> 
Process").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase2() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection 
Source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(
+                vertexMap.get("KeyedProcess -> Process"), 
ResultPartitionType.BLOCKING);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("KeyedProcess -> 
Process").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase3() throws Exception {

Review Comment:
   testTwoOutput



##########
flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.attribute;
+
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link StreamingJobGraphGenerator} with different attributes. */
+class StreamingJobGraphGeneratorWithAttributeTest {
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection 
Source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1);
+        assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(
+                vertexMap.get("KeyedProcess -> Process"), 
ResultPartitionType.BLOCKING);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("KeyedProcess -> 
Process").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase2() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection 
Source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(
+                vertexMap.get("KeyedProcess -> Process"), 
ResultPartitionType.BLOCKING);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("KeyedProcess -> 
Process").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase3() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        
NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream<
+                        Integer, Integer, Integer>
+                twoOutputStream =
+                        source.process(new TestMapTask2())
+                                .withParallelism(2)
+                                .process(new TestTwoOutputProcessFunction())
+                                .withParallelism(2);
+        
NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer> 
firstStream =
+                twoOutputStream.getFirst();
+        
NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer>
+                secondStream = twoOutputStream.getSecond();
+        firstStream
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        secondStream
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Process -> Two-Output-Operator -> (Process, 
Process)"),
+                ResultPartitionType.BLOCKING);
+        assertThat(
+                        vertexMap
+                                .get("Process -> Two-Output-Operator -> 
(Process, Process)")
+                                .isAnyOutputBlocking())
+                .isTrue();
+    }
+
+    @Test
+    void testNoOutputUntilEndOfInputWithoutOperatorChain() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        env.getConfiguration().set(PipelineOptions.OPERATOR_CHAINING, false);
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection 
Source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1);
+        assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(4);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(vertexMap.get("KeyedProcess"), 
ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                vertexMap.get("Process"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        
assertThat(vertexMap.get("KeyedProcess").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Process").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    private void assertHasOutputPartitionType(
+            JobVertex jobVertex, ResultPartitionType partitionType) {
+        
assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType);
+    }
+
+    private void assertManagedMemoryWeightsSize(StreamNode node, int 
weightSize) {
+        
assertThat(node.getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(weightSize);
+    }
+
+    @NoOutputUntilEndOfInput
+    private static class TestMapTask1 implements 
OneInputStreamProcessFunction<Integer, Integer> {

Review Comment:
   Rename this to `NoOutputUntilEndOfInputMapTask`



##########
flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.attribute;
+
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link StreamingJobGraphGenerator} with different attributes. */
+class StreamingJobGraphGeneratorWithAttributeTest {
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection 
Source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1);
+        assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0);

Review Comment:
   These `assertManagedMemoryWeightsSize` assertions don't make sense here and can be removed.



##########
flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.attribute;
+
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link StreamingJobGraphGenerator} with different attributes. */
+class StreamingJobGraphGeneratorWithAttributeTest {
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection 
Source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1);
+        assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(
+                vertexMap.get("KeyedProcess -> Process"), 
ResultPartitionType.BLOCKING);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("KeyedProcess -> 
Process").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase2() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection 
Source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(
+                vertexMap.get("KeyedProcess -> Process"), 
ResultPartitionType.BLOCKING);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("KeyedProcess -> 
Process").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase3() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        
NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream<
+                        Integer, Integer, Integer>
+                twoOutputStream =
+                        source.process(new TestMapTask2())
+                                .withParallelism(2)
+                                .process(new TestTwoOutputProcessFunction())
+                                .withParallelism(2);
+        
NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer> 
firstStream =
+                twoOutputStream.getFirst();
+        
NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer>
+                secondStream = twoOutputStream.getSecond();
+        firstStream
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        secondStream
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Process -> Two-Output-Operator -> (Process, 
Process)"),
+                ResultPartitionType.BLOCKING);
+        assertThat(
+                        vertexMap
+                                .get("Process -> Two-Output-Operator -> 
(Process, Process)")
+                                .isAnyOutputBlocking())
+                .isTrue();
+    }
+
+    @Test
+    void testNoOutputUntilEndOfInputWithoutOperatorChain() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        env.getConfiguration().set(PipelineOptions.OPERATOR_CHAINING, false);
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection 
Source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1);
+        assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(4);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(vertexMap.get("KeyedProcess"), 
ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                vertexMap.get("Process"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        
assertThat(vertexMap.get("KeyedProcess").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Process").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    private void assertHasOutputPartitionType(
+            JobVertex jobVertex, ResultPartitionType partitionType) {
+        
assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType);
+    }
+
+    private void assertManagedMemoryWeightsSize(StreamNode node, int 
weightSize) {
+        
assertThat(node.getManagedMemoryOperatorScopeUseCaseWeights()).hasSize(weightSize);
+    }
+
+    @NoOutputUntilEndOfInput
+    private static class TestMapTask1 implements 
OneInputStreamProcessFunction<Integer, Integer> {
+
+        @Override
+        public void processRecord(
+                Integer record, Collector<Integer> output, PartitionedContext 
ctx) {
+            output.collect(record + 1);
+        }
+    }
+
+    private static class TestMapTask2 implements 
OneInputStreamProcessFunction<Integer, Integer> {

Review Comment:
   Rename to `TestMapTask`



##########
flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.attribute;
+
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.attribute.NoOutputUntilEndOfInput;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link StreamingJobGraphGenerator} with different attributes. */
+class StreamingJobGraphGeneratorWithAttributeTest {
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase1() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection 
Source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 1);
+        assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(
+                vertexMap.get("KeyedProcess -> Process"), 
ResultPartitionType.BLOCKING);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("KeyedProcess -> 
Process").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase2() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        source.keyBy(x -> x)
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<String, StreamNode> nodeMap = new HashMap<>();
+        for (StreamNode node : streamGraph.getStreamNodes()) {
+            nodeMap.put(node.getOperatorName(), node);
+        }
+        assertThat(nodeMap).hasSize(4);
+        assertManagedMemoryWeightsSize(nodeMap.get("Source: Collection 
Source"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("KeyedProcess"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Process"), 0);
+        assertManagedMemoryWeightsSize(nodeMap.get("Sink: Writer"), 0);
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Source: Collection Source"), 
ResultPartitionType.PIPELINED_BOUNDED);
+        assertHasOutputPartitionType(
+                vertexMap.get("KeyedProcess -> Process"), 
ResultPartitionType.BLOCKING);
+        assertThat(vertexMap.get("Source: Collection 
Source").isAnyOutputBlocking()).isFalse();
+        assertThat(vertexMap.get("KeyedProcess -> 
Process").isAnyOutputBlocking()).isTrue();
+        assertThat(vertexMap.get("Sink: 
Writer").isAnyOutputBlocking()).isFalse();
+    }
+
+    @Test
+    void testNoOutputUntilEndOfInputWithOperatorChainCase3() throws Exception {
+        ExecutionEnvironmentImpl env =
+                (ExecutionEnvironmentImpl) ExecutionEnvironment.getInstance();
+        NonKeyedPartitionStream<Integer> source =
+                env.fromSource(
+                        DataStreamV2SourceUtils.fromData(Arrays.asList(1, 2, 
3)), "test-source");
+        
NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream<
+                        Integer, Integer, Integer>
+                twoOutputStream =
+                        source.process(new TestMapTask2())
+                                .withParallelism(2)
+                                .process(new TestTwoOutputProcessFunction())
+                                .withParallelism(2);
+        
NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer> 
firstStream =
+                twoOutputStream.getFirst();
+        
NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream<Integer>
+                secondStream = twoOutputStream.getSecond();
+        firstStream
+                .process(new TestMapTask1())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        secondStream
+                .process(new TestMapTask2())
+                .withParallelism(2)
+                .toSink(new WrappedSink<>(new PrintSink()))
+                .withParallelism(3);
+        StreamGraph streamGraph = env.getStreamGraph();
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        Map<String, JobVertex> vertexMap = new HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            vertexMap.put(vertex.getName(), vertex);
+        }
+        assertThat(vertexMap).hasSize(3);
+        assertHasOutputPartitionType(
+                vertexMap.get("Process -> Two-Output-Operator -> (Process, 
Process)"),
+                ResultPartitionType.BLOCKING);
+        assertThat(
+                        vertexMap
+                                .get("Process -> Two-Output-Operator -> 
(Process, Process)")
+                                .isAnyOutputBlocking())
+                .isTrue();
+    }
+
+    @Test
+    void testNoOutputUntilEndOfInputWithoutOperatorChain() throws Exception {

Review Comment:
   Rename to `testWithoutOperatorChain`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to