Hi Ted, I should have written that we’re using Flink 1.4.0.
Thanks for the suggestion re: FLINK-8268 <https://issues.apache.org/jira/browse/FLINK-8268>; it could well be the issue (though the pull request <https://github.com/apache/flink/pull/5193/files> appears fairly complex so I’ll need some time to study it). Best Regards, - Chris > On Apr 18, 2018, at 6:33 PM, Ted Yu <yuzhih...@gmail.com> wrote: > > Which release are you using ? > > See if the work around from FLINK-8268 helps. > > Cheers > > On Wed, Apr 18, 2018 at 6:26 PM, Chris Schneider > <cschnei...@scaleunlimited.com <mailto:cschnei...@scaleunlimited.com>> wrote: > Hi Gang, > > I’m having trouble getting my streaming unit test to work. The following code: > > @Test > public void testDemo() throws Throwable { > OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> > testHarness = > new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, > CrawlStateUrl>( > new StreamFlatMap<>(new DomainDBFunction()), > new PldKeySelector<CrawlStateUrl>(), > BasicTypeInfo.STRING_TYPE_INFO, > 1, > 1, > 0); > testHarness.setup(); > testHarness.open(); > > for (int i = 0; i < 10; i++) { > String urlString = String.format("https://domain-%d.com/page1 > <https://domain-%d.com/page1>", i); > CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString)); > testHarness.processElement(new StreamRecord<>(url)); > } > testHarness.snapshot(0L, 0L); > } > > > Generates the following exception: > > DomainDBFunctionTest.testDemo > testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest) > java.lang.Exception: Could not complete snapshot 0 for operator MockTask > (1/1). > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459) > at > com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50) > at > org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) > at > org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357) > ... 26 more > > I tried explicitly calling testHarness.setStateBackend(new > MemoryStateBackend()), but that didn’t seem to help. I could provide more of > my code (e.g., PldKeySelector, DomainDBFunction, CrawlStateUrl, RawUrl, > etc.), but that doesn’t seem like it would have much to do with the problem. > > Any advice would be most welcome. > > Thanks, > > - Chris > > ----------------------------------------- > Chris Schneider > http://www.scaleunlimited.com <http://www.scaleunlimited.com/> > custom big data solutions > ----------------------------------------- > > ----------------------------------------- Chris Schneider http://www.scaleunlimited.com custom big data solutions -----------------------------------------