This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 94801dd3d [flink] Refactor MergeIntoProcedure to make the api easier
to use (#2145)
94801dd3d is described below
commit 94801dd3df937d94a9aa032fffc1e070a6971427
Author: yuzelin <[email protected]>
AuthorDate: Wed Oct 18 12:02:42 2023 +0800
[flink] Refactor MergeIntoProcedure to make the api easier to use (#2145)
---
.../paimon/flink/utils/TableEnvironmentUtils.java | 10 +-
.../apache/paimon/flink/action/DeleteAction.java | 2 +-
.../paimon/flink/action/MergeIntoAction.java | 8 +-
.../paimon/flink/action/TableActionBase.java | 6 +-
.../paimon/flink/procedure/MergeIntoProcedure.java | 245 ++++++++++++---------
.../paimon/flink/procedure/ProcedureBase.java | 12 +-
.../paimon/flink/utils/TableEnvironmentUtils.java | 10 +-
.../paimon/flink/action/MergeIntoActionITCase.java | 181 ++++++++-------
8 files changed, 274 insertions(+), 200 deletions(-)
diff --git
a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
index b4fbd56e5..d1f4c09b4 100644
---
a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
+++
b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
@@ -26,7 +26,6 @@ import
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
-import java.util.concurrent.ExecutionException;
/** Utility methods for {@link TableEnvironment} and its subclasses. */
public class TableEnvironmentUtils {
@@ -35,7 +34,7 @@ public class TableEnvironmentUtils {
* Invoke {@code
TableEnvironmentImpl#executeInternal(List<Transformation<?>>, List<String>)}
* from a {@link StreamTableEnvironment} instance through reflecting.
*/
- public static void executeInternal(
+ public static TableResult executeInternal(
StreamTableEnvironment tEnv,
List<Transformation<?>> transformations,
List<String> sinkIdentifierNames) {
@@ -45,10 +44,7 @@ public class TableEnvironmentUtils {
clazz.getDeclaredMethod("executeInternal", List.class,
List.class);
executeInternal.setAccessible(true);
- TableResult tableResult =
- (TableResult)
- executeInternal.invoke(tEnv, transformations,
sinkIdentifierNames);
- tableResult.await();
+ return (TableResult) executeInternal.invoke(tEnv, transformations,
sinkIdentifierNames);
} catch (NoSuchMethodException e) {
throw new RuntimeException(
"Failed to get 'TableEnvironmentImpl#executeInternal(List,
List)' method "
@@ -59,8 +55,6 @@ public class TableEnvironmentUtils {
"Failed to invoke
'TableEnvironmentImpl#executeInternal(List, List)' method "
+ "from given StreamTableEnvironment instance by
Java reflection. This is unexpected.",
e);
- } catch (ExecutionException | InterruptedException e) {
- throw new RuntimeException("Failed to wait for insert job to
finish.", e);
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
index 4de8e80ac..19f77fe91 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
@@ -83,6 +83,6 @@ public class DeleteAction extends TableActionBase {
return rowData;
});
- batchSink(dataStream);
+ batchSink(dataStream).await();
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index 60b8a710b..3fed1f56f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -219,6 +219,11 @@ public class MergeIntoAction extends TableActionBase {
@Override
public void run() throws Exception {
+ DataStream<RowData> dataStream = buildDataStream();
+ batchSink(dataStream).await();
+ }
+
+ public DataStream<RowData> buildDataStream() {
// handle aliases
handleTargetAlias();
@@ -237,9 +242,8 @@ public class MergeIntoAction extends TableActionBase {
.map(Optional::get)
.collect(Collectors.toList());
- // sink to target table
DataStream<RowData> firstDs = dataStreams.get(0);
-
batchSink(firstDs.union(dataStreams.stream().skip(1).toArray(DataStream[]::new)));
+ return
firstDs.union(dataStreams.stream().skip(1).toArray(DataStream[]::new));
}
private void handleTargetAlias() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 5095cc352..06d234469 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -29,6 +29,7 @@ import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.data.RowData;
import java.util.Collections;
@@ -57,7 +58,7 @@ public abstract class TableActionBase extends ActionBase {
}
/** Sink {@link DataStream} dataStream to table with Flink Table API in
batch environment. */
- protected void batchSink(DataStream<RowData> dataStream) {
+ public TableResult batchSink(DataStream<RowData> dataStream) {
List<Transformation<?>> transformations =
Collections.singletonList(
new FlinkSinkBuilder((FileStoreTable) table)
@@ -67,7 +68,8 @@ public abstract class TableActionBase extends ActionBase {
List<String> sinkIdentifierNames =
Collections.singletonList(identifier.getFullName());
- TableEnvironmentUtils.executeInternal(batchTEnv, transformations,
sinkIdentifierNames);
+ return TableEnvironmentUtils.executeInternal(
+ batchTEnv, transformations, sinkIdentifierNames);
}
/**
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
index e2c14e0c3..e6658346b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -23,18 +23,14 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.MergeIntoAction;
import org.apache.paimon.flink.action.MergeIntoActionFactory;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.procedure.ProcedureContext;
-import java.util.Arrays;
-import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
-import static
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_DELETE;
-import static
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_UPSERT;
-import static
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_DELETE;
-import static
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_UPSERT;
-import static
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_INSERT;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -42,29 +38,62 @@ import static
org.apache.paimon.utils.Preconditions.checkNotNull;
* Merge Into procedure. Usage:
*
* <pre><code>
+ * -- NOTE: use '' as a placeholder for optional arguments you do not need
+ *
+ * -- when matched then upsert
* CALL sys.merge_into(
* 'targetTableId',
* 'targetAlias',
* 'sourceSqls', -- separate with ';'
* 'sourceTable',
* 'mergeCondition',
- * 'mergeActions',
- * // arguments for merge actions
- * ...
- * )
+ * 'matchedUpsertCondition',
+ * 'matchedUpsertSetting'
+ * )
*
- * -- merge actions and corresponding arguments
- * matched-upsert: condition, set
- * not-matched-by-source-upsert: condition, set
- * matched-delete: condition
- * not-matched-by-source-delete: condition
- * not-matched-insert: condition, insertValues
+ * -- when matched then upsert + when not matched then insert
+ * CALL sys.merge_into(
+ * 'targetTableId',
+ * 'targetAlias',
+ * 'sourceSqls',
+ * 'sourceTable',
+ * 'mergeCondition',
+ * 'matchedUpsertCondition',
+ * 'matchedUpsertSetting',
+ * 'notMatchedInsertCondition',
+ * 'notMatchedInsertValues'
+ * )
*
- * -- NOTE: the arguments should be in the order of merge actions
- * -- and use '' as placeholder for optional arguments
+ * -- above + when matched then delete
+ * -- IMPORTANT: if matchedDeleteCondition is '', the matched-delete action is skipped.
+ * -- To delete every matched row without an extra filter, pass 'TRUE' as the condition.
+ * CALL sys.merge_into(
+ * 'targetTableId',
+ * 'targetAlias',
+ * 'sourceSqls',
+ * 'sourceTable',
+ * 'mergeCondition',
+ * 'matchedUpsertCondition',
+ * 'matchedUpsertSetting',
+ * 'notMatchedInsertCondition',
+ * 'notMatchedInsertValues',
+ * 'matchedDeleteCondition'
+ * )
+ *
+ * -- when matched then delete (short form)
+ * CALL sys.merge_into(
+ * 'targetTableId',
+ * 'targetAlias',
+ * 'sourceSqls',
+ * 'sourceTable',
+ * 'mergeCondition',
+ * 'matchedDeleteCondition'
+ * )
* </code></pre>
*
- * <p>This procedure will be forced to use batch environments
+ * <p>This procedure is forced to use a batch environment. Compared to {@link MergeIntoAction},
+ * this procedure doesn't provide arguments to control not-matched-by-source behavior, because
+ * those actions are rarely used and would make the method signatures too complex.
*/
public class MergeIntoProcedure extends ProcedureBase {
@@ -77,9 +106,81 @@ public class MergeIntoProcedure extends ProcedureBase {
String sourceSqls,
String sourceTable,
String mergeCondition,
- String mergeActions,
- String... mergeActionArguments)
- throws Exception {
+ String matchedUpsertCondition,
+ String matchedUpsertSetting) {
+ return call(
+ procedureContext,
+ targetTableId,
+ targetAlias,
+ sourceSqls,
+ sourceTable,
+ mergeCondition,
+ matchedUpsertCondition,
+ matchedUpsertSetting,
+ "",
+ "",
+ "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String targetTableId,
+ String targetAlias,
+ String sourceSqls,
+ String sourceTable,
+ String mergeCondition,
+ String matchedUpsertCondition,
+ String matchedUpsertSetting,
+ String notMatchedInsertCondition,
+ String notMatchedInsertValues) {
+ return call(
+ procedureContext,
+ targetTableId,
+ targetAlias,
+ sourceSqls,
+ sourceTable,
+ mergeCondition,
+ matchedUpsertCondition,
+ matchedUpsertSetting,
+ notMatchedInsertCondition,
+ notMatchedInsertValues,
+ "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String targetTableId,
+ String targetAlias,
+ String sourceSqls,
+ String sourceTable,
+ String mergeCondition,
+ String matchedDeleteCondition) {
+ return call(
+ procedureContext,
+ targetTableId,
+ targetAlias,
+ sourceSqls,
+ sourceTable,
+ mergeCondition,
+ "",
+ "",
+ "",
+ "",
+ matchedDeleteCondition);
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String targetTableId,
+ String targetAlias,
+ String sourceSqls,
+ String sourceTable,
+ String mergeCondition,
+ String matchedUpsertCondition,
+ String matchedUpsertSetting,
+ String notMatchedInsertCondition,
+ String notMatchedInsertValues,
+ String matchedDeleteCondition) {
String warehouse = ((AbstractCatalog) catalog).warehouse();
Map<String, String> catalogOptions = ((AbstractCatalog)
catalog).options();
Identifier identifier = Identifier.fromString(targetTableId);
@@ -101,88 +202,32 @@ public class MergeIntoProcedure extends ProcedureBase {
checkArgument(!mergeCondition.isEmpty(), "Must specify merge
condition.");
action.withMergeCondition(mergeCondition);
- checkArgument(!mergeActions.isEmpty(), "Must specify at least one
merge action.");
- List<String> actions =
- Arrays.stream(mergeActions.split(","))
- .map(String::trim)
- .collect(Collectors.toList());
- validateActions(actions, mergeActionArguments.length);
-
- int index = 0;
- String condition, setting;
- for (String mergeAction : actions) {
- switch (mergeAction) {
- case MATCHED_UPSERT:
- case NOT_MATCHED_BY_SOURCE_UPSERT:
- case NOT_MATCHED_INSERT:
- condition = nullable(mergeActionArguments[index++]);
- setting = nullable(mergeActionArguments[index++]);
- checkNotNull(setting, "%s must set the second argument",
mergeAction);
- setMergeAction(action, mergeAction, condition, setting);
- break;
- case MATCHED_DELETE:
- case NOT_MATCHED_BY_SOURCE_DELETE:
- condition = nullable(mergeActionArguments[index++]);
- setMergeAction(action, mergeAction, condition);
- break;
- default:
- throw new UnsupportedOperationException("Unknown merge
action: " + action);
- }
+ if (!matchedUpsertCondition.isEmpty() ||
!matchedUpsertSetting.isEmpty()) {
+ String condition = nullable(matchedUpsertCondition);
+ String setting = nullable(matchedUpsertSetting);
+ checkNotNull(setting, "matched-upsert must set the
'matchedUpsertSetting' argument");
+ action.withMatchedUpsert(condition, setting);
}
- MergeIntoActionFactory.validate(action);
-
- // TODO set dml-sync argument to action
- action.run();
-
- return new String[] {"Success"};
- }
+ if (!notMatchedInsertCondition.isEmpty() ||
!notMatchedInsertValues.isEmpty()) {
+ String condition = nullable(notMatchedInsertCondition);
+ String values = nullable(notMatchedInsertValues);
+ checkNotNull(
+ values, "not-matched-insert must set the
'notMatchedInsertValues' argument");
+ action.withNotMatchedInsert(condition, values);
+ }
- private void validateActions(List<String> mergeActions, int
argumentLength) {
- int expectedArguments = 0;
- for (String action : mergeActions) {
- switch (action) {
- case MATCHED_UPSERT:
- case NOT_MATCHED_BY_SOURCE_UPSERT:
- case NOT_MATCHED_INSERT:
- expectedArguments += 2;
- break;
- case MATCHED_DELETE:
- case NOT_MATCHED_BY_SOURCE_DELETE:
- expectedArguments += 1;
- break;
- default:
- throw new UnsupportedOperationException("Unknown merge
action: " + action);
- }
+ if (!matchedDeleteCondition.isEmpty()) {
+ action.withMatchedDelete(matchedDeleteCondition);
}
- checkArgument(
- expectedArguments == argumentLength,
- "Expected %s action arguments but given '%s'",
- expectedArguments,
- argumentLength);
- }
+ MergeIntoActionFactory.validate(action);
- private void setMergeAction(MergeIntoAction action, String mergeAction,
String... arguments) {
- switch (mergeAction) {
- case MATCHED_UPSERT:
- action.withMatchedUpsert(arguments[0], arguments[1]);
- return;
- case NOT_MATCHED_BY_SOURCE_UPSERT:
- action.withNotMatchedBySourceUpsert(arguments[0],
arguments[1]);
- return;
- case NOT_MATCHED_INSERT:
- action.withNotMatchedInsert(arguments[0], arguments[1]);
- return;
- case MATCHED_DELETE:
- action.withMatchedDelete(arguments[0]);
- return;
- case NOT_MATCHED_BY_SOURCE_DELETE:
- action.withNotMatchedBySourceDelete(arguments[0]);
- return;
- default:
- throw new UnsupportedOperationException("Unknown merge action:
" + mergeAction);
- }
+ DataStream<RowData> dataStream = action.buildDataStream();
+ TableResult tableResult = action.batchSink(dataStream);
+ JobClient jobClient = tableResult.getJobClient().get();
+
+ return execute(procedureContext, jobClient);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
index 82f869f28..91b8a1aa1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -73,8 +73,18 @@ public abstract class ProcedureBase implements Procedure,
Factory {
ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
String name =
conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
JobClient jobClient = env.executeAsync(name);
+ return execute(jobClient, conf.get(TABLE_DML_SYNC));
+ }
+
+ protected String[] execute(ProcedureContext procedureContext, JobClient
jobClient) {
+ StreamExecutionEnvironment env =
procedureContext.getExecutionEnvironment();
+ ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
+ return execute(jobClient, conf.get(TABLE_DML_SYNC));
+ }
+
+ private String[] execute(JobClient jobClient, boolean dmlSync) {
String jobId = jobClient.getJobID().toString();
- if (conf.get(TABLE_DML_SYNC)) {
+ if (dmlSync) {
try {
jobClient.getJobExecutionResult().get();
} catch (Exception e) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
index 9b7009387..6a5820ce8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
@@ -26,7 +26,6 @@ import
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
-import java.util.concurrent.ExecutionException;
/** Utility methods for {@link TableEnvironment} and its subclasses. */
public class TableEnvironmentUtils {
@@ -35,7 +34,7 @@ public class TableEnvironmentUtils {
* Invoke {@code
TableEnvironmentImpl#executeInternal(List<Transformation<?>>, List<String>)}
* from a {@link StreamTableEnvironment} instance through reflecting.
*/
- public static void executeInternal(
+ public static TableResult executeInternal(
StreamTableEnvironment tEnv,
List<Transformation<?>> transformations,
List<String> sinkIdentifierNames) {
@@ -45,10 +44,7 @@ public class TableEnvironmentUtils {
clazz.getDeclaredMethod("executeInternal", List.class,
List.class);
executeInternal.setAccessible(true);
- TableResult tableResult =
- (TableResult)
- executeInternal.invoke(tEnv, transformations,
sinkIdentifierNames);
- tableResult.await();
+ return (TableResult) executeInternal.invoke(tEnv, transformations,
sinkIdentifierNames);
} catch (NoSuchMethodException e) {
throw new RuntimeException(
"Failed to get 'TableEnvironmentImpl#executeInternal(List,
List)' method "
@@ -59,8 +55,6 @@ public class TableEnvironmentUtils {
"Failed to invoke
'TableEnvironmentImpl#executeInternal(List, List)' method "
+ "from given StreamTableEnvironment instance by
Java reflection. This is unexpected.",
e);
- } catch (ExecutionException | InterruptedException e) {
- throw new RuntimeException("Failed to wait for insert job to
finish.", e);
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 53cc1f6b6..7d6ce1f33 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -40,11 +40,6 @@ import java.util.concurrent.ThreadLocalRandom;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
-import static
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_DELETE;
-import static
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_UPSERT;
-import static
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_DELETE;
-import static
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_UPSERT;
-import static
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_INSERT;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildDdl;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
@@ -116,31 +111,8 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
"dt < '02-28'", "v = v || '_nmu', last_action =
'not_matched_upsert'")
.withNotMatchedBySourceDelete("dt >= '02-28'");
- String mergeActions =
- String.format(
- "%s,%s,%s,%s,%s",
- MATCHED_UPSERT,
- MATCHED_DELETE,
- NOT_MATCHED_INSERT,
- NOT_MATCHED_BY_SOURCE_UPSERT,
- NOT_MATCHED_BY_SOURCE_DELETE);
- String procedureStatement =
- String.format(
- "CALL sys.merge_into('%s.T', '', '', 'default.S', 'T.k
= S.k AND T.dt = S.dt', '%s', %s)",
- database,
- mergeActions,
- "'T.v <> S.v AND S.v IS NOT NULL', "
- + "'v = S.v, last_action =
''matched_upsert''', "
- + "'S.v IS NULL', "
- + "'', "
- + "'S.k, S.v, ''insert'', S.dt', "
- + "'dt < ''02-28''', "
- + "'v = v || ''_nmu'', last_action =
''not_matched_upsert''', "
- + "'dt >= ''02-28'''");
-
validateActionRunResult(
action,
- procedureStatement,
expected,
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
@@ -200,15 +172,15 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
String procedureStatement =
String.format(
- "CALL sys.merge_into('%s.T', 'TT', '', 'S', 'TT.k =
S.k AND TT.dt = S.dt', '%s', %s)",
- inDefault ? database : "test_db", MATCHED_DELETE,
"'S.v IS NULL'");
+ "CALL sys.merge_into('%s.T', 'TT', '', 'S', 'TT.k =
S.k AND TT.dt = S.dt', 'S.v IS NULL')",
+ inDefault ? database : "test_db");
- validateActionRunResult(
- action,
- procedureStatement,
+ List<Row> streamingExpected =
Arrays.asList(
changelogRow("-D", 4, "v_4", "creation", "02-27"),
- changelogRow("-D", 8, "v_8", "creation", "02-28")),
+ changelogRow("-D", 8, "v_8", "creation", "02-28"));
+
+ List<Row> batchExpected =
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
changelogRow("+I", 2, "v_2", "creation", "02-27"),
@@ -217,7 +189,13 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
changelogRow("+I", 6, "v_6", "creation", "02-28"),
changelogRow("+I", 7, "v_7", "creation", "02-28"),
changelogRow("+I", 9, "v_9", "creation", "02-28"),
- changelogRow("+I", 10, "v_10", "creation", "02-28")));
+ changelogRow("+I", 10, "v_10", "creation", "02-28"));
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ validateActionRunResult(action, streamingExpected, batchExpected);
+ } else {
+ validateProcedureResult(procedureStatement, streamingExpected,
batchExpected);
+ }
}
@ParameterizedTest(name = "in-default = {0}")
@@ -242,20 +220,20 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
String procedureStatement =
String.format(
- "CALL sys.merge_into('default.T', '', '', '%s', 'T.k =
S.k AND T.dt = S.dt', '%s', %s)",
- sourceTableName, MATCHED_DELETE, "'S.v IS NULL'");
+ "CALL sys.merge_into('default.T', '', '', '%s', 'T.k =
S.k AND T.dt = S.dt', 'S.v IS NULL')",
+ sourceTableName);
if (!inDefault) {
sEnv.executeSql("USE `default`");
bEnv.executeSql("USE `default`");
}
- validateActionRunResult(
- action,
- procedureStatement,
+ List<Row> streamingExpected =
Arrays.asList(
changelogRow("-D", 4, "v_4", "creation", "02-27"),
- changelogRow("-D", 8, "v_8", "creation", "02-28")),
+ changelogRow("-D", 8, "v_8", "creation", "02-28"));
+
+ List<Row> batchExpected =
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
changelogRow("+I", 2, "v_2", "creation", "02-27"),
@@ -264,7 +242,13 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
changelogRow("+I", 6, "v_6", "creation", "02-28"),
changelogRow("+I", 7, "v_7", "creation", "02-28"),
changelogRow("+I", 9, "v_9", "creation", "02-28"),
- changelogRow("+I", 10, "v_10", "creation", "02-28")));
+ changelogRow("+I", 10, "v_10", "creation", "02-28"));
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ validateActionRunResult(action, streamingExpected, batchExpected);
+ } else {
+ validateProcedureResult(procedureStatement, streamingExpected,
batchExpected);
+ }
}
@ParameterizedTest(name = "useCatalog = {0}")
@@ -306,23 +290,21 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
String procedureStatement =
String.format(
- "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k =
S.k AND T.dt = S.dt', '%s', %s)",
+ "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k =
S.k AND T.dt = S.dt', 'S.v IS NULL')",
database,
useCatalog
? String.format(
"%s;%s;%s",
escapeCatalog, "USE CATALOG test_cat",
escapeDdl)
: String.format("%s;%s", escapeCatalog,
escapeDdl),
- useCatalog ? "S" : "test_cat.default.S",
- MATCHED_DELETE,
- "'S.v IS NULL'");
+ useCatalog ? "S" : "test_cat.default.S");
- validateActionRunResult(
- action,
- procedureStatement,
+ List<Row> streamingExpected =
Arrays.asList(
changelogRow("-D", 4, "v_4", "creation", "02-27"),
- changelogRow("-D", 8, "v_8", "creation", "02-28")),
+ changelogRow("-D", 8, "v_8", "creation", "02-28"));
+
+ List<Row> batchExpected =
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
changelogRow("+I", 2, "v_2", "creation", "02-27"),
@@ -331,7 +313,13 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
changelogRow("+I", 6, "v_6", "creation", "02-28"),
changelogRow("+I", 7, "v_7", "creation", "02-28"),
changelogRow("+I", 9, "v_9", "creation", "02-28"),
- changelogRow("+I", 10, "v_10", "creation", "02-28")));
+ changelogRow("+I", 10, "v_10", "creation", "02-28"));
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ validateActionRunResult(action, streamingExpected, batchExpected);
+ } else {
+ validateProcedureResult(procedureStatement, streamingExpected,
batchExpected);
+ }
}
@ParameterizedTest(name = "source-qualified = {0}")
@@ -346,16 +334,12 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
String procedureStatement =
String.format(
- "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k =
SS.k AND T.dt = SS.dt', '%s', %s)",
+ "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k =
SS.k AND T.dt = SS.dt', '', '*')",
database,
"CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'',
dt FROM S",
- qualified ? "default.SS" : "SS",
- MATCHED_UPSERT,
- "'', '*'");
+ qualified ? "default.SS" : "SS");
- validateActionRunResult(
- action,
- procedureStatement,
+ List<Row> streamingExpected =
Arrays.asList(
changelogRow("-U", 1, "v_1", "creation", "02-27"),
changelogRow("+U", 1, "v_1", "unknown", "02-27"),
@@ -364,7 +348,9 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
changelogRow("-U", 7, "v_7", "creation", "02-28"),
changelogRow("+U", 7, "Seven", "unknown", "02-28"),
changelogRow("-U", 8, "v_8", "creation", "02-28"),
- changelogRow("+U", 8, null, "unknown", "02-28")),
+ changelogRow("+U", 8, null, "unknown", "02-28"));
+
+ List<Row> batchExpected =
Arrays.asList(
changelogRow("+U", 1, "v_1", "unknown", "02-27"),
changelogRow("+I", 2, "v_2", "creation", "02-27"),
@@ -375,7 +361,13 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
changelogRow("+U", 7, "Seven", "unknown", "02-28"),
changelogRow("+U", 8, null, "unknown", "02-28"),
changelogRow("+I", 9, "v_9", "creation", "02-28"),
- changelogRow("+I", 10, "v_10", "creation", "02-28")));
+ changelogRow("+I", 10, "v_10", "creation", "02-28"));
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ validateActionRunResult(action, streamingExpected, batchExpected);
+ } else {
+ validateProcedureResult(procedureStatement, streamingExpected,
batchExpected);
+ }
}
@ParameterizedTest(name = "source-qualified = {0}")
@@ -390,19 +382,17 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
String procedureStatement =
String.format(
- "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k =
SS.k AND T.dt = SS.dt', '%s', %s)",
+ "CALL sys.merge_into('%s.T', '', '%s', '%s', 'T.k =
SS.k AND T.dt = SS.dt', '', '', 'SS.k < 12', '*', '')",
database,
"CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'',
dt FROM S",
- qualified ? "default.SS" : "SS",
- NOT_MATCHED_INSERT,
- "'SS.k < 12', '*'");
+ qualified ? "default.SS" : "SS");
- validateActionRunResult(
- action,
- procedureStatement,
+ List<Row> streamingExpected =
Arrays.asList(
changelogRow("+I", 8, "v_8", "unknown", "02-29"),
- changelogRow("+I", 11, "v_11", "unknown", "02-29")),
+ changelogRow("+I", 11, "v_11", "unknown", "02-29"));
+
+ List<Row> batchExpected =
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
changelogRow("+I", 2, "v_2", "creation", "02-27"),
@@ -415,7 +405,36 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
changelogRow("+I", 9, "v_9", "creation", "02-28"),
changelogRow("+I", 10, "v_10", "creation", "02-28"),
changelogRow("+I", 8, "v_8", "unknown", "02-29"),
- changelogRow("+I", 11, "v_11", "unknown", "02-29")));
+ changelogRow("+I", 11, "v_11", "unknown", "02-29"));
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ validateActionRunResult(action, streamingExpected, batchExpected);
+ } else {
+ validateProcedureResult(procedureStatement, streamingExpected,
batchExpected);
+ }
+ }
+
+ @Test
+ public void testProcedureWithDeleteConditionTrue() throws Exception {
+ String procedureStatement =
+ String.format(
+ "CALL sys.merge_into('%s.T', '', '', 'S', 'T.k = S.k
AND T.dt = S.dt', 'TRUE')",
+ database);
+
+ validateProcedureResult(
+ procedureStatement,
+ Arrays.asList(
+ changelogRow("-D", 1, "v_1", "creation", "02-27"),
+ changelogRow("-D", 4, "v_4", "creation", "02-27"),
+ changelogRow("-D", 7, "v_7", "creation", "02-28"),
+ changelogRow("-D", 8, "v_8", "creation", "02-28")),
+ Arrays.asList(
+ changelogRow("+I", 2, "v_2", "creation", "02-27"),
+ changelogRow("+I", 3, "v_3", "creation", "02-27"),
+ changelogRow("+I", 5, "v_5", "creation", "02-28"),
+ changelogRow("+I", 6, "v_6", "creation", "02-28"),
+ changelogRow("+I", 9, "v_9", "creation", "02-28"),
+ changelogRow("+I", 10, "v_10", "creation", "02-28")));
}
//
----------------------------------------------------------------------------------------------------------------
@@ -493,18 +512,11 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
}
private void validateActionRunResult(
- MergeIntoAction action,
- String procedureStatement,
- List<Row> streamingExpected,
- List<Row> batchExpected)
+ MergeIntoAction action, List<Row> streamingExpected, List<Row>
batchExpected)
throws Exception {
BlockingIterator<Row, Row> iterator =
testStreamingRead(buildSimpleQuery("T"), initialRecords);
- if (ThreadLocalRandom.current().nextBoolean()) {
- action.run();
- } else {
- callProcedure(procedureStatement);
- }
+ action.run();
// test streaming read
validateStreamingReadResult(iterator, streamingExpected);
iterator.close();
@@ -512,6 +524,19 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
testBatchRead(buildSimpleQuery("T"), batchExpected);
}
+ private void validateProcedureResult(
+ String procedureStatement, List<Row> streamingExpected, List<Row>
batchExpected)
+ throws Exception {
+ BlockingIterator<Row, Row> iterator =
+ testStreamingRead(buildSimpleQuery("T"), initialRecords);
+ callProcedure(procedureStatement, true, true);
+ // test batch read first to ensure TABLE_DML_SYNC works
+ testBatchRead(buildSimpleQuery("T"), batchExpected);
+ // test streaming read
+ validateStreamingReadResult(iterator, streamingExpected);
+ iterator.close();
+ }
+
private void prepareTargetTable(CoreOptions.ChangelogProducer producer)
throws Exception {
sEnv.executeSql(
buildDdl(