autophagy commented on code in PR #27928:
URL: https://github.com/apache/flink/pull/27928#discussion_r3217432727


##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2034,3 +2034,336 @@ Limitations
 PTFs are in an early stage. The following limitations apply:
 - PTFs cannot run in batch mode.
 - Broadcast state
+
+Testing Process Table Functions
+-------------------------------
+
+The `ProcessTableFunctionTestHarness` provides a lightweight unit testing framework for Process Table
+Functions (PTFs). It is useful for validating PTF business logic, multi-table PTF behaviour, and
+error cases.
+
+For end-to-end integration testing with the full Flink planner and runtime, 
use integration tests
+instead.
+
+{{< top >}}
+
+### Quick Start
+
+{{< tabs "quickstart" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.table.annotation.*;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.runtime.functions.ProcessTableFunctionTestHarness;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+// PTF under test
+@DataTypeHint("ROW<doubled INT>")
+public class DoublePTF extends ProcessTableFunction<Row> {
+  public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
+    int value = input.getFieldAs("value");
+    collect(Row.of(value * 2));
+  }
+}
+
+// Test
+@Test
+void testDoublePTF() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(DoublePTF.class)
+          .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+          .build()) {
+
+    harness.processElement(Row.of(5));
+    harness.processElement(Row.of(10));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output).hasSize(2);
+    assertThat(output.get(0)).isEqualTo(Row.of(10));
+    assertThat(output.get(1)).isEqualTo(Row.of(20));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
+
+### Common Testing Scenarios
+
+#### Testing Row-Semantic Tables
+
+Use `.withTableArgument()` to configure the input table schema:
+
+{{< tabs "row-semantic" >}}
+{{< tab "Java" >}}
+```java
+public class PassthroughPTF extends ProcessTableFunction<Integer> {
+  public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
+    collect(input.getFieldAs("value"));
+  }
+}
+
+@Test
+void testPassthrough() throws Exception {
+  try (ProcessTableFunctionTestHarness<Integer> harness =
+      ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class)
+          .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+          .build()) {
+
+    harness.processElement(Row.of(42));
+    harness.processElement(Row.of(100));
+
+    List<Integer> output = harness.getOutput();
+    assertThat(output).containsExactly(42, 100);
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Testing Set-Semantic Tables with Partitioning
+
+For `SET_SEMANTIC_TABLE`, use `.withPartitionBy()` to configure partition 
columns:
+
+{{< tabs "set-semantic" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<doubled INT>")
+public class PartitionedPTF extends ProcessTableFunction<Row> {
+  public void eval(@ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {
+    int value = input.getFieldAs("value");
+    collect(Row.of(value * 2));
+  }
+}
+
+@Test
+void testPartitionedPTF() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(PartitionedPTF.class)
+          .withTableArgument("input", DataTypes.of("ROW<key STRING, value INT>"))
+          .withPartitionBy("input", "key")
+          .build()) {
+
+    harness.processElement(Row.of("A", 10));
+    harness.processElement(Row.of("B", 20));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output.get(0)).isEqualTo(Row.of("A", 20));
+    assertThat(output.get(1)).isEqualTo(Row.of("B", 40));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Testing Multiple Table Arguments
+
+Use `processElementForTable()` to specify which table receives each row:
+
+{{< tabs "multi-table" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<output STRING>")
+public class JoinPTF extends ProcessTableFunction<Row> {
+  public void eval(
+      @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row left,
+      @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row right) {
+    if (left != null) {
+      collect(Row.of("LEFT: " + left));
+    }
+    if (right != null) {
+      collect(Row.of("RIGHT: " + right));
+    }
+  }
+}
+
+@Test
+void testMultiTable() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(JoinPTF.class)
+          .withTableArgument("left", DataTypes.of("ROW<id INT, name STRING>"))
+          .withPartitionBy("left", "id")
+          .withTableArgument("right", DataTypes.of("ROW<id INT, city STRING>"))
+          .withPartitionBy("right", "id")
+          .build()) {
+
+    // Use processElementForTable() to target specific tables
+    harness.processElementForTable("left", Row.of(1, "Alice"));
+    harness.processElementForTable("right", Row.of(1, "Berlin"));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output.get(0)).isEqualTo(Row.of(1, null, "LEFT: +I[1, Alice]"));
+    assertThat(output.get(1)).isEqualTo(Row.of(null, 1, "RIGHT: +I[1, Berlin]"));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Testing with Scalar Arguments
+
+Use `.withScalarArgument()` to configure scalar parameter values:
+
+{{< tabs "scalar-args" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<value INT>")
+public class FilterPTF extends ProcessTableFunction<Row> {
+  public void eval(
+      @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input,
+      int threshold) {
+    int value = input.getFieldAs("value");
+    if (value > threshold) {
+      collect(Row.of(value));
+    }
+  }
+}
+
+@Test
+void testFilter() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(FilterPTF.class)
+          .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+          .withScalarArgument("threshold", 50)  // Configure scalar value
+          .build()) {
+
+    harness.processElement(Row.of(30));
+    harness.processElement(Row.of(70));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output).containsExactly(Row.of(70));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Scalar-Only PTFs**: For PTFs with only scalar arguments, use `process()` to 
trigger evaluation:
+
+{{< tabs "scalar-only" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<sum INT>")
+public class AddPTF extends ProcessTableFunction<Row> {
+  public void eval(int a, int b) {
+    collect(Row.of(a + b));
+  }
+}
+
+@Test
+void testScalarOnly() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(AddPTF.class)
+          .withScalarArgument("a", 5)
+          .withScalarArgument("b", 7)
+          .build()) {
+
+    harness.process();  // Use process() instead of processElement()
+
+    List<Row> output = harness.getOutput();
+    assertThat(output).containsExactly(Row.of(12));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Configuring Table Argument Types
+
+The harness supports two ways to specify table argument types:
+
+{{< tabs "type-config" >}}
+{{< tab "Java" >}}
+```java
+// Option 1: Inline type annotation using DataTypeHint
+@DataTypeHint("ROW<doubled INT>")
+public class InlineTypePTF extends ProcessTableFunction<Row> {
+  public void eval(
+      @ArgumentHint(
+        value = ArgumentTrait.ROW_SEMANTIC_TABLE,
+        type = @DataTypeHint("ROW<value INT>")
+      ) Row input) {
+    int value = input.getFieldAs("value");
+    collect(Row.of(value * 2));
+  }
+}
+
+@Test
+void testInlineType() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(InlineTypePTF.class).build()) {
+
+    harness.processElement(Row.of(5));
+    assertThat(harness.getOutput()).containsExactly(Row.of(10));
+  }
+}
+
+// Option 2: Builder configuration
+@Test
+void testBuilderType() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(DoublePTF.class)
+          .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+          .build()) {
+
+    harness.processElement(Row.of(5));
+    assertThat(harness.getOutput()).containsExactly(Row.of(10));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}

Review Comment:
   Yeah, that language is better. The first example is technically a way to 
specify the type for the harness (since the harness extracts the type from the 
system type inference for the input/output converters), but the user doesn't 
need to do anything in that case, so it's not really necessary to inform them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to