[ https://issues.apache.org/jira/browse/BEAM-3423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-3423: ---------------------------------- Affects Version/s: 2.3.0 > Distinct.withRepresentativeValueFn throws CoderException "cannot encode null > KV" > --------------------------------------------------------------------------------- > > Key: BEAM-3423 > URL: https://issues.apache.org/jira/browse/BEAM-3423 > Project: Beam > Issue Type: Bug > Components: sdk-java-core > Affects Versions: 2.2.0, 2.3.0 > Environment: ubuntu16.04, idea, java8 > Reporter: huangjianhuang > Assignee: Kenneth Knowles > Priority: Major > Fix For: 2.4.0 > > > My code is as follows: > {code:java} > pipeline > //Read data > .apply("Read from kafka", > KafkaIO.<String, String>read() > .withBootstrapServers("localhost:9092") > .withTopic(topic) > .withKeyDeserializer(StringDeserializer.class) > > .withValueDeserializer(StringDeserializer.class) > .withoutMetadata() > ) > .apply(Window.<KV<String, > String>>into(FixedWindows.of(Duration.standardSeconds(10))) > .triggering(AfterWatermark.pastEndOfWindow() > > .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5)))) > > .discardingFiredPanes().withAllowedLateness(Duration.ZERO)) > //works fine > // .apply(Distinct.create()) > //ops! 
-> CoderException: cannot encode a null KV > .apply(Distinct.withRepresentativeValueFn(new > Val()).withRepresentativeType(TypeDescriptors.strings())) > .apply(MapElements.into(TypeDescriptors.nulls()) > .via(input -> { > System.out.println(Instant.now()); > System.out.println(input); > return null; > })); > private static class Val implements SerializableFunction<KV<String, > String>, String> { > @Override > public String apply(KV<String, String> input) { > return input.getValue(); > } > } > {code} > Input words to Kafka: > word1 > //after 10s > word2 > Then I got the following exception: > {code:java} > begin > 2018-01-06T11:18:52.971Z > KV{null, a} > Exception in thread "main" > org.apache.beam.sdk.Pipeline$PipelineExecutionException: > java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot > encode a null KV > at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:344) > at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:314) > at > org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:208) > at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:289) > at com.xiaomi.huyu.processor.dev.EntryPoint.main(EntryPoint.java:37) > Caused by: java.lang.RuntimeException: > org.apache.beam.sdk.coders.CoderException: cannot encode a null KV > at > org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:113) > at > org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235) > at > org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211) > at > 
org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66) > at > org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436) > at > org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424) > at > org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149) > Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null KV > at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:70) > at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36) > at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73) > at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36) > at > org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:93) > at > org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:77) > at > org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:62) > at > org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106) > at > org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44) > at > org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:111) > at > org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235) > at > org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211) > at > org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66) > at > org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436) > at > 
org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424) > at > org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2149) > at > org.apache.beam.sdk.transforms.Combine$GroupedValues$1$DoFnInvoker.invokeProcessElement(Unknown > Source) > at > org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177) > at > org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141) > at > org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:70) > at > org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:182) > at > org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51) > at > org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146) > at > org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > {code} > But if I use .apply(Distinct.create()) instead, it works fine. -- This message was sent by Atlassian JIRA (v7.6.3#76005)