[ https://issues.apache.org/jira/browse/BEAM-12924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Joar Wandborg updated BEAM-12924: --------------------------------- Affects Version/s: 2.32.0 > Seemingly unnecessary NullPointerExceptions when using Latest.perKey(), > Max.perKey() > ------------------------------------------------------------------------------------ > > Key: BEAM-12924 > URL: https://issues.apache.org/jira/browse/BEAM-12924 > Project: Beam > Issue Type: Bug > Components: sdk-java-core > Affects Versions: 2.31.0, 2.32.0 > Environment: OpenJDK 11 > beam-sdks-java-bom 2.31.0 > Reporter: Joar Wandborg > Priority: P2 > > > This test > {code:java} > @Test > public void shouldNotEncodeNull_LatestPerKey() { > final var actual = pipeline > .apply(Create.of( > ImmutableList.of( > KV.of("a", "not null"), > KV.of("a", "also not null") > ) > ) > ) > .apply(Latest.perKey()); > PAssert.that(actual).containsInAnyOrder(KV.of("a", "also not null")); > pipeline.run(); > } > {code} > throws > {code:java} > org.apache.beam.sdk.Pipeline$PipelineExecutionException: > org.apache.beam.sdk.coders.CoderException: cannot encode a null String > at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373) > at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341) > at > org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218) > at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323) > at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:398) > at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:334) > at x.y.z > .NullPerKeyTest.shouldNotEncodeNull_LatestPerKey(NullPerKeyTest.java:61) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:322) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at org.junit.runners.ParentRunner.run(ParentRunner.java:412) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) > Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null > String > at > org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:74) > at > org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:37) > at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73) > at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37) > at > org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85) > at > org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69) > at > org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54) > at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:144) > {code} > which is unexpected. > Additionally this test > {code:java} > @Test > public void shouldNotEncodeNull_MaxPerKey() { > final var actual = pipeline > .apply(TestStream.create(KvCoder.of(StringUtf8Coder.of(), > VarIntCoder.of())) > .addElements( > TimestampedValue.of(KV.of("a", 2), > Instant.ofEpochMilli(42000L)), > TimestampedValue.of(KV.of("a", 1), > Instant.ofEpochMilli(960000L)) > ) > .advanceWatermarkToInfinity() > ) > .apply(Window > .<KV<String, Integer>>into(new GlobalWindows()) > > .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) > .discardingFiredPanes()) > .apply(Max.perKey()); > PAssert.that(actual).containsInAnyOrder(KV.of("a", 2)); > pipeline.run(); > } > {code} > throws > {code:java} > org.apache.beam.sdk.Pipeline$PipelineExecutionException: > java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot > encode a null Integer at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373) > at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341) > at > org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218) > at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323) > at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:398) > at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:334) > at > co.x.y.NullPerKeyTest.shouldNotEncodeNull_MaxPerKey(NullPerKeyTest.java:45) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:322) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at org.junit.runners.ParentRunner.run(ParentRunner.java:412) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) > Caused by: java.lang.RuntimeException: > org.apache.beam.sdk.coders.CoderException: cannot encode a null Integer > at > org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:117) > at > org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:305) > at > org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:272) > at > org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84) > at > org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:418) > at > org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:406) > at > org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2180) > Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null > Integer > at org.apache.beam.sdk.coders.VarIntCoder.encode(VarIntCoder.java:49) > at org.apache.beam.sdk.coders.VarIntCoder.encode(VarIntCoder.java:33) > at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136) > at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73) > at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37) > at > org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85) > at > org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69) > at > org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54) > at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:144) > at > org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:118) > at > org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:49) > at > org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:115) > at > org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:305) > at > org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:272) > at > org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84) > at > org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:418) > at > org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:406) > at > org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2180) > at > org.apache.beam.sdk.transforms.Combine$GroupedValues$1$DoFnInvoker.invokeProcessElement(Unknown > Source) > at > org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232) > at > org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191) > at > org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79) > at > org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244) > at > org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54) > at > org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165) > at > org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829){code} > unless you change {{discardingFiredPanes()}} to {{accumulatingFiredPanes()}}, > which is also unexpected. -- This message was sent by Atlassian Jira (v8.20.1#820001)