This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0a55677dff7 [FLINK-35904][test] Make async test harness extend exsit
test harness (#25856)
0a55677dff7 is described below
commit 0a55677dff7fe0c95abc52938207f73bb4fa77e4
Author: Yanfei Lei <[email protected]>
AuthorDate: Thu Dec 26 10:27:45 2024 +0800
[FLINK-35904][test] Make async test harness extend exsit test harness
(#25856)
---
.../util/MultiInputStreamOperatorTestHarness.java | 2 +-
...ncKeyedMultiInputStreamOperatorTestHarness.java | 25 +++++-----
...syncKeyedOneInputStreamOperatorTestHarness.java | 53 ++++++++++++++--------
...syncKeyedTwoInputStreamOperatorTestHarness.java | 42 +++++++++++++----
.../asyncprocessing/AsyncProcessingTestUtil.java | 2 +-
5 files changed, 82 insertions(+), 42 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java
index 454e0e60d5a..058fc57ee8b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java
@@ -69,7 +69,7 @@ public class MultiInputStreamOperatorTestHarness<OUT>
getCastedOperator().getInputs().get(idx).processRecordAttributes(recordAttributes);
}
- private MultipleInputStreamOperator<OUT> getCastedOperator() {
+ protected MultipleInputStreamOperator<OUT> getCastedOperator() {
return (MultipleInputStreamOperator<OUT>) operator;
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
index 82a6d424c4f..566424d7d21 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.java
@@ -30,7 +30,7 @@ import
org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;
import java.util.List;
@@ -48,10 +48,10 @@ import static
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTes
* async processing, please use methods of test harness instead of operator.
*/
public class AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
- extends AbstractStreamOperatorTestHarness<OUT> {
+ extends MultiInputStreamOperatorTestHarness<OUT> {
/** The executor service for async state processing. */
- private ExecutorService executor;
+ private final ExecutorService executor;
public static <K, OUT> AsyncKeyedMultiInputStreamOperatorTestHarness<K,
OUT> create(
StreamOperatorFactory<OUT> operatorFactory,
@@ -108,6 +108,8 @@ public class
AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
config.serializeAllConfigs();
}
+ @Override
+ @SuppressWarnings({"rawtypes", "unchecked"})
public void processElement(int idx, StreamRecord<?> element) throws
Exception {
Input input = getCastedOperator().getInputs().get(idx);
ThrowingConsumer<StreamRecord<?>, Exception> inputProcessor =
@@ -115,16 +117,22 @@ public class
AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
execute(executor, (ignore) -> inputProcessor.accept(element)).get();
}
+ @Override
+ @SuppressWarnings("rawtypes")
public void processWatermark(int idx, Watermark mark) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
execute(executor, (ignore) -> input.processWatermark(mark)).get();
}
+ @Override
+ @SuppressWarnings("rawtypes")
public void processWatermarkStatus(int idx, WatermarkStatus
watermarkStatus) throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
execute(executor, (ignore) ->
input.processWatermarkStatus(watermarkStatus)).get();
}
+ @Override
+ @SuppressWarnings("rawtypes")
public void processRecordAttributes(int idx, RecordAttributes
recordAttributes)
throws Exception {
Input input = getCastedOperator().getInputs().get(idx);
@@ -137,16 +145,7 @@ public class
AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>
@Override
public void close() throws Exception {
- execute(
- executor,
- (ignore) -> {
- super.close();
- })
- .get();
+ execute(executor, (ignore) -> super.close()).get();
executor.shutdown();
}
-
- private MultipleInputStreamOperator<OUT> getCastedOperator() {
- return (MultipleInputStreamOperator<OUT>) operator;
- }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
index fbca7135f38..6128a5b915a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -34,7 +35,7 @@ import
org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;
import java.util.ArrayList;
@@ -54,13 +55,15 @@ import static
org.apache.flink.util.Preconditions.checkState;
* async processing, please use methods of test harness instead of operator.
*/
public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
- extends AbstractStreamOperatorTestHarness<OUT> {
+ extends OneInputStreamOperatorTestHarness<IN, OUT> {
/** Empty if the {@link #operator} is not {@link
MultipleInputStreamOperator}. */
- private final List<Input> inputs = new ArrayList<>();
+ private final List<Input<IN>> inputs = new ArrayList<>();
+
+ private long currentWatermark;
/** The executor service for async state processing. */
- private ExecutorService executor;
+ private final ExecutorService executor;
public static <K, IN, OUT> AsyncKeyedOneInputStreamOperatorTestHarness<K,
IN, OUT> create(
OneInputStreamOperator<IN, OUT> operator,
@@ -120,6 +123,7 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K,
IN, OUT>
}
@Override
+ @SuppressWarnings({"rawtypes", "unchecked"})
public void setup(TypeSerializer<OUT> outputSerializer) {
super.setup(outputSerializer);
if (operator instanceof MultipleInputStreamOperator) {
@@ -128,10 +132,7 @@ public class
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
}
}
- public OneInputStreamOperator<IN, OUT> getOneInputOperator() {
- return (OneInputStreamOperator<IN, OUT>) this.operator;
- }
-
+ @Override
public void processElement(StreamRecord<IN> element) throws Exception {
processElementInternal(element).get();
}
@@ -140,6 +141,7 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K,
IN, OUT>
* Submit an element processing in an executor thread. This method is
mainly used for internal
* testing, please use {@link #processElement} for common operator testing.
*/
+ @SuppressWarnings({"rawtypes", "unchecked"})
public CompletableFuture<Void> processElementInternal(StreamRecord<IN>
element)
throws Exception {
if (inputs.isEmpty()) {
@@ -160,22 +162,24 @@ public class
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
}
}
+ @Override
public void processWatermark(long watermark) throws Exception {
processWatermarkInternal(watermark).get();
}
/** For internal testing. */
- public CompletableFuture<Void> processWatermarkInternal(long watermark)
throws Exception {
+ public CompletableFuture<Void> processWatermarkInternal(long watermark) {
return processWatermarkInternal(new Watermark(watermark));
}
+ @Override
public void processWatermarkStatus(WatermarkStatus status) throws
Exception {
processWatermarkStatusInternal(status).get();
}
/** For internal testing. */
- public CompletableFuture<Void>
processWatermarkStatusInternal(WatermarkStatus status)
- throws Exception {
+ @SuppressWarnings("rawtypes")
+ public CompletableFuture<Void>
processWatermarkStatusInternal(WatermarkStatus status) {
if (inputs.isEmpty()) {
return execute(
executor, (ignore) ->
getOneInputOperator().processWatermarkStatus(status));
@@ -186,12 +190,22 @@ public class
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
}
}
+ @Override
public void processWatermark(Watermark mark) throws Exception {
processWatermarkInternal(mark).get();
}
+ @Override
+ public void endInput() throws Exception {
+ if (operator instanceof BoundedOneInput) {
+ execute(executor, (ignore) -> ((BoundedOneInput)
operator).endInput()).get();
+ }
+ }
+
/** For internal testing. */
- public CompletableFuture<Void> processWatermarkInternal(Watermark mark)
throws Exception {
+ @SuppressWarnings("rawtypes")
+ public CompletableFuture<Void> processWatermarkInternal(Watermark mark) {
+ currentWatermark = mark.getTimestamp();
if (inputs.isEmpty()) {
return execute(executor, (ignore) ->
getOneInputOperator().processWatermark(mark));
} else {
@@ -206,6 +220,7 @@ public class AsyncKeyedOneInputStreamOperatorTestHarness<K,
IN, OUT>
}
/** For internal testing. */
+ @SuppressWarnings("rawtypes")
public CompletableFuture<Void> processLatencyMarkerInternal(LatencyMarker
marker) {
if (inputs.isEmpty()) {
return execute(
@@ -217,11 +232,18 @@ public class
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
}
}
+ @Override
public void processRecordAttributes(RecordAttributes recordAttributes)
throws Exception {
processRecordAttributesInternal(recordAttributes).get();
}
+ @Override
+ public long getCurrentWatermark() {
+ return currentWatermark;
+ }
+
/** For internal testing. */
+ @SuppressWarnings("rawtypes")
public CompletableFuture<Void> processRecordAttributesInternal(
RecordAttributes recordAttributes) {
if (inputs.isEmpty()) {
@@ -246,12 +268,7 @@ public class
AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
@Override
public void close() throws Exception {
- execute(
- executor,
- (ignore) -> {
- super.close();
- })
- .get();
+ execute(executor, (ignore) -> super.close()).get();
executor.shutdown();
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
index d3f34a1b19c..4128f073afd 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -30,7 +31,7 @@ import
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStatePr
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
@@ -48,7 +49,7 @@ import static
org.apache.flink.streaming.util.asyncprocessing.AsyncProcessingTes
* async processing, please use methods of test harness instead of operator.
*/
public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
- extends AbstractStreamOperatorTestHarness<OUT> {
+ extends TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;
@@ -56,7 +57,7 @@ public class AsyncKeyedTwoInputStreamOperatorTestHarness<K,
IN1, IN2, OUT>
private ThrowingConsumer<StreamRecord<IN2>, Exception> processor2;
/** The executor service for async state processing. */
- private ExecutorService executor;
+ private final ExecutorService executor;
public static <K, IN1, IN2, OUT>
AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
create(
@@ -132,62 +133,85 @@ public class
AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
return processor2;
}
+ @Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
execute(executor, (ignore) ->
getRecordProcessor1().accept(element)).get();
}
+ @Override
public void processElement1(IN1 value, long timestamp) throws Exception {
processElement1(new StreamRecord<>(value, timestamp));
}
+ @Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
execute(executor, (ignore) ->
getRecordProcessor2().accept(element)).get();
}
+ @Override
public void processElement2(IN2 value, long timestamp) throws Exception {
processElement2(new StreamRecord<>(value, timestamp));
}
+ @Override
public void processWatermark1(Watermark mark) throws Exception {
execute(executor, (ignore) ->
twoInputOperator.processWatermark1(mark)).get();
}
+ @Override
public void processWatermark2(Watermark mark) throws Exception {
execute(executor, (ignore) ->
twoInputOperator.processWatermark2(mark)).get();
}
+ @Override
+ public void processBothWatermarks(Watermark mark) throws Exception {
+ execute(executor, (ignore) ->
twoInputOperator.processWatermark1(mark)).get();
+ execute(executor, (ignore) ->
twoInputOperator.processWatermark2(mark)).get();
+ }
+
+ @Override
public void processWatermarkStatus1(WatermarkStatus watermarkStatus)
throws Exception {
execute(executor, (ignore) ->
twoInputOperator.processWatermarkStatus1(watermarkStatus))
.get();
}
+ @Override
public void processWatermarkStatus2(WatermarkStatus watermarkStatus)
throws Exception {
execute(executor, (ignore) ->
twoInputOperator.processWatermarkStatus2(watermarkStatus))
.get();
}
+ @Override
public void processRecordAttributes1(RecordAttributes recordAttributes)
throws Exception {
execute(executor, (ignore) ->
twoInputOperator.processRecordAttributes1(recordAttributes))
.get();
}
+ @Override
public void processRecordAttributes2(RecordAttributes recordAttributes)
throws Exception {
execute(executor, (ignore) ->
twoInputOperator.processRecordAttributes2(recordAttributes))
.get();
}
+ public void endInput1() throws Exception {
+ if (operator instanceof BoundedMultiInput) {
+ execute(executor, (ignore) -> ((BoundedMultiInput)
operator).endInput(1)).get();
+ }
+ }
+
+ public void endInput2() throws Exception {
+ if (operator instanceof BoundedMultiInput) {
+ execute(executor, (ignore) -> ((BoundedMultiInput)
operator).endInput(2)).get();
+ }
+ }
+
public void drainStateRequests() throws Exception {
execute(executor, (ignore) -> drain(operator)).get();
}
@Override
public void close() throws Exception {
- execute(
- executor,
- (ignore) -> {
- super.close();
- })
- .get();
+ execute(executor, (ignore) -> super.close()).get();
executor.shutdown();
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
index a88146551e3..0c5252e5e40 100644
---
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
+++
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncProcessingTestUtil.java
@@ -42,7 +42,7 @@ public class AsyncProcessingTestUtil {
public static CompletableFuture<Void> execute(
ExecutorService executor, ThrowingConsumer<Void, Exception>
processor) {
- CompletableFuture<Void> future = new CompletableFuture();
+ CompletableFuture<Void> future = new CompletableFuture<>();
executor.execute(
() -> {
try {