Thanks for reporting the issue, Chris!
Would you mind opening a JIRA issue [1] to track the bug for Flink 1.4?

Thank you, Fabian

[1] https://issues.apache.org/jira/browse/FLINK

2018-04-25 21:11 GMT+02:00 Chris Schneider <cschnei...@scaleunlimited.com>:

> Hi Gang,
>
> FWIW, the code below works just fine using Flink 1.5-SNAPSHOT. I also
> tried cherry-picking the commit that fixed FLINK-8268
> <https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80>
> to Flink 1.4.0, but that resulted in the same failure mode.
>
> I guess the takeaway is that the streaming test harness support (which,
> as everyone should note, is not yet part of the public Flink API) was
> apparently fragile in 1.4.0.
>
> FYI,
>
> - Chris
>
>
> On Apr 18, 2018, at 8:07 PM, Chris Schneider <
> cschnei...@scaleunlimited.com> wrote:
>
> Hi Ted,
>
> I should have written that we’re using Flink 1.4.0.
>
> Thanks for the suggestion re: FLINK-8268
> <https://issues.apache.org/jira/browse/FLINK-8268>; it could well be the
> issue (though the pull request
> <https://github.com/apache/flink/pull/5193/files> appears fairly complex
> so I’ll need some time to study it).
>
> Best Regards,
>
> - Chris
>
> On Apr 18, 2018, at 6:33 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> Which release are you using?
>
> See if the workaround from FLINK-8268 helps.
>
> Cheers
>
> On Wed, Apr 18, 2018 at 6:26 PM, Chris Schneider <
> cschnei...@scaleunlimited.com> wrote:
>
>> Hi Gang,
>>
>> I’m having trouble getting my streaming unit test to work. The following
>> code:
>>
>>     @Test
>>     public void testDemo() throws Throwable {
>>         OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> testHarness =
>>             new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, CrawlStateUrl>(
>>                 new StreamFlatMap<>(new DomainDBFunction()),
>>                 new PldKeySelector<CrawlStateUrl>(),
>>                 BasicTypeInfo.STRING_TYPE_INFO,
>>                 1,
>>                 1,
>>                 0);
>>         testHarness.setup();
>>         testHarness.open();
>>
>>         for (int i = 0; i < 10; i++) {
>>             String urlString = String.format("https://domain-%d.com/page1", i);
>>             CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString));
>>             testHarness.processElement(new StreamRecord<>(url));
>>         }
>>         testHarness.snapshot(0L, 0L);
>>     }
>>
>>
>> Generates the following exception:
>>
>> DomainDBFunctionTest.testDemo
>> testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest)
>> java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>> at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>> at com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>> at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>> at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
>> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
>> Caused by: java.lang.NullPointerException
>> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>> ... 26 more
>>
>> I tried explicitly calling testHarness.setStateBackend(new
>> MemoryStateBackend()), but that didn’t seem to help. I could provide
>> more of my code (e.g., PldKeySelector, DomainDBFunction, CrawlStateUrl,
>> RawUrl, etc.), but that doesn’t seem like it would have much to do with the
>> problem.
>>
>> Any advice would be most welcome.
>>
>> Thanks,
>>
>> - Chris
>>
>> -----------------------------------------
>> Chris Schneider
>> http://www.scaleunlimited.com
>> custom big data solutions
>> -----------------------------------------
>>
>>
>
> -----------------------------------------
> Chris Schneider
> http://www.scaleunlimited.com
> custom big data solutions
> -----------------------------------------
>
>

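For anyone reading the quoted stack trace: it shows an outer java.lang.Exception ("Could not complete snapshot 0 ...") whose cause is a NullPointerException raised by a null check (Preconditions.checkNotNull). The sketch below only illustrates that wrapping pattern and how to reach the root cause. It uses plain JDK classes, and every name in it (SnapshotFailureSketch, snapshotFunctionState, snapshotState, rootCause) is a hypothetical stand-in rather than Flink code. It makes no claim about which object was actually null in Flink 1.4.0; the thread does not establish that.

```java
import java.util.Objects;

/** Hypothetical stand-in, NOT Flink code: mimics the exception wrapping seen in the trace. */
class SnapshotFailureSketch {

    /** Plays the role of a null check; throws NullPointerException if the argument is null. */
    public static void snapshotFunctionState(Object required) {
        Objects.requireNonNull(required);
    }

    /** Plays the role of an operator-level snapshot call that wraps any failure. */
    public static void snapshotState(long checkpointId, Object required) throws Exception {
        try {
            snapshotFunctionState(required);
        } catch (Exception e) {
            // Keep the original exception as the cause so it shows up under "Caused by:".
            throw new Exception(
                "Could not complete snapshot " + checkpointId + " for operator MockTask (1/1).", e);
        }
    }

    /** Walks the cause chain down to the innermost exception. */
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        try {
            snapshotState(0L, null); // null triggers the inner NullPointerException
        } catch (Exception e) {
            System.out.println("outer: " + e.getMessage());
            System.out.println("root:  " + rootCause(e).getClass().getName());
        }
    }
}
```

When debugging a report like this one, the frames listed under "Caused by:" (here StreamingFunctionUtils.snapshotFunctionState) are usually the more useful place to start than the outer wrapper message.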