kl0u commented on a change in pull request #10833: [FLINK-15535][documentation] 
Add usage of ProcessFunctionTestHarnesses for testing documentation
URL: https://github.com/apache/flink/pull/10833#discussion_r366269830
 
 

 ##########
 File path: docs/dev/stream/testing.md
 ##########
 @@ -332,6 +332,87 @@ Many more examples for the usage of these test harnesses 
can be found in the Fli
 
 <span class="label label-info">Note</span> Be aware that 
`AbstractStreamOperatorTestHarness` and its derived classes are currently not 
part of the public API and can be subject to change.
 
+### Unit Testing ProcessFunction
+
+The `ProcessFunction` is a widely used low-level and powerful stream 
processing operation, giving access to the basic building blocks of all 
(acyclic) streaming applications.
+Flink provides a test harness named `ProcessFunctionTestHarnesses` that can be 
used to test your `ProcessFunction`. Considering this example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public static class PassThroughProcessFunction extends 
ProcessFunction<Integer, Integer> {
+
+       @Override
+       public void processElement(Integer value, Context ctx, 
Collector<Integer> out) throws Exception {
+        out.collect(value);
+       }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class PassThroughProcessFunction extends ProcessFunction[Integer, Integer] {
+
+    @throws[Exception]
+    override def processElement(value: Integer, ctx: ProcessFunction[Integer, 
Integer]#Context, out: Collector[Integer]): Unit = {
+      out.collect(value)
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+It is very easy to unit test such a function with 
`ProcessFunctionTestHarnesses` by passing suitable arguments and verifying the 
output.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class PassThroughProcessFunctionTest {
+
+    @Test
+    public void testPassThrough() throws Exception {
+
+        //instantiate user-defined function
+        PassThroughProcessFunction processFunction = new 
PassThroughProcessFunction();
+
+        // wrap user defined function into a the corresponding operator
+        OneInputStreamOperatorTestHarness<Integer, Integer> harness = 
ProcessFunctionTestHarnesses
+               .forProcessFunction(processFunction);
+
+        //push (timestamped) elements into the operator (and hence user 
defined function)
+        harness.processElement(1, 10);
+
+        //retrieve list of emitted records for assertions
+        assertEquals(harness.extractOutputValues(), 
Collections.singletonList(1));
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class PassThroughProcessFunctionTest extends FlatSpec with Matchers {
+
+  "PassThroughProcessFunction" should "forward values" in {
+
+    //instantiate user-defined function
+    val processFunction = new PassThroughProcessFunction
+
+    // wrap user defined function into a the corresponding operator
+    val harness = 
ProcessFunctionTestHarnesses.forProcessFunction(processFunction)
+
+    //push (timestamped) elements into the operator (and hence user defined 
function)
+    harness.processElement(1, 10)
+
+    //retrieve list of emitted records for assertions
+    harness.extractOutputValues() should contain (1)
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
 
 Review comment:
   Here I would add something like: "For more examples on how to use the 
`ProcessFunctionTestHarnesses` in order to test the different flavours of the 
`ProcessFunction`, e.g. `KeyedProcessFunction`, `KeyedCoProcessFunction`, 
`BroadcastProcessFunction`, etc, the user is encouraged to look at the 
`ProcessFunctionTestHarnessesTest`." 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to