[ https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin updated FLINK-21160:
-----------------------------
    Description: 
The variable {{deserializer}} in class {{ValueDeserializerWrapper}} is not
instantiated until the method {{deserialize()}} is invoked at runtime. As a
result, when {{getProducedType()}} is invoked during job compilation, a
{{NullPointerException}} (NPE) is thrown because the still-uninstantiated
variable {{deserializer}} is dereferenced.
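To illustrate the ordering problem, here is a minimal, self-contained Java
model of the pattern described above. It does not use Flink or Kafka:
{{Deserializer}}, {{StringDeserializer}}, {{LazyValueDeserializerWrapper}} and
{{valueTypeOf}} are hypothetical stand-ins, and returning a
{{java.lang.reflect.Type}} is a simplification of Flink's {{TypeInformation}}.
Under those assumptions, calling {{getProducedType()}} before the first
{{deserialize()}} dereferences the null field, which mirrors what
{{env.fromSource(...)}} does in the report.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's Deserializer<T> interface.
interface Deserializer<T> {
    T deserialize(byte[] data);
}

// Hypothetical stand-in for Kafka's StringDeserializer.
class StringDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

// Simplified model of the reported pattern: only the class is stored up front,
// and the deserializer instance is created lazily on the first record.
class LazyValueDeserializerWrapper<T> {
    private final Class<? extends Deserializer<T>> deserializerClass;
    private Deserializer<T> deserializer; // stays null until deserialize() runs

    LazyValueDeserializerWrapper(Class<? extends Deserializer<T>> deserializerClass) {
        this.deserializerClass = deserializerClass;
    }

    T deserialize(byte[] value) {
        if (deserializer == null) {
            try {
                deserializer = deserializerClass.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot instantiate " + deserializerClass, e);
            }
        }
        return deserializer.deserialize(value);
    }

    // Invoked while the job is being built, i.e. before any record arrives.
    // deserializer.getClass() dereferences the null field and throws an NPE.
    Type getProducedType() {
        return valueTypeOf(deserializer.getClass());
    }

    // Reads T from a direct "implements Deserializer<T>" clause on cls.
    static Type valueTypeOf(Class<?> cls) {
        for (Type t : cls.getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Deserializer.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[0];
            }
        }
        throw new IllegalArgumentException("no Deserializer<T> on " + cls);
    }
}

class LazyInitDemo {
    public static void main(String[] args) {
        LazyValueDeserializerWrapper<String> wrapper =
                new LazyValueDeserializerWrapper<>(StringDeserializer.class);
        try {
            wrapper.getProducedType(); // same call order as env.fromSource(...)
        } catch (NullPointerException e) {
            System.out.println("NullPointerException before deserialize()");
        }
        wrapper.deserialize("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(wrapper.getProducedType()); // class java.lang.String
    }
}
```

Once a record has been deserialized, the same call succeeds, which is why the
failure only shows up in the compile-time call from {{fromSource}}.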

A user reported that the new {{KafkaSource}} fails with a 
{{NullPointerException}}:

{code}
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
    at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
{code}

when setting it up like this:

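{code}
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer
import org.apache.kafka.common.serialization.StringDeserializer

// `env` is the job's StreamExecutionEnvironment; `params` is a ParameterTool.
val kafkaSource = buildKafkaSource(params)
// fromSource() calls KafkaSource.getProducedType(), which throws the NPE.
val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setGroupId(params.get("groupId"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setTopics("topic")
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))

    if (params.getBoolean("boundedSource", false)) {
        builder.setBounded(OffsetsInitializer.latest())
    }

    return builder.build()
}
{code}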

The problem seems to be that {{ValueDeserializerWrapper}} does not instantiate
the deserializer until the {{deserialize}} method is called, but
{{getProducedType}} is actually called first, which results in the
{{NullPointerException}}.

https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E
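A possible fix, sketched under the same simplifying assumptions as a plain
Java model (hypothetical stand-in types, {{Type}} instead of
{{TypeInformation}}), is to keep the lazy instantiation but derive the
produced type from the deserializer class, which is already known when the
wrapper is constructed. This is an illustration only, not necessarily the
change made in the pull request for this issue.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for Kafka's Deserializer<T> and StringDeserializer.
interface Deserializer<T> {
    T deserialize(byte[] data);
}

class StringDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

// Sketch of a fix: the instance is still created lazily, but getProducedType()
// is answered from the class that is stored at construction time.
class ClassBasedValueDeserializerWrapper<T> {
    private final Class<? extends Deserializer<T>> deserializerClass;
    private Deserializer<T> deserializer; // still created on the first record

    ClassBasedValueDeserializerWrapper(Class<? extends Deserializer<T>> deserializerClass) {
        this.deserializerClass = deserializerClass;
    }

    T deserialize(byte[] value) {
        if (deserializer == null) {
            try {
                deserializer = deserializerClass.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot instantiate " + deserializerClass, e);
            }
        }
        return deserializer.deserialize(value);
    }

    // Safe to call before deserialize(): it never touches the lazy field.
    Type getProducedType() {
        return valueTypeOf(deserializerClass);
    }

    // Reads T from a direct "implements Deserializer<T>" clause on cls.
    static Type valueTypeOf(Class<?> cls) {
        for (Type t : cls.getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Deserializer.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[0];
            }
        }
        throw new IllegalArgumentException("no Deserializer<T> on " + cls);
    }
}

class ClassBasedTypeDemo {
    public static void main(String[] args) {
        ClassBasedValueDeserializerWrapper<String> wrapper =
                new ClassBasedValueDeserializerWrapper<>(StringDeserializer.class);
        // No record has been deserialized yet, and no NPE is thrown.
        System.out.println(wrapper.getProducedType()); // class java.lang.String
    }
}
```

Keeping the instantiation lazy leaves the runtime behaviour of
{{deserialize()}} unchanged; only the compile-time type lookup stops depending
on the instance.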

  was:The variable {{deserializer}} in class {{ValueDeserializerWrapper}} is not
instantiated until the method {{deserialize()}} is invoked at runtime. As a
result, when {{getProducedType()}} is invoked during job compilation, a
{{NullPointerException}} (NPE) is thrown because the still-uninstantiated
variable {{deserializer}} is dereferenced.


> ValueDeserializerWrapper throws NullPointerException when getProducedType is 
> invoked
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-21160
>                 URL: https://issues.apache.org/jira/browse/FLINK-21160
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: Qingsheng Ren
>            Priority: Major
>              Labels: pull-request-available



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
