[ https://issues.apache.org/jira/browse/BEAM-2114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Eugene Kirpichov closed BEAM-2114. ---------------------------------- Resolution: Fixed > KafkaIO broken with CoderException > ---------------------------------- > > Key: BEAM-2114 > URL: https://issues.apache.org/jira/browse/BEAM-2114 > Project: Beam > Issue Type: Bug > Components: sdk-java-extensions > Reporter: Devon Meunier > Assignee: Devon Meunier > Fix For: First stable release > > > For a KafkaIO.Read<String, String> I simply replaced {{withKeyCoder}} and > {{withValueCoder}} with {{withKeyDeserializer}} and {{withValueDeserializer}} > using `StringDeserializer.class` on dataflow and I receive the following > traceback: > {code} > java.lang.reflect.InvocationTargetException > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: > org.apache.beam.sdk.coders.CoderException: cannot encode a null String > at > org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:115) > at > org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:136) > at > org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139) > at > org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null > String > at > org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:75) > at > org.apache.beam.sdk.coders.StringUtf8Coder.encode(StringUtf8Coder.java:41) > at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:88) > at > org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:60) > at > org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:36) > at > org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:122) > at > org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:106) > at > org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:91) > at > org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:106) > at > org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:44) > at > org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:113) > ... 
8 more > {code} > Attempting to use {{readWithCoders(StringUtf8Coder.of(), StringUtf8Coder.of())}} instead yields: > {code} > java.lang.reflect.InvocationTargetException > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) > at java.lang.Thread.run(Thread.java:745) > Caused by: > org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder$PopulateDisplayDataException: > Error while populating display data for component: > org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata > at > org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:801) > at > org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder.access$100(DisplayData.java:733) > at > org.apache.beam.sdk.transforms.display.DisplayData.from(DisplayData.java:81) > at > org.apache.beam.runners.direct.DisplayDataValidator.evaluateDisplayData(DisplayDataValidator.java:47) > at > org.apache.beam.runners.direct.DisplayDataValidator.access$100(DisplayDataValidator.java:29) > at > org.apache.beam.runners.direct.DisplayDataValidator$Visitor.enterCompositeTransform(DisplayDataValidator.java:56) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:479) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:483) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:232) > at > org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:207) > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:384) > at > org.apache.beam.runners.direct.DisplayDataValidator.validateTransforms(DisplayDataValidator.java:43) > at > 
org.apache.beam.runners.direct.DisplayDataValidator.validatePipeline(DisplayDataValidator.java:35) > at > org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:265) > at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:277) > at > com.shopify.attribution.pipelines.StoreSessions.main(StoreSessions.java:75) > ... 6 more > Caused by: java.lang.RuntimeException: Unknown value type: StringUtf8Coder > at > org.apache.beam.sdk.transforms.display.DisplayData.item(DisplayData.java:886) > at > org.apache.beam.sdk.io.kafka.KafkaIO$Read.populateDisplayData(KafkaIO.java:750) > at > org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.populateDisplayData(KafkaIO.java:785) > at > org.apache.beam.sdk.transforms.display.DisplayData$InternalBuilder.include(DisplayData.java:794) > ... 22 more > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)