I'm receiving the following exception when trying to use a KafkaSource from
the new DataSource API.

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
    at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)

Here is my code (Kotlin):

val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setGroupId(params.get("groupId"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setTopics("topic")
        
.setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))

    if (params.getBoolean("boundedSource", false)) {
        builder.setBounded(OffsetsInitializer.latest())
    }

    return builder.build()
}




I'm setting the deserializer through KafkaRecordDeserializer.valueOnly,
which yields the ValueDeserializerWrapper seen in the stack trace, as
described in the KafkaSourceBuilder Javadoc example:
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html

Looking at the code for the ValueDeserializerWrapper, it appears that the
wrapped deserializer isn't instantiated until the deserialize method is
called, but getProducedType is called first (from env.fromSource), which
results in the NullPointerException. What am I missing?
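
To make the suspected call order concrete, here is a minimal stand-alone
sketch of what I think is happening. It does not use any Flink classes;
LazyValueWrapper, producedTypeName and the factory parameter are hypothetical
names I made up for illustration, and the real ValueDeserializerWrapper may
differ in its details.

```kotlin
// Simplified stand-alone model of the suspected call order; not Flink's actual code.
// LazyValueWrapper and its members are hypothetical names used only for illustration.
class LazyValueWrapper<T : Any>(private val factory: () -> T) {
    // Stays null until deserialize() runs for the first time.
    private var delegate: T? = null

    fun deserialize(): T {
        if (delegate == null) {
            delegate = factory()
        }
        return delegate!!
    }

    // Reads the delegate without initializing it, mirroring the failing type lookup.
    fun producedTypeName(): String = delegate!!.javaClass.simpleName
}

fun main() {
    val wrapper = LazyValueWrapper { "payload" }

    // Asking for the type first fails, because nothing has created the delegate yet.
    val failure = runCatching { wrapper.producedTypeName() }.exceptionOrNull()
    println(failure is NullPointerException)

    // After one deserialize() call the same lookup succeeds.
    wrapper.deserialize()
    println(wrapper.producedTypeName())
}
```

If this model matches the real class, the type lookup can only succeed after
at least one record has been deserialized, which never happens before
fromSource asks for the produced type.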

Thanks,
Bobby
