RE: Unit-testing BEAM pipelines with PROCESSING_TIME timers

2020-05-11 Thread Robert.Butcher
Many thanks, Darshan!  I’ve made equivalent changes to my test case and it’s 
working fine.

Kind regards,

Rob

From: Darshan Jani [mailto:darshanjani...@gmail.com]
Sent: 11 May 2020 15:08
To: dev@beam.apache.org
Subject: Re: Unit-testing BEAM pipelines with PROCESSING_TIME timers


Hi Robert,

I found this sample test that uses a timer in the processing-time domain.
From the error, I assume there may be a problem with what you are asserting in 
your PAssert.

https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L3633-L3665

I ran it locally and it runs fine.

-Regards
Darshan

On Mon, May 11, 2020 at 5:28 PM <robert.butc...@natwestmarkets.com> wrote:
I have a BEAM DoFn that I’m attempting to unit test.  It involves using a timer 
based on processing time and I’ve not managed to get it to fire.  The relevant 
code excerpts are as follows:

@TimerId("timer")
private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);


@ProcessElement
public void process(@TimerId("timer") Timer timer) {
    // Set a processing time timer to fire in 5 seconds so we can poll BigQuery
    timer.offset(Duration.standardSeconds(5)).setRelative();
}


@OnTimer("timer")
public void onTimer() {
    System.out.println("In onTimer");
}

When I use a TestPipeline with an appropriate PAssert, it always results in the 
following exception:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.util.NoSuchElementException
  at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
  at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
  at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
  at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
  at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
  at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
  at com.nwm.foundry.atomic.AtomicCommitFnTest.shouldGenerateCorrectEvent(AtomicCommitFnTest.java:28)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
  at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
  at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
  at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
  at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
  at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
  at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
  at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
  at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
  at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.util.NoSuchElementException
  at java.util.ArrayList$Itr.next(ArrayList.java:862)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.co

Re: Unit-testing BEAM pipelines with PROCESSING_TIME timers

2020-05-11 Thread Darshan Jani
Hi Robert,

I found this sample test that uses a timer in the processing-time domain.
From the error, I assume there may be a problem with what you are asserting
in your PAssert.

https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L3633-L3665

I ran it locally and it runs fine.
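
For what it's worth, the "Caused by" part of the trace quoted below ends in Iterables.getOnlyElement, called from PAssert$SingletonCheckerDoFn. That suggests the singleton assertion was handed an empty collection, which would fit a timer that never fired and so never produced output. The following is only a plain-Java sketch of that failure mode, not Beam or Guava code; the class SingletonCheckSketch and its getOnlyElement helper are hypothetical stand-ins written for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in, for illustration only; not part of Beam or Guava.
class SingletonCheckSketch {

    // Returns the single element of values.
    // An empty input fails on the first next() call with NoSuchElementException,
    // the same exception type that appears in the quoted trace.
    public static <T> T getOnlyElement(Iterable<T> values) {
        Iterator<T> it = values.iterator();
        T first = it.next();
        if (it.hasNext()) {
            throw new IllegalArgumentException("expected one element but found more");
        }
        return first;
    }

    public static void main(String[] args) {
        // One element: the check passes and returns it.
        System.out.println(getOnlyElement(Arrays.asList("event")));

        // No elements: models an assertion over output that was never produced.
        try {
            getOnlyElement(new ArrayList<String>());
        } catch (NoSuchElementException e) {
            System.out.println("empty input: " + e.getClass().getName());
        }
    }
}
```

If that reading is right, the exception is a symptom of the missing output rather than a fault in the assertion itself.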

-Regards
Darshan

On Mon, May 11, 2020 at 5:28 PM <robert.butc...@natwestmarkets.com> wrote:

> I have a BEAM DoFn that I’m attempting to unit test.  It involves using a
> timer based on *processing time* and I’ve not managed to get it to fire.
> The relevant code excerpts are as follows:
>
>
>
> @TimerId("timer")
> private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>
>
>
> @ProcessElement
> public void process(@TimerId("timer") Timer timer) {
>     // Set a processing time timer to fire in 5 seconds so we can poll BigQuery
>     timer.offset(Duration.standardSeconds(5)).setRelative();
> }
>
>
>
> @OnTimer("timer")
> public void onTimer() {
>     System.out.println("In onTimer");
> }
>
>
>
> When I use a TestPipeline with an appropriate PAssert, it always results
> in the following exception:
>
>
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.util.NoSuchElementException
>   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
>   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
>   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
>   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
>   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
>   at com.nwm.foundry.atomic.AtomicCommitFnTest.shouldGenerateCorrectEvent(AtomicCommitFnTest.java:28)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: java.util.NoSuchElementException
>   at java.util.ArrayList$Itr.next(ArrayList.java:862)
>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:302)
>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:254)
>   at org.apache.beam.sdk.testing.PAssert$SingletonCheckerDoFn.processElement(PAssert.java:1417)
>
>
>
> Swapping the timer for an EVENT_TIME timer works fine.
>
>
>
> Is there a trick I’m missing here?
>
>
>
> Kind regards,
>
>
>
> Rob
>
>
>
>
>
> *Robert Butcher*
>
> *Technical Architect | Foundry/SRS | NatWest Markets*
>
> WeWork, 10 Devonshire Square, London,