Great to hear. Yes, if you can help fix this issue that would be great.

Cheers,
Till

On Tue, Mar 9, 2021 at 3:41 PM Bobby Richard <bobby.rich...@broadcom.com>
wrote:

> Great thanks, I was able to work around the issue by implementing my own
> KafkaRecordDeserializer. I will take a stab at a PR to fix the bug, should
> be an easy fix.
>
> On Tue, Mar 9, 2021 at 9:26 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Bobby,
>>
>> This is most likely a bug in Flink. Thanks a lot for reporting the issue
>> and analyzing it. I have created an issue for tracking it [1].
>>
>> cc Becket.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-21691
>>
>> Cheers,
>> Till
>>
>> On Mon, Mar 8, 2021 at 3:35 PM Bobby Richard <bobby.rich...@broadcom.com>
>> wrote:
>>
>>> I'm receiving the following exception when trying to use a KafkaSource
>>> from the new DataSource API.
>>>
>>> Exception in thread "main" java.lang.NullPointerException
>>> at
>>> org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
>>> at
>>> org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
>>>
>>> Here is my code (kotlin)
>>>
>>> val kafkaSource = buildKafkaSource(params)
>>> val datastream = env.fromSource(kafkaSource, 
>>> WatermarkStrategy.noWatermarks(), "kafka")
>>>
>>> private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
>>>     val builder = KafkaSource.builder<String>()
>>>         .setBootstrapServers(params.get("bootstrapServers"))
>>>         .setGroupId(params.get("groupId"))
>>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>>         .setTopics("topic")
>>>         
>>> .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
>>>
>>>     if (params.getBoolean("boundedSource", false)) {
>>>         builder.setBounded(OffsetsInitializer.latest())
>>>     }
>>>
>>>     return builder.build()
>>> }
>>>
>>>
>>>
>>>
>>> I'm setting the deserializer using the ValueDeserializerWrapper as
>>> described in the KafkaSourceBuilder javadoc example
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html
>>>
>>> Looking at the code for the ValueDeserializerWrapper, it appears that
>>> the deserializer isn't actually set until the deserialize method is called,
>>> but getProducedType is actually called first resulting in the
>>> NullPointerException. What am I missing?
>>>
>>> Thanks,
>>> Bobby
>>>
>>> This electronic communication and the information and any files
>>> transmitted with it, or attached to it, are confidential and are intended
>>> solely for the use of the individual or entity to whom it is addressed and
>>> may contain information that is confidential, legally privileged, protected
>>> by privacy laws, or otherwise restricted from disclosure to anyone else. If
>>> you are not the intended recipient or the person responsible for delivering
>>> the e-mail to the intended recipient, you are hereby notified that any use,
>>> copying, distributing, dissemination, forwarding, printing, or copying of
>>> this e-mail is strictly prohibited. If you received this e-mail in error,
>>> please return the e-mail to the sender, delete it from your computer, and
>>> destroy any printed copy of it.
>>
>>
>
> --
>
> *Bobby Richard*
> R&D Software Engineer   | Information Security Group   | Symantec
> Enterprise Division
> Broadcom
>
> mobile: 337.794.2128
>
> Atlanta, GA (USA)
> bobby.rich...@broadcom.com   | broadcom.com
>
> This electronic communication and the information and any files
> transmitted with it, or attached to it, are confidential and are intended
> solely for the use of the individual or entity to whom it is addressed and
> may contain information that is confidential, legally privileged, protected
> by privacy laws, or otherwise restricted from disclosure to anyone else. If
> you are not the intended recipient or the person responsible for delivering
> the e-mail to the intended recipient, you are hereby notified that any use,
> copying, distributing, dissemination, forwarding, printing, or copying of
> this e-mail is strictly prohibited. If you received this e-mail in error,
> please return the e-mail to the sender, delete it from your computer, and
> destroy any printed copy of it.

Reply via email to