pnowojski commented on a change in pull request #13529:
URL: https://github.com/apache/flink/pull/13529#discussion_r501001585
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -260,20 +259,6 @@ private boolean allStreamStatusesAreIdle() {
 		return true;
 	}
-	private static class SourceInputProcessor<T> extends StreamOneInputProcessor<T> {
Review comment:
Nit: change the commit title to:
> [hotfix] Remove SourceInputProcessor in StreamMultipleInputProcessor
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A factory for {@link StreamMultipleInputProcessor}.
+ */
+@Internal
+public class StreamMultipleInputProcessorFactory {
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static StreamMultipleInputProcessor create(
+ CheckpointedInputGate[] checkpointedInputGates,
+ StreamConfig.InputConfig[] configuredInputs,
+ IOManager ioManager,
+ TaskIOMetricGroup ioMetricGroup,
+ Counter mainOperatorRecordsIn,
+ StreamStatusMaintainer streamStatusMaintainer,
+ MultipleInputStreamOperator<?> mainOperator,
+ MultipleInputSelectionHandler inputSelectionHandler,
+ WatermarkGauge[] inputWatermarkGauges,
+ OperatorChain<?, ?> operatorChain) {
+ checkNotNull(operatorChain);
+ checkNotNull(inputSelectionHandler);
+
+ List<Input> operatorInputs = mainOperator.getInputs();
+ int inputsCount = operatorInputs.size();
+
+ StreamOneInputProcessor<?>[] inputProcessors = new
StreamOneInputProcessor[inputsCount];
+ Counter networkRecordsIn = new SimpleCounter();
+ ioMetricGroup.reuseRecordsInputCounter(networkRecordsIn);
+
+ MultiStreamStreamStatusTracker streamStatusTracker = new
MultiStreamStreamStatusTracker(inputsCount);
+ checkState(
+ configuredInputs.length == inputsCount,
+ "Number of configured inputs in StreamConfig [%s]
doesn't match the main operator's number of inputs [%s]",
+ configuredInputs.length,
+ inputsCount);
+ for (int i = 0; i < inputsCount; i++) {
+ StreamConfig.InputConfig configuredInput =
configuredInputs[i];
+ if (configuredInput instanceof
StreamConfig.NetworkInputConfig) {
+ StreamConfig.NetworkInputConfig networkInput =
(StreamConfig.NetworkInputConfig) configuredInput;
+ StreamTaskNetworkOutput dataOutput = new
StreamTaskNetworkOutput<>(
+ operatorInputs.get(i),
+ streamStatusMaintainer,
+ inputWatermarkGauges[i],
+ inputSelectionHandler,
streamStatusTracker,
+ i,
+ mainOperatorRecordsIn,
+ networkRecordsIn);
+
+ inputProcessors[i] = new
StreamOneInputProcessor(
+ new StreamTaskNetworkInput<>(
+
checkpointedInputGates[networkInput.getInputGateIndex()],
+
networkInput.getTypeSerializer(),
+ ioManager,
+ new
StatusWatermarkValve(checkpointedInputGates[networkInput.getInputGateIndex()].getNumberOfInputChannels()),
+ i),
+ dataOutput,
+ operatorChain);
+ }
+ else if (configuredInput instanceof
StreamConfig.SourceInputConfig) {
+ StreamConfig.SourceInputConfig sourceInput =
(StreamConfig.SourceInputConfig) configuredInput;
+ Output<StreamRecord<?>> chainedSourceOutput =
operatorChain.getChainedSourceOutput(sourceInput);
+ StreamTaskSourceInput<?> sourceTaskInput =
operatorChain.getSourceTaskInput(sourceInput);
+
+ inputProcessors[i] = new
StreamOneInputProcessor(
+ sourceTaskInput,
+ new
StreamTaskSourceOutput(chainedSourceOutput, streamStatusMaintainer,
inputWatermarkGauges[i],
+ streamStatusTracker,
+ i),
+ operatorChain);
+ }
+ else {
+ throw new
UnsupportedOperationException("Unknown input type: " + configuredInput);
+ }
+ }
+
+ return new StreamMultipleInputProcessor(
+ inputSelectionHandler,
+ inputProcessors
+ );
+ }
+
+
+ /**
+ * Stream status tracker for the inputs. We need to keep track for
determining when
+ * to forward stream status changes downstream.
+ */
+ private static class MultiStreamStreamStatusTracker {
+ private final StreamStatus[] streamStatuses;
+
+ private MultiStreamStreamStatusTracker(int numberOfInputs) {
+ this.streamStatuses = new StreamStatus[numberOfInputs];
+ Arrays.fill(streamStatuses, StreamStatus.ACTIVE);
+ }
+
+ public void setStreamStatus(int index, StreamStatus
streamStatus) {
+ streamStatuses[index] = streamStatus;
+ }
+
+ public StreamStatus getStreamStatus(int index) {
+ return streamStatuses[index];
+ }
+
+ public boolean allStreamStatusesAreIdle() {
+ for (StreamStatus streamStatus : streamStatuses) {
+ if (streamStatus.isActive()) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
Review comment:
Do I understand correctly that previously this functionality was hidden/implemented via non-static accesses between `StreamMultipleInputProcessor#StreamTaskNetworkOutput` (a non-static inner class) and `StreamMultipleInputProcessor`?
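For readers following along, here is a minimal sketch of the difference being asked about; all class and field names below are simplified stand-ins, not Flink's actual code. A non-static inner output class can reach the enclosing processor's status array directly, while the extracted static tracker has to be handed over explicitly:

class ProcessorBefore {
	private final boolean[] streamStatusActive = {true, true};

	// Non-static inner class: reads and writes the outer processor's field directly.
	class InnerOutput {
		void emitStatus(int inputIndex, boolean active) {
			streamStatusActive[inputIndex] = active;
		}
	}
}

class ProcessorAfter {
	// Static helper: all shared state is passed in explicitly, so the output
	// class no longer needs a reference to the enclosing processor.
	static class StatusTracker {
		private final boolean[] active;

		StatusTracker(int numberOfInputs) {
			this.active = new boolean[numberOfInputs];
			java.util.Arrays.fill(active, true);
		}

		void setActive(int inputIndex, boolean isActive) {
			active[inputIndex] = isActive;
		}
	}

	static class StaticOutput {
		private final StatusTracker tracker;

		StaticOutput(StatusTracker tracker) {
			this.tracker = tracker;
		}

		void emitStatus(int inputIndex, boolean isActive) {
			tracker.setActive(inputIndex, isActive);
		}
	}
}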
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -171,6 +168,10 @@ private int selectNextReadingInputIndex() throws IOException {
 		updateAvailability();
 		checkInputSelectionAgainstIsFinished();
+		if (inputSelectionHandler.isInputUnavailable(0) && inputSelectionHandler.isInputUnavailable(1)) {
+			fullCheckAndSetAvailable();
+		}
Review comment:
Again, as explained above, `StreamTwoInputProcessor#updateAvailability()` should do the trick (modulo the starvation check in L181).
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/TwoInputSelectionHandler.java
##########
@@ -70,6 +70,10 @@ boolean areAllInputsSelected() {
 		return inputSelection.areAllInputsSelected();
 	}
+	boolean isInputUnavailable(int inputIndex) {
Review comment:
nit: I would flip the method to `isInputAvailable(...)` and use it with the `!` operator.
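A minimal sketch of the suggested flip; the `available` array below is a hypothetical stand-in for whatever state `TwoInputSelectionHandler` actually keeps, only the call-site shape is the point:

// Illustrative only; the real handler's internal state differs.
class SelectionHandlerSketch {
	private final boolean[] available = {true, true};

	boolean isInputAvailable(int inputIndex) {
		return available[inputIndex];
	}
}

// Call sites would then read:
//   if (!handler.isInputAvailable(0) && !handler.isInputAvailable(1)) { ... }
// instead of:
//   if (handler.isInputUnavailable(0) && handler.isInputUnavailable(1)) { ... }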
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -124,6 +120,7 @@ private int selectFirstReadingInputIndex() throws IOException {
 	}
 	private void checkFinished(InputStatus status) throws Exception {
+		updateAvailability();
Review comment:
Why do we need this? The currently processed input should be updated via the returned `InputStatus`, so there is no need to check it. If the other input is unavailable and both become unavailable, we would do the full check by waiting on the availability future, and we update the inputs (via the cheap `isApproximatelyAvailable()`) once the future is completed.
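To make the described flow a bit more concrete, here is a rough, self-contained sketch; the `InputSketch` interface is a hypothetical stand-in rather than Flink's `AvailabilityProvider`:

import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an availability-aware input; illustrative only.
interface InputSketch {
	CompletableFuture<?> getAvailableFuture();
	boolean isApproximatelyAvailable();
}

class AvailabilitySketch {
	void awaitAnyInput(InputSketch input1, InputSketch input2, boolean[] available) {
		// Wait until at least one input signals availability again...
		CompletableFuture
			.anyOf(input1.getAvailableFuture(), input2.getAvailableFuture())
			.thenRun(() -> {
				// ...then refresh both flags with the cheap approximate check
				// instead of re-running the full availability logic per record.
				available[0] = input1.isApproximatelyAvailable();
				available[1] = input2.isApproximatelyAvailable();
			});
	}
}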
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInputs.java
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.sort.ExternalSorter;
+import org.apache.flink.runtime.operators.sort.PushSorter;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.io.StreamTaskInput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MutableObjectIterator;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.PriorityQueue;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An entry class for creating coupled, sorting inputs. The inputs are sorted independently and afterwards
+ * the inputs are being merged in order. It is done, by reporting availability only for the input which is
+ * current head for the sorted inputs.
+ */
+public class MultiInputSortingDataInputs<K> {
Review comment:
Is this class being used anywhere in this commit besides in tests?
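As background for the question, the javadoc above describes a k-way merge driven by availability: each input is sorted on its own, and only the input whose current head element is the global minimum reports itself as available. A toy model of that idea in plain Java (names and types are made up for illustration; nothing here is Flink's actual implementation):

import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

// Toy model: each "input" is just a pre-sorted queue of longs.
class HeadMergeSketch {
	private final List<Queue<Long>> sortedInputs;
	private final PriorityQueue<Integer> heads;

	HeadMergeSketch(List<Queue<Long>> sortedInputs) {
		this.sortedInputs = sortedInputs;
		// Order input indices by their current head element.
		this.heads = new PriorityQueue<>(Comparator.comparing(i -> sortedInputs.get(i).peek()));
		for (int i = 0; i < sortedInputs.size(); i++) {
			if (!sortedInputs.get(i).isEmpty()) {
				heads.add(i);
			}
		}
	}

	// Only the input holding the globally smallest head reports itself available.
	boolean isAvailable(int inputIndex) {
		return !heads.isEmpty() && heads.peek() == inputIndex;
	}

	// Emit from the available input, then re-insert it with its new head element.
	Long pollNext() {
		int index = heads.remove();
		Long value = sortedInputs.get(index).remove();
		if (!sortedInputs.get(index).isEmpty()) {
			heads.add(index);
		}
		return value;
	}
}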
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/EndOfInputAware.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+
+/**
+ * An interface for {@link OperatorChain} to extract the feature of ending the input.
+ *
+ * <p>It's purpose is mainly to make it easier to instantiate {@link StreamOneInputProcessor} which needs to
+ * notify the chain that an input has ended.
+ */
+@Internal
+public interface EndOfInputAware {
+
+	/**
+	 * Ends the main operator input specified by {@code inputId}).
+	 *
+	 * @param inputId the input ID starts from 1 which indicates the first input.
+	 */
+	void endMainOperatorInput(int inputId) throws Exception;
+}
Review comment:
1. The name `MainOperator` doesn't fit here, I think. I would rename it to `endInput`, but...
2. Why not use the `BoundedMultiInput` interface instead? It's basically the same thing 😅
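For comparison, `BoundedMultiInput` (in `org.apache.flink.streaming.api.operators`) is, as far as I recall from around this Flink version, essentially just the following; treat the javadoc/annotations as paraphrased rather than quoted:

public interface BoundedMultiInput {

	// Notifies that no more data will arrive on the input identified by inputId
	// (1-based, matching endMainOperatorInput(int) above).
	void endInput(int inputId) throws Exception;
}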
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -23,16 +23,12 @@
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
Review comment:
re
> [hotfix] Fix availability handling in TwoInputStreamOperator
0. Can you explain a bit more what's the actual issue (I don't see what is
this commit fixing)?
1. Can you create a bug ticket in the jira about about this and change
commit from hotfix to [FLINK-XYZ]?
2. Definitely this commit is missing some test. If you will struggle to
reproduce (`StreamTwoInputProcessor` is tested on the task level in
`StreamTaskSelectiveReadingTest` and `TwoInputStreamTaskTest`. Note that
`TwoInputStreamTaskTestHarness` that is used there is in the process of being
deprecated and:
a) it would be just nice to implement the test based on
`StreamTaskMailboxTestHarness`. So far it's mostly used for multiple input
tasks tests, but it has been used in source tasks and one input tasks tests as
well (for example `StreamTaskTest#testNotifyCheckpointOnClosedOperator`, so it
should be easy to use for the two inputs as well.
b) mailbox version of the test harness might be much much better at
reproducing a problem that depends on timing as you can execute it step by step
3. it would be best actually to merge this commit in separate PR to verify
the performance impact of this bug fix alone (to narrow down suspects in case
of regression detected after merging)
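As a purely illustrative aside (not the real `StreamTaskMailboxTestHarness` API): the reason a mailbox-style harness is good at timing-dependent bugs is that every action goes through a queue the test drains explicitly, one step at a time.

import java.util.ArrayDeque;
import java.util.Queue;

// Toy illustration of step-by-step mailbox execution: all actions are queued
// and the test decides exactly when each one runs.
class MailboxSketch {
	private final Queue<Runnable> mail = new ArrayDeque<>();

	void enqueue(Runnable action) {
		mail.add(action);
	}

	// Execute exactly one queued action; the test controls the interleaving.
	boolean runSingleStep() {
		Runnable next = mail.poll();
		if (next == null) {
			return false;
		}
		next.run();
		return true;
	}
}

With that kind of control a test can, for instance, make both inputs report unavailable, run exactly one step, and assert on the processor's availability state before anything else happens.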
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A factory for {@link StreamMultipleInputProcessor}.
+ */
+@Internal
+public class StreamMultipleInputProcessorFactory {
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static StreamMultipleInputProcessor create(
+ CheckpointedInputGate[] checkpointedInputGates,
+ StreamConfig.InputConfig[] configuredInputs,
+ IOManager ioManager,
+ TaskIOMetricGroup ioMetricGroup,
+ Counter mainOperatorRecordsIn,
+ StreamStatusMaintainer streamStatusMaintainer,
+ MultipleInputStreamOperator<?> mainOperator,
+ MultipleInputSelectionHandler inputSelectionHandler,
+ WatermarkGauge[] inputWatermarkGauges,
+ OperatorChain<?, ?> operatorChain) {
+ checkNotNull(operatorChain);
+ checkNotNull(inputSelectionHandler);
+
+ List<Input> operatorInputs = mainOperator.getInputs();
+ int inputsCount = operatorInputs.size();
+
+ StreamOneInputProcessor<?>[] inputProcessors = new
StreamOneInputProcessor[inputsCount];
+ Counter networkRecordsIn = new SimpleCounter();
+ ioMetricGroup.reuseRecordsInputCounter(networkRecordsIn);
+
+ MultiStreamStreamStatusTracker streamStatusTracker = new
MultiStreamStreamStatusTracker(inputsCount);
+ checkState(
+ configuredInputs.length == inputsCount,
+ "Number of configured inputs in StreamConfig [%s]
doesn't match the main operator's number of inputs [%s]",
+ configuredInputs.length,
+ inputsCount);
+ for (int i = 0; i < inputsCount; i++) {
+ StreamConfig.InputConfig configuredInput =
configuredInputs[i];
+ if (configuredInput instanceof
StreamConfig.NetworkInputConfig) {
+ StreamConfig.NetworkInputConfig networkInput =
(StreamConfig.NetworkInputConfig) configuredInput;
+ StreamTaskNetworkOutput dataOutput = new
StreamTaskNetworkOutput<>(
+ operatorInputs.get(i),
+ streamStatusMaintainer,
+ inputWatermarkGauges[i],
+ inputSelectionHandler,
streamStatusTracker,
+ i,
+ mainOperatorRecordsIn,
+ networkRecordsIn);
+
+ inputProcessors[i] = new
StreamOneInputProcessor(
+ new StreamTaskNetworkInput<>(
+
checkpointedInputGates[networkInput.getInputGateIndex()],
+
networkInput.getTypeSerializer(),
+ ioManager,
+ new
StatusWatermarkValve(checkpointedInputGates[networkInput.getInputGateIndex()].getNumberOfInputChannels()),
+ i),
+ dataOutput,
+ operatorChain);
+ }
+ else if (configuredInput instanceof
StreamConfig.SourceInputConfig) {
+ StreamConfig.SourceInputConfig sourceInput =
(StreamConfig.SourceInputConfig) configuredInput;
+ Output<StreamRecord<?>> chainedSourceOutput =
operatorChain.getChainedSourceOutput(sourceInput);
+ StreamTaskSourceInput<?> sourceTaskInput =
operatorChain.getSourceTaskInput(sourceInput);
+
+ inputProcessors[i] = new
StreamOneInputProcessor(
+ sourceTaskInput,
+ new
StreamTaskSourceOutput(chainedSourceOutput, streamStatusMaintainer,
inputWatermarkGauges[i],
+ streamStatusTracker,
+ i),
Review comment:
nit: formatting
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/sort/SortedInputITCase.java
##########
@@ -52,13 +56,16 @@
import static org.junit.Assert.assertThat;
/**
- * Longer running IT tests for {@link SortingDataInputTest}. For quicker smoke tests see {@link SortingDataInputTest}.
+ * Longer running IT tests for {@link SortingDataInput} and {@link MultiInputSortingDataInputs}.
+ *
+ * @see SortingDataInputTest
+ * @see MultiInputSortingDataInputsTest
*/
-public class SortingDataInputITCase {
+public class SortedInputITCase {
Review comment:
The names here are a bit confusing: `Sorted` vs `Sorting`, and "longer running" is encoded as `ITCase` vs `Test`.
Maybe:
`SortedInputITCase` -> `LargeSortedInputITCase` (or `LargeSortingInputITCase`)
Also, maybe unify `SortingDataInput` vs `SortedInput`, or does it make sense as it is?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
##########
@@ -40,38 +48,78 @@
*/
public class StreamTwoInputProcessorFactory {
 	public static <IN1, IN2> StreamTwoInputProcessor<IN1, IN2> create(
+			AbstractInvokable ownerTask,
 			CheckpointedInputGate[] checkpointedInputGates,
-			TypeSerializer<IN1> inputSerializer1,
-			TypeSerializer<IN2> inputSerializer2,
 			IOManager ioManager,
+			MemoryManager memoryManager,
 			TaskIOMetricGroup taskIOMetricGroup,
 			StreamStatusMaintainer streamStatusMaintainer,
 			TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
 			TwoInputSelectionHandler inputSelectionHandler,
 			WatermarkGauge input1WatermarkGauge,
 			WatermarkGauge input2WatermarkGauge,
 			OperatorChain<?, ?> operatorChain,
+			StreamConfig streamConfig,
+			Configuration taskManagerConfig,
+			Configuration jobConfig,
+			ExecutionConfig executionConfig,
+			ClassLoader userClassloader,
 			Counter numRecordsIn) {
 		checkNotNull(operatorChain);
 		checkNotNull(inputSelectionHandler);
+
 		StreamStatusTracker statusTracker = new StreamStatusTracker();
 		taskIOMetricGroup.reuseRecordsInputCounter(numRecordsIn);
+		TypeSerializer<IN1> typeSerializer1 = streamConfig.getTypeSerializerIn(0, userClassloader);
+		StreamTaskInput<IN1> input1 = new StreamTaskNetworkInput<>(
+			checkpointedInputGates[0],
+			typeSerializer1,
+			ioManager,
+			new StatusWatermarkValve(checkpointedInputGates[0].getNumberOfInputChannels()),
+			0);
+		TypeSerializer<IN2> typeSerializer2 = streamConfig.getTypeSerializerIn(1, userClassloader);
+		StreamTaskInput<IN2> input2 = new StreamTaskNetworkInput<>(
+			checkpointedInputGates[1],
+			typeSerializer2,
+			ioManager,
+			new StatusWatermarkValve(checkpointedInputGates[1].getNumberOfInputChannels()),
+			1);
+
+		if (streamConfig.shouldSortInputs()) {
+			@SuppressWarnings("unchecked")
+			MultiInputSortingDataInputs<Object> multiInputs = new MultiInputSortingDataInputs<Object>(
Review comment:
(I remember someone complaining about unchecked warnings 🙈 and `Object`?
😈 )
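One common way to keep such raw/`Object`-typed usage contained, shown purely as an illustration (the helper below is hypothetical and not part of this PR), is to funnel the unchecked cast through a single generic helper so the call sites stay typed:

import org.apache.flink.api.java.functions.KeySelector;

// Hypothetical helper: keeps the single unchecked cast in one place so the
// factory's call sites can stay generically typed instead of working with
// KeySelector<Object, ?> directly.
final class UncheckedCasts {

	private UncheckedCasts() {
	}

	@SuppressWarnings("unchecked")
	static <K> KeySelector<Object, K> widenToObject(KeySelector<?, K> selector) {
		return (KeySelector<Object, K>) selector;
	}
}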
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -67,7 +63,8 @@ public StreamTwoInputProcessor(
 	@Override
 	public CompletableFuture<?> getAvailableFuture() {
-		if (inputSelectionHandler.areAllInputsSelected()) {
+		if (inputSelectionHandler.areAllInputsSelected() ||
+				(inputSelectionHandler.isInputUnavailable(0) && !inputSelectionHandler.isInputUnavailable(1))) {
Review comment:
I don't get this condition. Why does the first input have to be unavailable while the second one is available?
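To spell out the asymmetry being questioned (purely as an illustration of what the added predicate evaluates to, not a claim about the intended fix):

class ConditionSketch {
	// Mirrors the added predicate: isInputUnavailable(0) && !isInputUnavailable(1).
	static boolean addedPredicate(boolean unavailable0, boolean unavailable1) {
		return unavailable0 && !unavailable1;
	}
}
// addedPredicate(true, false) == true   (input 0 unavailable, input 1 available)
// addedPredicate(true, true)  == false  (both inputs unavailable)
// addedPredicate(false, ...)  == false  (input 0 available)
// The symmetric "both unavailable" form used in selectNextReadingInputIndex above
// would instead be: unavailable0 && unavailable1.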
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]