rkhachatryan commented on code in PR #27068:
URL: https://github.com/apache/flink/pull/27068#discussion_r2420545208


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java:
##########
@@ -162,9 +189,118 @@ public boolean equals(RowData row1, RowData row2) {
     }
 
     private static class TestUpsertKeyEqualiser implements RecordEqualiser {
+
         @Override
         public boolean equals(RowData row1, RowData row2) {
-            return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) 
== row2.getLong(0);
+            return row1.getRowKind() == row2.getRowKind()
+                    && row1.getLong(UPSERT_KEY) == row2.getLong(UPSERT_KEY);
         }
     }
+
+    static KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
createHarness(
+            OneInputStreamOperator<RowData, RowData> materializer,
+            SinkUpsertMaterializerStateBackend backend,
+            LogicalType[] types)
+            throws Exception {
+        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        materializer,
+                        HandwrittenSelectorUtil.getRowDataSelector(new int[] 
{1}, types),
+                        HandwrittenSelectorUtil.getRowDataSelector(new int[] 
{1}, types)
+                                .getProducedType());
+        testHarness.setStateBackend(backend.create(true));
+        return testHarness;
+    }
+
+    @Test
+    public void testEmptyUpsertKey() throws Exception {
+        testRecovery(createOperator(LOGICAL_TYPES), 
createOperatorWithoutUpsertKey());
+        testRecovery(createOperatorWithoutUpsertKey(), 
createOperator(LOGICAL_TYPES));
+    }
+
+    private void testRecovery(
+            OneInputStreamOperator<RowData, RowData> from,
+            OneInputStreamOperator<RowData, RowData> to)
+            throws Exception {
+        OperatorSubtaskState snapshot;
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
testHarness =
+                createHarness(from)) {
+            testHarness.open();
+            snapshot = testHarness.snapshot(1L, 1L);
+        }
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
testHarness =
+                createHarness(to)) {
+            testHarness.initializeState(snapshot);
+            testHarness.open();
+        }
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
createHarness(
+            OneInputStreamOperator<RowData, RowData> m2) throws Exception {
+        return createHarness(m2, stateBackend, LOGICAL_TYPES);
+    }
+
+    @Test
+    public void testVersionStateGrowth() throws Exception {
+        int dop = 2;
+        int numIterations = 10;
+        OperatorSnapshotFinalizer[] snapshots = new 
OperatorSnapshotFinalizer[dop];
+        long[] prevStateSizes = new long[dop];
+        for (int i = 0; i < numIterations; i++) {
+            for (int subtask = 0; subtask < dop; subtask++) {
+                snapshots[subtask] = initAndSnapshot(snapshots[subtask], i);
+                long currentStateSize =
+                        snapshots[subtask]
+                                .getJobManagerOwnedState()
+                                .getManagedOperatorState()
+                                .stream()
+                                .mapToLong(StateObject::getStateSize)
+                                .sum();
+                if (i > 0) {
+                    assertEquals(prevStateSizes[subtask], currentStateSize);
+                }
+                prevStateSizes[subtask] = currentStateSize;
+            }
+            List<OperatorStateHandle> union =
+                    Arrays.stream(snapshots)
+                            .flatMap(
+                                    s ->
+                                            s
+                                                    .getJobManagerOwnedState()
+                                                    .getManagedOperatorState()
+                                                    .stream())
+                            .collect(Collectors.toList());
+            for (int j = 0; j < dop; j++) {
+                snapshots[j] =
+                        new OperatorSnapshotFinalizer(
+                                
snapshots[j].getJobManagerOwnedState().toBuilder()
+                                        .setManagedOperatorState(new 
StateObjectCollection<>(union))
+                                        .build(),
+                                snapshots[j].getTaskLocalState());
+            }
+        }
+    }
+
+    private OperatorSnapshotFinalizer initAndSnapshot(
+            OperatorSnapshotFinalizer from, int newCheckpointID) throws 
Exception {
+        try (OneInputStreamOperatorTestHarness<RowData, RowData> harness =
+                createHarness(
+                        createOperator(LOGICAL_TYPES, UPSERT_KEY), 
stateBackend, LOGICAL_TYPES)) {
+            if (from != null) {
+                harness.initializeState(from.getJobManagerOwnedState());
+            }
+            harness.open();
+            return harness.snapshotWithLocalState(newCheckpointID, 
newCheckpointID);
+        }
+    }
+
+    private OneInputStreamOperator<RowData, RowData> 
createOperatorWithoutUpsertKey() {
+        return createOperator(LOGICAL_TYPES, (int[]) null);
+    }
+
+    private OneInputStreamOperator<RowData, RowData> createOperator(
+            LogicalType[] types, int... upsertKey) {
+        return SinkUpsertMaterializer.create(
+                TTL_CONFIG, RowType.of(types), EQUALISER, 
UPSERT_KEY_EQUALISER, upsertKey);
+    }

Review Comment:
   Extracted into separate commits



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to