This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3a40361 [FLINK-22899][table] ValuesUpsertSinkFunction needs to use global upsert
3a40361 is described below
commit 3a4036129865fcddb809335b3a55a949a75dc8ca
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jun 17 10:28:32 2021 +0800
[FLINK-22899][table] ValuesUpsertSinkFunction needs to use global upsert
This closes #16091
---
.../factories/TestValuesRuntimeFunctions.java | 65 ++++------------------
1 file changed, 10 insertions(+), 55 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index e743c6a..de98081 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -356,19 +356,18 @@ final class TestValuesRuntimeFunctions {
}
}
+ /**
+ * NOTE: This class should use a global map to store upsert values. Just like other external
+ * databases.
+ */
static class KeyedUpsertingSinkFunction extends AbstractExactlyOnceSink {
private static final long serialVersionUID = 1L;
private final DataStructureConverter converter;
private final int[] keyIndices;
private final int expectedSize;
- // we store key and value as adjacent elements in the ListState
- private transient ListState<String> upsertResultState;
// [key, value] map result
private transient Map<String, String> localUpsertResult;
-
- // received count state
- private transient ListState<Integer> receivedNumState;
private transient int receivedNum;
protected KeyedUpsertingSinkFunction(
@@ -385,61 +384,17 @@ final class TestValuesRuntimeFunctions {
@Override
public void initializeState(FunctionInitializationContext context)
throws Exception {
super.initializeState(context);
- this.upsertResultState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>("sink-upsert-results", Types.STRING));
- this.localUpsertResult = new HashMap<>();
- this.receivedNumState =
- context.getOperatorStateStore()
- .getListState(
- new ListStateDescriptor<>("sink-received-num", Types.INT));
-
- if (context.isRestored()) {
- String key = null;
- String value;
- for (String entry : upsertResultState.get()) {
- if (key == null) {
- key = entry;
- } else {
- value = entry;
- localUpsertResult.put(key, value);
- // reset
- key = null;
- }
- }
- if (key != null) {
- throw new RuntimeException("The upsertResultState is corrupt.");
- }
- for (int num : receivedNumState.get()) {
- // should only be single element
- this.receivedNum = num;
- }
- }
- int taskId = getRuntimeContext().getIndexOfThisSubtask();
synchronized (LOCK) {
- globalUpsertResult
- .computeIfAbsent(tableName, k -> new HashMap<>())
- .put(taskId, localUpsertResult);
+ // always store in a single map, global upsert
+ this.localUpsertResult =
+ globalUpsertResult
+ .computeIfAbsent(tableName, k -> new HashMap<>())
+ .computeIfAbsent(0, k -> new HashMap<>());
}
}
@Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- super.snapshotState(context);
- upsertResultState.clear();
- synchronized (LOCK) {
- for (Map.Entry<String, String> entry : localUpsertResult.entrySet()) {
- upsertResultState.add(entry.getKey());
- upsertResultState.add(entry.getValue());
- }
- }
- receivedNumState.update(Collections.singletonList(receivedNum));
- }
-
- @SuppressWarnings("rawtypes")
- @Override
public void invoke(RowData value, Context context) throws Exception {
RowKind kind = value.getRowKind();
@@ -448,7 +403,7 @@ final class TestValuesRuntimeFunctions {
synchronized (LOCK) {
if (RowUtils.USE_LEGACY_TO_STRING) {
- localRawResult.add(kind.shortString() + "(" + row.toString() + ")");
+ localRawResult.add(kind.shortString() + "(" + row + ")");
} else {
localRawResult.add(row.toString());
}