Great to hear. Yes, if you can help fix this issue, that would be great.

Cheers,
Till

On Tue, Mar 9, 2021 at 3:41 PM Bobby Richard <bobby.rich...@broadcom.com> wrote:

> Great thanks, I was able to work around the issue by implementing my own
> KafkaRecordDeserializer. I will take a stab at a PR to fix the bug, should
> be an easy fix.
>
> On Tue, Mar 9, 2021 at 9:26 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Bobby,
>>
>> This is most likely a bug in Flink. Thanks a lot for reporting the issue
>> and analyzing it. I have created an issue for tracking it [1].
>>
>> cc Becket.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-21691
>>
>> Cheers,
>> Till
>>
>> On Mon, Mar 8, 2021 at 3:35 PM Bobby Richard <bobby.rich...@broadcom.com>
>> wrote:
>>
>>> I'm receiving the following exception when trying to use a KafkaSource
>>> from the new DataSource API.
>>>
>>> Exception in thread "main" java.lang.NullPointerException
>>>     at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
>>>     at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
>>>
>>> Here is my code (kotlin)
>>>
>>> val kafkaSource = buildKafkaSource(params)
>>> val datastream = env.fromSource(kafkaSource,
>>>     WatermarkStrategy.noWatermarks(), "kafka")
>>>
>>> private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
>>>     val builder = KafkaSource.builder<String>()
>>>         .setBootstrapServers(params.get("bootstrapServers"))
>>>         .setGroupId(params.get("groupId"))
>>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>>         .setTopics("topic")
>>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
>>>
>>>     if (params.getBoolean("boundedSource", false)) {
>>>         builder.setBounded(OffsetsInitializer.latest())
>>>     }
>>>
>>>     return builder.build()
>>> }
>>>
>>> I'm setting the deserializer using the ValueDeserializerWrapper as
>>> described in the KafkaSourceBuilder javadoc example
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html
>>>
>>> Looking at the code for the ValueDeserializerWrapper, it appears that
>>> the deserializer isn't actually set until the deserialize method is called,
>>> but getProducedType is actually called first resulting in the
>>> NullPointerException. What am I missing?
>>>
>>> Thanks,
>>> Bobby
>>>
>>> This electronic communication and the information and any files
>>> transmitted with it, or attached to it, are confidential and are intended
>>> solely for the use of the individual or entity to whom it is addressed and
>>> may contain information that is confidential, legally privileged, protected
>>> by privacy laws, or otherwise restricted from disclosure to anyone else. If
>>> you are not the intended recipient or the person responsible for delivering
>>> the e-mail to the intended recipient, you are hereby notified that any use,
>>> copying, distributing, dissemination, forwarding, printing, or copying of
>>> this e-mail is strictly prohibited. If you received this e-mail in error,
>>> please return the e-mail to the sender, delete it from your computer, and
>>> destroy any printed copy of it.
>
> --
>
> *Bobby Richard*
> R&D Software Engineer | Information Security Group | Symantec
> Enterprise Division
> Broadcom
>
> mobile: 337.794.2128
>
> Atlanta, GA (USA)
> bobby.rich...@broadcom.com | broadcom.com
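
For readers following the thread, the ordering problem described in the original report can be reproduced in isolation. The sketch below is a plain-Java model and not Flink's code: RecordDeserializer, StringDecoder, LazyWrapper and SafeStringDeserializer are hypothetical names invented for illustration. It assumes only what the report states, namely that the wrapped deserializer is created on the first deserialize call while the produced-type lookup runs before that.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a record deserializer; not Flink's interface. */
interface RecordDeserializer<T> {
    T deserialize(byte[] value);

    Class<T> producedType();
}

/** Hypothetical delegate that knows how to decode bytes and which type it yields. */
final class StringDecoder {
    String decode(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    Class<String> producedType() {
        return String.class;
    }
}

/** Models the reported problem: the delegate only exists after deserialize() has run. */
final class LazyWrapper implements RecordDeserializer<String> {
    private StringDecoder decoder; // null until the first deserialize() call

    @Override
    public String deserialize(byte[] value) {
        if (decoder == null) {
            decoder = new StringDecoder();
        }
        return decoder.decode(value);
    }

    @Override
    public Class<String> producedType() {
        // Dereferences the lazily created delegate, so a call made before
        // deserialize() throws a NullPointerException.
        return decoder.producedType();
    }
}

/** Workaround shape: the type answer does not depend on any lazily created state. */
final class SafeStringDeserializer implements RecordDeserializer<String> {
    @Override
    public String deserialize(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    @Override
    public Class<String> producedType() {
        return String.class;
    }
}

final class LazyInitDemo {
    public static void main(String[] args) {
        try {
            // Type lookup first, as happens when the source is registered.
            new LazyWrapper().producedType();
        } catch (NullPointerException e) {
            System.out.println("lazy wrapper failed before any record was read");
        }
        System.out.println(new SafeStringDeserializer().producedType().getSimpleName());
    }
}
```

A custom deserializer whose type lookup does not depend on lazily created state has the same shape as SafeStringDeserializer here, and that is presumably why implementing one's own KafkaRecordDeserializer avoids the exception. How the actual fix for FLINK-21691 is implemented is not stated in this thread.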