Hi Fabian,

I created FLINK-9262 <https://issues.apache.org/jira/browse/FLINK-9262>.

FYI,

- Chris

> On Apr 26, 2018, at 3:07 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Thanks for reporting the issue Chris!
> Would you mind opening a JIRA issue [1] to track the bug for Flink 1.4?
> 
> Thank you, Fabian
> 
> [1] https://issues.apache.org/jira/browse/FLINK
> 
> 2018-04-25 21:11 GMT+02:00 Chris Schneider <cschnei...@scaleunlimited.com>:
> Hi Gang,
> 
> FWIW, the code below works just fine using Flink 1.5-SNAPSHOT. I also tried 
> cherry-picking the commit that fixed FLINK-8268 
> <https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80>
>  to Flink 1.4.0, but that resulted in the same failure mode.
> 
> I guess the takeaway is that the streaming test harness support (which, 
> everyone should note, is not yet part of the public Flink API) was apparently 
> fragile in 1.4.0.
> 
> FYI,
> 
> - Chris
> 
> 
>> On Apr 18, 2018, at 8:07 PM, Chris Schneider <cschnei...@scaleunlimited.com> wrote:
>> 
>> Hi Ted,
>> 
>> I should have written that we’re using Flink 1.4.0.
>> 
>> Thanks for the suggestion re: FLINK-8268 
>> <https://issues.apache.org/jira/browse/FLINK-8268>; it could well be the 
>> issue (though the pull request 
>> <https://github.com/apache/flink/pull/5193/files> appears fairly complex so 
>> I’ll need some time to study it).
>> 
>> Best Regards,
>> 
>> - Chris
>> 
>>> On Apr 18, 2018, at 6:33 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>> 
>>> Which release are you using?
>>> 
>>> See if the workaround from FLINK-8268 helps.
>>> 
>>> Cheers
>>> 
>>> On Wed, Apr 18, 2018 at 6:26 PM, Chris Schneider <cschnei...@scaleunlimited.com> wrote:
>>> Hi Gang,
>>> 
>>> I’m having trouble getting my streaming unit test to work. The following 
>>> code:
>>> 
>>>     @Test
>>>     public void testDemo() throws Throwable {
>>>         OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> testHarness =
>>>             new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, CrawlStateUrl>(
>>>                 new StreamFlatMap<>(new DomainDBFunction()),
>>>                 new PldKeySelector<CrawlStateUrl>(),
>>>                 BasicTypeInfo.STRING_TYPE_INFO,
>>>                 1,
>>>                 1,
>>>                 0);
>>>         testHarness.setup();
>>>         testHarness.open();
>>> 
>>>         for (int i = 0; i < 10; i++) {
>>>             String urlString = String.format("https://domain-%d.com/page1", i);
>>>             CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString));
>>>             testHarness.processElement(new StreamRecord<>(url));
>>>         }
>>>         testHarness.snapshot(0L, 0L);
>>>     }
>>> 
>>> 
>>> Generates the following exception:
>>> 
>>> DomainDBFunctionTest.testDemo
>>> testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest)
>>> java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>>>     at com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>>>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>>     at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>>>     at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
>>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
>>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
>>>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
>>> Caused by: java.lang.NullPointerException
>>>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>>>     ... 26 more
>>> 
>>> I tried explicitly calling testHarness.setStateBackend(new 
>>> MemoryStateBackend()), but that didn’t seem to help. I could provide more 
>>> of my code (e.g., PldKeySelector, DomainDBFunction, CrawlStateUrl, RawUrl, 
>>> etc.), but that doesn’t seem like it would have much to do with the problem.
>>> 
>>> Any advice would be most welcome.
>>> 
>>> Thanks,
>>> 
>>> - Chris
>>> 
>>> -----------------------------------------
>>> Chris Schneider
>>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>>> custom big data solutions
>>> -----------------------------------------
>>> 
>>> 
>> 
>> -----------------------------------------
>> Chris Schneider
>> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
>> custom big data solutions
>> -----------------------------------------
>> 
> 
> -----------------------------------------
> Chris Schneider
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> custom big data solutions
> -----------------------------------------
> 
> 

-----------------------------------------
Chris Schneider
http://www.scaleunlimited.com
custom big data solutions
-----------------------------------------
