autophagy commented on code in PR #28212:
URL: https://github.com/apache/flink/pull/28212#discussion_r3288473359
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -1062,4 +1110,309 @@ void testPartitionByDuplicateConfigThrows() {
assertThat(exception.getMessage()).contains("Partition config already
exists");
}
+
+ //
-------------------------------------------------------------------------
+ // State Tests
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testPojoState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ assertThat(harness.getOutput()).containsExactly(Row.of("Alice", 1L));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(1L);
+
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("Alice", 2L));
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStatePartitionIsolation() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState aliceState =
harness.getStateForKey("state", Row.of("Alice"));
+ PTFWithPojoState.CounterState bobState =
harness.getStateForKey("state", Row.of("Bob"));
+
+ assertThat(aliceState.counter).isEqualTo(2L);
+ assertThat(bobState.counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStateWithInitialState() throws Exception {
+ PTFWithPojoState.CounterState initialState = new
PTFWithPojoState.CounterState();
+ initialState.counter = 100L;
+
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<id
INT>"))
+ .withPartitionBy("input", "id")
+ .withInitialStateArgument("state", Row.of(1),
initialState)
+ .build();
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of(1));
+ assertThat(state.counter).isEqualTo(100L);
+
+ harness.processElement(Row.of(1));
+ assertThat(harness.getOutput()).containsExactly(Row.of(1, 101L));
+
+ harness.processElement(Row.of(2));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of(2, 1L));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetStateKeys() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Charlie", 30));
+
+ java.util.Set<Row> keys = harness.getStateKeys("state");
+ assertThat(keys)
+ .containsExactlyInAnyOrder(Row.of("Alice"), Row.of("Bob"),
Row.of("Charlie"));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetAllState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+
+ java.util.Map<Row, PTFWithPojoState.CounterState> allState =
harness.getAllState("state");
+
+ assertThat(allState).hasSize(2);
+ assertThat(allState.get(Row.of("Alice")).counter).isEqualTo(2L);
+ assertThat(allState.get(Row.of("Bob")).counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
+ @Test
+ void testListViewState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithListViewState.class)
+ .withTableArgument("input", DataTypes.of("ROW<key
STRING, value INT>"))
+ .withPartitionBy("input", "key")
+ .build();
+
+ harness.processElementForTable("input", Row.of("A", 1));
+ assertThat(harness.getOutput()).containsExactly(Row.of("A", new
Integer[] {1}));
+
+ harness.processElementForTable("input", Row.of("A", 2));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("A", new
Integer[] {1, 2}));
+
+ org.apache.flink.table.api.dataview.ListView<Integer> listState =
+ harness.getStateForKey("listState", Row.of("A"));
+ assertThat(listState.get()).containsExactly(1, 2);
+
+ harness.close();
+ }
+
+ @Test
+ void testMapViewState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithMapViewState.class)
+ .withTableArgument(
+ "input", DataTypes.of("ROW<partition STRING,
key STRING>"))
+ .withPartitionBy("input", "partition")
+ .build();
+
+ harness.processElementForTable("input", Row.of("P1", "foo"));
+ assertThat(harness.getOutput()).containsExactly(Row.of("P1", "foo",
1));
+
+ harness.processElementForTable("input", Row.of("P1", "foo"));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("P1", "foo",
2));
+
+ harness.processElementForTable("input", Row.of("P1", "bar"));
+
+ org.apache.flink.table.api.dataview.MapView<String, Integer> mapState =
+ harness.getStateForKey("mapState", Row.of("P1"));
+ assertThat(mapState.get("foo")).isEqualTo(2);
+ assertThat(mapState.get("bar")).isEqualTo(1);
+
+ harness.close();
+ }
+
+ @Test
+ void testEmptyState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+
+ assertThat(state).isNull();
+
+ harness.close();
+ }
+
+ @Test
+ void testClearStateForPartition() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.clearStateForPartition(Row.of("Alice"));
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state).isNull();
+
+ harness.close();
+ }
+
+ @Test
+ void testClearStateEntry() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.clearStateEntry(Row.of("Alice"), "state");
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(0L);
+
+ harness.close();
+ }
+
+ @Test
+ void testMultipleStateParameters() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithMultipleStates.class)
+ .withTableArgument("input", DataTypes.of("ROW<key
STRING, value INT>"))
+ .withPartitionBy("input", "key")
+ .build();
+
+ harness.processElementForTable("input", Row.of("A", 10));
+ harness.processElementForTable("input", Row.of("A", 20));
+ harness.processElementForTable("input", Row.of("B", 5));
+
+ assertThat(harness.getOutput())
+ .containsExactly(Row.of("A", 1L, 10), Row.of("A", 2L, 30),
Row.of("B", 1L, 5));
+
+ PTFWithMultipleStates.CounterState counterA =
+ harness.getStateForKey("counter", Row.of("A"));
+ assertThat(counterA.count).isEqualTo(2L);
+
+ ListView<Integer> historyA = harness.getStateForKey("history",
Row.of("A"));
+ assertThat(historyA.get()).containsExactly(10, 20);
+
+ harness.close();
+ }
+
+ @Test
+ void testInitialStateWithListView() throws Exception {
+ ListView<Integer> initialList = new ListView<>();
+ initialList.add(100);
+ initialList.add(200);
+
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithListViewState.class)
+ .withTableArgument("input", DataTypes.of("ROW<key
STRING, value INT>"))
+ .withPartitionBy("input", "key")
+ .withInitialStateArgument("listState", Row.of("A"),
initialList)
+ .build();
+
+ ListView<Integer> listState = harness.getStateForKey("listState",
Row.of("A"));
+ assertThat(listState.get()).containsExactly(100, 200);
+
+ harness.processElementForTable("input", Row.of("A", 3));
+ assertThat(harness.getOutput()).containsExactly(Row.of("A", new
Integer[] {100, 200, 3}));
+
+ harness.close();
+ }
+
+ @Test
+ void testInitialStateWithMapView() throws Exception {
+ MapView<String, Integer> initialMap = new MapView<>();
+ initialMap.put("existing", 42);
+
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithMapViewState.class)
+ .withTableArgument(
+ "input", DataTypes.of("ROW<partition STRING,
key STRING>"))
+ .withPartitionBy("input", "partition")
+ .withInitialStateArgument("mapState", Row.of("P1"),
initialMap)
+ .build();
+
+ MapView<String, Integer> mapState = harness.getStateForKey("mapState",
Row.of("P1"));
+ assertThat(mapState.get("existing")).isEqualTo(42);
+
+ harness.processElementForTable("input", Row.of("P1", "existing"));
+ assertThat(harness.getOutput()).containsExactly(Row.of("P1",
"existing", 43));
+
+ harness.close();
+ }
+
Review Comment:
On the third point, it's a mixture. As far as I can tell, the system type
inference goes by the annotations, so any type incompatibilities would
surface during state serialization/deserialization (serde), both live and in
the harness. The error would therefore bubble up for the harness to catch,
similar to live (I think).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]