This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c27fd8dc72c [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions c27fd8dc72c is described below commit c27fd8dc72ceac7631b5f7482db1e9a14b339f68 Author: lincoln lee <lincoln.8...@gmail.com> AuthorDate: Mon May 16 12:30:38 2022 +0800 [FLINK-27623][table] Add 'table.exec.async-lookup.output-mode' to ExecutionConfigOptions This closes #19759 --- .../generated/execution_config_configuration.html | 6 +++ .../table/api/config/ExecutionConfigOptions.java | 25 ++++++++++++ .../plan/nodes/exec/batch/BatchExecLookupJoin.java | 1 + .../nodes/exec/common/CommonExecLookupJoin.java | 22 ++++++++-- .../nodes/exec/stream/StreamExecLookupJoin.java | 4 ++ .../physical/stream/StreamPhysicalLookupJoin.scala | 3 +- .../factories/TestValuesRuntimeFunctions.java | 11 ++++- .../testJoinTemporalTable.out | 1 + ...testJoinTemporalTableWithProjectionPushDown.out | 1 + .../runtime/stream/sql/AsyncLookupJoinITCase.scala | 20 ++++++--- .../operators/join/AsyncLookupJoinHarnessTest.java | 47 ++++++++++++++++++---- 11 files changed, 123 insertions(+), 18 deletions(-) diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html index 364d9bf9d81..d2974df5ac6 100644 --- a/docs/layouts/shortcodes/generated/execution_config_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html @@ -14,6 +14,12 @@ <td>Integer</td> <td>The max number of async i/o operation that the async lookup join can trigger.</td> </tr> + <tr> + <td><h5>table.exec.async-lookup.output-mode</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">ORDERED</td> + <td><p>Enum</p></td> + <td>Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness of the result, otherwise ORDERED will be still used.<br /><br />Possible values:<ul><li>"ORDERED"</li><li>"ALLOW_UNORDERED"</li></ul></td> + </tr> <tr> <td><h5>table.exec.async-lookup.timeout</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">3 min</td> diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java index 9aa8642dee4..474fc4f9c24 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java @@ -323,6 +323,16 @@ public class ExecutionConfigOptions { .withDescription( "The async timeout for the asynchronous operation to complete."); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<AsyncOutputMode> TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE = + key("table.exec.async-lookup.output-mode") + .enumType(AsyncOutputMode.class) + .defaultValue(AsyncOutputMode.ORDERED) + .withDescription( + "Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ORDERED by default. " + + "If set to ALLOW_UNORDERED, will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not " + + "affect the correctness of the result, otherwise ORDERED will be still used."); + // ------------------------------------------------------------------------ // MiniBatch Options // ------------------------------------------------------------------------ @@ -573,6 +583,21 @@ public class ExecutionConfigOptions { FORCE } + /** Output mode for asynchronous operations, equivalent to {@see AsyncDataStream.OutputMode}. */ + @PublicEvolving + public enum AsyncOutputMode { + + /** Ordered output mode, equivalent to {@see AsyncDataStream.OutputMode.ORDERED}. */ + ORDERED, + + /** + * Allow unordered output mode, will attempt to use {@see + * AsyncDataStream.OutputMode.UNORDERED} when it does not affect the correctness of the + * result, otherwise ORDERED will be still used. + */ + ALLOW_UNORDERED + } + /** Determine if CAST operates using the legacy behaviour or the new one. */ @Deprecated public enum LegacyCastBehaviour implements DescribedEnum { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java index c9f9180941f..7d02498013a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java @@ -59,6 +59,7 @@ public class BatchExecLookupJoin extends CommonExecLookupJoin implements BatchEx lookupKeys, projectionOnTemporalTable, filterOnTemporalTable, + true, Collections.singletonList(inputProperty), outputType, description); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java index fb2b7619be2..a888cb292f7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java @@ -149,6 +149,8 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> "projectionOnTemporalTable"; public static final String FIELD_NAME_FILTER_ON_TEMPORAL_TABLE = "filterOnTemporalTable"; + public static final String FIELD_NAME_INPUT_INSERT_ONLY = "inputInsertOnly"; + @JsonProperty(FIELD_NAME_JOIN_TYPE) private final FlinkJoinType joinType; @@ -172,6 +174,9 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> @JsonProperty(FIELD_NAME_JOIN_CONDITION) private final @Nullable RexNode joinCondition; + @JsonProperty(FIELD_NAME_INPUT_INSERT_ONLY) + private final boolean inputInsertOnly; + protected CommonExecLookupJoin( int id, ExecNodeContext context, @@ -183,6 +188,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> Map<Integer, LookupJoinUtil.LookupKey> lookupKeys, @Nullable List<RexNode> projectionOnTemporalTable, @Nullable RexNode filterOnTemporalTable, + boolean inputInsertOnly, List<InputProperty> inputProperties, RowType outputType, String description) { @@ -194,6 +200,7 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> this.temporalTableSourceSpec = checkNotNull(temporalTableSourceSpec); this.projectionOnTemporalTable = projectionOnTemporalTable; this.filterOnTemporalTable = filterOnTemporalTable; + this.inputInsertOnly = inputInsertOnly; } public TemporalTableSourceSpec getTemporalTableSourceSpec() { @@ -316,6 +323,8 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY); long asyncTimeout = config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT).toMillis(); + ExecutionConfigOptions.AsyncOutputMode asyncOutputMode = + config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE); DataTypeFactory dataTypeFactory = ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory(); @@ -388,10 +397,17 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> asyncBufferCapacity); } - // force ORDERED output mode currently, optimize it to UNORDERED - // when the downstream do not need orderness return new AsyncWaitOperatorFactory<>( - asyncFunc, asyncTimeout, asyncBufferCapacity, AsyncDataStream.OutputMode.ORDERED); + asyncFunc, asyncTimeout, asyncBufferCapacity, convert(asyncOutputMode)); + } + + private AsyncDataStream.OutputMode convert( + ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) { + if (inputInsertOnly + && asyncOutputMode == ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED) { + return AsyncDataStream.OutputMode.UNORDERED; + } + return AsyncDataStream.OutputMode.ORDERED; } private StreamOperatorFactory<RowData> createSyncLookupJoin( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java index 89549588827..f54b06e7c60 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java @@ -57,6 +57,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream Map<Integer, LookupJoinUtil.LookupKey> lookupKeys, @Nullable List<RexNode> projectionOnTemporalTable, @Nullable RexNode filterOnTemporalTable, + boolean inputInsertOnly, InputProperty inputProperty, RowType outputType, String description) { @@ -70,6 +71,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream lookupKeys, projectionOnTemporalTable, filterOnTemporalTable, + inputInsertOnly, Collections.singletonList(inputProperty), outputType, description); @@ -89,6 +91,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream List<RexNode> projectionOnTemporalTable, @JsonProperty(FIELD_NAME_FILTER_ON_TEMPORAL_TABLE) @Nullable RexNode filterOnTemporalTable, + @JsonProperty(FIELD_NAME_INPUT_INSERT_ONLY) @Nullable Boolean inputInsertOnly, @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties, @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { @@ -102,6 +105,7 @@ public class StreamExecLookupJoin extends CommonExecLookupJoin implements Stream lookupKeys, projectionOnTemporalTable, filterOnTemporalTable, + inputInsertOnly, inputProperties, outputType, description); diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala index 3d797f74d6a..d2d03841b4f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala @@ -22,7 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin -import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, JoinTypeUtil} +import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, FlinkRexUtil, JoinTypeUtil} import org.apache.flink.table.planner.utils.JavaScalaConversionUtil import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig @@ -83,6 +83,7 @@ class StreamPhysicalLookupJoin( allLookupKeys.map(item => (Int.box(item._1), item._2)).asJava, projectionOnTemporalTable, filterOnTemporalTable, + ChangelogPlanUtils.inputInsertOnly(this), InputProperty.DEFAULT, FlinkTypeFactory.toLogicalRowType(getRowType), getRelDetailedDescription) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java index 66621dc06e0..284d8602984 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java @@ -58,6 +58,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -594,18 +595,21 @@ final class TestValuesRuntimeFunctions { private static final long serialVersionUID = 1L; private final Map<Row, List<Row>> mapping; + private final Random random; private transient boolean isOpenCalled = false; private transient ExecutorService executor; protected AsyncTestValueLookupFunction(Map<Row, List<Row>> mapping) { this.mapping = mapping; + this.random = new Random(); } @Override public void open(FunctionContext context) throws Exception { RESOURCE_COUNTER.incrementAndGet(); isOpenCalled = true; - executor = Executors.newSingleThreadExecutor(); + // generate unordered result for async lookup + executor = Executors.newFixedThreadPool(2); } public void eval(CompletableFuture<Collection<Row>> resultFuture, Object... inputs) { @@ -619,6 +623,11 @@ final class TestValuesRuntimeFunctions { } CompletableFuture.supplyAsync( () -> { + try { + Thread.sleep(random.nextInt(5)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } List<Row> list = mapping.get(key); if (list == null) { return Collections.<Row>emptyList(); diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out index cba8ae4894a..b232637a236 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTable.out @@ -258,6 +258,7 @@ }, "projectionOnTemporalTable" : null, "filterOnTemporalTable" : null, + "inputInsertOnly" : true, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out index c3012c79471..fc980f3b3c0 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest_jsonplan/testJoinTemporalTableWithProjectionPushDown.out @@ -262,6 +262,7 @@ "type" : "INT" } ], "filterOnTemporalTable" : null, + "inputInsertOnly" : true, "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala index 5bb390596ea..94858311ed6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala @@ -20,6 +20,8 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.scala._ import org.apache.flink.table.api.{TableSchema, Types} import org.apache.flink.table.api.bridge.scala._ +import org.apache.flink.table.api.config.ExecutionConfigOptions +import org.apache.flink.table.api.config.ExecutionConfigOptions.AsyncOutputMode import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.{InMemoryLookupableTableSource, StreamingWithStateTestBase, TestingAppendSink, TestingRetractSink} import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} @@ -41,7 +43,8 @@ import scala.collection.JavaConversions._ class AsyncLookupJoinITCase( legacyTableSource: Boolean, backend: StateBackendMode, - objectReuse: Boolean) + objectReuse: Boolean, + asyncOutputMode: AsyncOutputMode) extends StreamingWithStateTestBase(backend) { val data = List( @@ -62,6 +65,8 @@ class AsyncLookupJoinITCase( env.getConfig.disableObjectReuse() } + tEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE, asyncOutputMode) + createScanTable("src", data) createLookupTable("user_table", userData) } @@ -303,13 +308,16 @@ class AsyncLookupJoinITCase( } object AsyncLookupJoinITCase { - @Parameterized.Parameters(name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}") + @Parameterized.Parameters( + name = "LegacyTableSource={0}, StateBackend={1}, ObjectReuse={2}, AsyncOutputMode={3}") def parameters(): JCollection[Array[Object]] = { Seq[Array[AnyRef]]( - Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE), - Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE), - Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE), - Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE) + Array(JBoolean.TRUE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED), + Array(JBoolean.TRUE, ROCKSDB_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED), + Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.FALSE, AsyncOutputMode.ORDERED), + Array(JBoolean.FALSE, HEAP_BACKEND, JBoolean.TRUE, AsyncOutputMode.ORDERED), + Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.FALSE, AsyncOutputMode.ALLOW_UNORDERED), + Array(JBoolean.FALSE, ROCKSDB_BACKEND, JBoolean.TRUE, AsyncOutputMode.ALLOW_UNORDERED) ) } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.java index d3a2952be75..6c70e6b54cf 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/AsyncLookupJoinHarnessTest.java @@ -51,6 +51,8 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.util.Collector; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.ArrayList; import java.util.Arrays; @@ -59,6 +61,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -71,11 +74,19 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; /** Harness tests for {@link LookupJoinRunner} and {@link LookupJoinWithCalcRunner}. */ +@RunWith(Parameterized.class) public class AsyncLookupJoinHarnessTest { private static final int ASYNC_BUFFER_CAPACITY = 100; private static final int ASYNC_TIMEOUT_MS = 3000; + @Parameterized.Parameter public boolean orderedResult; + + @Parameterized.Parameters(name = "ordered result = {0}") + public static Object[] parameters() { + return new Object[][] {new Object[] {true}, new Object[] {false}}; + } + private final TypeSerializer<RowData> inSerializer = new RowDataSerializer( DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()); @@ -130,7 +141,7 @@ public class AsyncLookupJoinHarnessTest { expectedOutput.add(insertRecord(3, "c", 3, "Jackson")); expectedOutput.add(insertRecord(4, "d", 4, "Fabian")); - assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + checkResult(expectedOutput, testHarness.getOutput()); } @Test @@ -159,7 +170,7 @@ public class AsyncLookupJoinHarnessTest { expectedOutput.add(insertRecord(3, "c", 3, "Jackson")); expectedOutput.add(insertRecord(4, "d", 4, "Fabian")); - assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + checkResult(expectedOutput, testHarness.getOutput()); } @Test @@ -191,7 +202,7 @@ public class AsyncLookupJoinHarnessTest { expectedOutput.add(insertRecord(4, "d", 4, "Fabian")); expectedOutput.add(insertRecord(5, "e", null, null)); - assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + checkResult(expectedOutput, testHarness.getOutput()); } @Test @@ -222,11 +233,19 @@ public class AsyncLookupJoinHarnessTest { expectedOutput.add(insertRecord(4, "d", 4, "Fabian")); expectedOutput.add(insertRecord(5, "e", null, null)); - assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + checkResult(expectedOutput, testHarness.getOutput()); } // --------------------------------------------------------------------------------- + private void checkResult(Collection<Object> expectedOutput, Collection<Object> actualOutput) { + if (orderedResult) { + assertor.assertOutputEquals("output wrong.", expectedOutput, actualOutput); + } else { + assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, actualOutput); + } + } + @SuppressWarnings({"unchecked", "rawtypes"}) private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness( JoinType joinType, FilterOnTable filterOnTable) throws Exception { @@ -258,7 +277,9 @@ public class AsyncLookupJoinHarnessTest { joinRunner, ASYNC_TIMEOUT_MS, ASYNC_BUFFER_CAPACITY, - AsyncDataStream.OutputMode.ORDERED), + orderedResult + ? AsyncDataStream.OutputMode.ORDERED + : AsyncDataStream.OutputMode.UNORDERED), inSerializer); } @@ -319,6 +340,8 @@ public class AsyncLookupJoinHarnessTest { private static final Map<Integer, List<RowData>> data = new HashMap<>(); + private final Random random = new Random(); + static { data.put(1, Collections.singletonList(GenericRowData.of(1, fromString("Julian")))); data.put( @@ -334,7 +357,8 @@ public class AsyncLookupJoinHarnessTest { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - this.executor = Executors.newSingleThreadExecutor(); + // generate unordered result for async lookup + this.executor = Executors.newFixedThreadPool(2); } @Override @@ -342,7 +366,16 @@ public class AsyncLookupJoinHarnessTest { throws Exception { int id = input.getInt(0); CompletableFuture.supplyAsync( - (Supplier<Collection<RowData>>) () -> data.get(id), executor) + (Supplier<Collection<RowData>>) + () -> { + try { + Thread.sleep(random.nextInt(5)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return data.get(id); + }, + executor) .thenAcceptAsync(resultFuture::complete, executor); }