Hi Fabian,

I created FLINK-9262 <https://issues.apache.org/jira/browse/FLINK-9262>.
FYI,

- Chris

> On Apr 26, 2018, at 3:07 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Thanks for reporting the issue, Chris!
> Would you mind opening a JIRA issue [1] to track the bug for Flink 1.4?
>
> Thank you, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK
>
> 2018-04-25 21:11 GMT+02:00 Chris Schneider <cschnei...@scaleunlimited.com>:
> Hi Gang,
>
> FWIW, the code below works just fine using Flink 1.5-SNAPSHOT. I also tried
> cherry-picking the commit that fixed FLINK-8268
> <https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80>
> onto Flink 1.4.0, but that resulted in the same failure mode.
>
> I guess the takeaway is that the streaming test harness support (which,
> everyone should note, is not yet part of the public Flink API) was
> apparently fragile in 1.4.0.
>
> FYI,
>
> - Chris
>
>
>> On Apr 18, 2018, at 8:07 PM, Chris Schneider <cschnei...@scaleunlimited.com> wrote:
>>
>> Hi Ted,
>>
>> I should have mentioned that we're using Flink 1.4.0.
>>
>> Thanks for the suggestion re: FLINK-8268
>> <https://issues.apache.org/jira/browse/FLINK-8268>; it could well be the
>> issue (though the pull request <https://github.com/apache/flink/pull/5193/files>
>> appears fairly complex, so I'll need some time to study it).
>>
>> Best Regards,
>>
>> - Chris
>>
>>> On Apr 18, 2018, at 6:33 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>> Which release are you using?
>>>
>>> See if the workaround from FLINK-8268 helps.
>>>
>>> Cheers
>>>
>>> On Wed, Apr 18, 2018 at 6:26 PM, Chris Schneider
>>> <cschnei...@scaleunlimited.com> wrote:
>>> Hi Gang,
>>>
>>> I'm having trouble getting my streaming unit test to work.
>>> The following code:
>>>
>>>     @Test
>>>     public void testDemo() throws Throwable {
>>>         OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> testHarness =
>>>             new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, CrawlStateUrl>(
>>>                 new StreamFlatMap<>(new DomainDBFunction()),
>>>                 new PldKeySelector<CrawlStateUrl>(),
>>>                 BasicTypeInfo.STRING_TYPE_INFO,  // key type: String
>>>                 1,                               // maxParallelism
>>>                 1,                               // numSubtasks
>>>                 0);                              // subtaskIndex
>>>         testHarness.setup();
>>>         testHarness.open();
>>>
>>>         // Feed ten URLs, each on a different domain
>>>         for (int i = 0; i < 10; i++) {
>>>             String urlString = String.format("https://domain-%d.com/page1", i);
>>>             CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString));
>>>             testHarness.processElement(new StreamRecord<>(url));
>>>         }
>>>
>>>         // Snapshot with checkpoint ID 0 and timestamp 0; this is the call that fails
>>>         testHarness.snapshot(0L, 0L);
>>>     }
>>>
>>> generates the following exception:
>>>
>>> DomainDBFunctionTest.testDemo
>>> testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest)
>>> java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>>>     at com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>>>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>>     at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>>>     at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
>>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
>>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
>>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
>>> Caused by: java.lang.NullPointerException
>>>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>>>     ... 26 more
>>>
>>> I tried explicitly calling testHarness.setStateBackend(new MemoryStateBackend()),
>>> but that didn't seem to help. I could provide more of my code (e.g.,
>>> PldKeySelector, DomainDBFunction, CrawlStateUrl, RawUrl), but I doubt it
>>> has much to do with the problem.
>>>
>>> Any advice would be most welcome.
>>>
>>> Thanks,
>>>
>>> - Chris
>>>
>>> -----------------------------------------
>>> Chris Schneider
>>> http://www.scaleunlimited.com
>>> custom big data solutions
>>> -----------------------------------------
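The stack trace shows the failure chain: `AbstractStreamOperator.snapshotState` (line 357) calls `AbstractUdfStreamOperator.snapshotState`, which calls `StreamingFunctionUtils.snapshotFunctionState`, where `Preconditions.checkNotNull` throws a `NullPointerException`; `AbstractStreamOperator.snapshotState` (line 379) then rethrows it wrapped as "Could not complete snapshot 0 for operator MockTask (1/1)." The stdlib-only Java sketch below is a hypothetical model of that chain, not Flink code: every class and method name in it is invented for illustration, `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull`, and it assumes (without confirming it from Flink's source) that the null reference is a state backend the 1.4.0 harness never initialized.

```java
import java.util.Objects;

/**
 * Hypothetical model of the failure chain in the stack trace; none of these
 * names are Flink classes. Assumption: the null reference is a "backend".
 */
public class SnapshotFailureModel {

    /** Innermost step: requires a non-null backend, like the checkNotNull frame. */
    static void snapshotFunctionState(Object backend) {
        // Stand-in for Preconditions.checkNotNull: throws NullPointerException if null
        Objects.requireNonNull(backend);
    }

    /** Outer step: wraps any failure the way the top of the real trace shows. */
    static void snapshotState(long checkpointId, String operatorName, Object backend)
            throws Exception {
        try {
            snapshotFunctionState(backend);
        } catch (Exception e) {
            throw new Exception("Could not complete snapshot " + checkpointId
                    + " for operator " + operatorName + ".", e);
        }
    }

    public static void main(String[] args) {
        try {
            // Assumed scenario: the backend was never initialized
            snapshotState(0L, "MockTask (1/1)", null);
        } catch (Exception e) {
            System.out.println(e.getMessage());
            System.out.println("Caused by: " + e.getCause().getClass().getName());
        }
    }
}
```

Running `main` prints `Could not complete snapshot 0 for operator MockTask (1/1).` followed by `Caused by: java.lang.NullPointerException`, mirroring the two key lines of the real trace; passing any non-null object as `backend` makes `snapshotState` return normally. Because the check only runs at snapshot time, `setup()`, `open()`, and `processElement(...)` can all succeed first, which matches the trace pointing at the `snapshot(0L, 0L)` call in `testDemo`.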