[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked

2021-05-20 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-21160:
---
  Labels: auto-deprioritized-major pull-request-available  (was: pull-request-available stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> ValueDeserializerWrapper throws NullPointerException when getProducedType is 
> invoked
> 
>
> Key: FLINK-21160
> URL: https://issues.apache.org/jira/browse/FLINK-21160
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.12.3
> Reporter: Qingsheng Ren
> Priority: Minor
> Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.13.0
>
>
> The field {{deserializer}} in class {{ValueDeserializerWrapper}} is not 
> instantiated until {{deserialize()}} is invoked at runtime. When 
> {{getProducedType()}} is invoked during job compilation, it dereferences the 
> still-null field and throws a {{NullPointerException}}.
> A user reported that the new {{KafkaSource}} fails with a 
> {{NullPointerException}}:
> {code}
> Exception in thread "main" java.lang.NullPointerException
>     at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
>     at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
> {code}
> when setting it up like this:
> {code}
> val kafkaSource = buildKafkaSource(params)
> val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")
>
> private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
>     val builder = KafkaSource.builder<String>()
>         .setBootstrapServers(params.get("bootstrapServers"))
>         .setGroupId(params.get("groupId"))
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setTopics("topic")
>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))
>     if (params.getBoolean("boundedSource", false)) {
>         builder.setBounded(OffsetsInitializer.latest())
>     }
>     return builder.build()
> }
> {code}
> The problem seems to be that the {{ValueDeserializerWrapper}} does not set 
> the deserializer until the {{deserialize}} method is called, but 
> {{getProducedType}} is actually called first, resulting in the 
> {{NullPointerException}}.
> https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E
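
The failure mode described in the report can be sketched without any Flink or Kafka dependency. The Java below is a minimal illustration, not the actual ValueDeserializerWrapper source: SketchDeserializer, SketchStringDeserializer, LazyWrapper and the producedClass() helper are hypothetical stand-ins, assumed only to mirror the reported pattern of a field that is assigned inside deserialize() and read by getProducedType().

```java
import java.nio.charset.StandardCharsets;

/** Demo entry point: calls getProducedType() before and after deserialize(). */
class LazyWrapperDemo {
    public static void main(String[] args) {
        LazyWrapper<String> wrapper = new LazyWrapper<>(SketchStringDeserializer.class);
        try {
            wrapper.getProducedType();
        } catch (NullPointerException e) {
            System.out.println("getProducedType() failed before deserialize(): " + e);
        }
        wrapper.deserialize("x".getBytes(StandardCharsets.UTF_8));
        System.out.println("after deserialize(): " + wrapper.getProducedType());
    }
}

/** Hypothetical stand-in for a value deserializer interface. */
interface SketchDeserializer<T> {
    T deserialize(byte[] data);

    Class<T> producedClass();
}

/** Hypothetical string deserializer used only for this sketch. */
class SketchStringDeserializer implements SketchDeserializer<String> {
    @Override
    public String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public Class<String> producedClass() {
        return String.class;
    }
}

/** Mirrors the reported pattern: the delegate is created lazily in deserialize(). */
class LazyWrapper<T> {
    private final Class<? extends SketchDeserializer<T>> deserializerClass;
    private SketchDeserializer<T> deserializer; // null until deserialize() runs

    LazyWrapper(Class<? extends SketchDeserializer<T>> deserializerClass) {
        this.deserializerClass = deserializerClass;
    }

    T deserialize(byte[] data) {
        if (deserializer == null) {
            try {
                deserializer = deserializerClass.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot instantiate " + deserializerClass, e);
            }
        }
        return deserializer.deserialize(data);
    }

    Class<T> getProducedType() {
        // Dereferences the lazily assigned field, so this throws a
        // NullPointerException when called before deserialize().
        return deserializer.producedClass();
    }
}
```

In this sketch the type query only succeeds after the first record has been deserialized, which is the wrong order for a caller that asks for the type while the job is still being assembled.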



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked

2021-05-08 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren updated FLINK-21160:
--
Affects Version/s: (was: 1.13.0)

> ValueDeserializerWrapper throws NullPointerException when getProducedType is 
> invoked
> 
>
> Key: FLINK-21160
> URL: https://issues.apache.org/jira/browse/FLINK-21160
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.12.3
> Reporter: Qingsheng Ren
> Priority: Major
> Labels: pull-request-available, stale-major
> Fix For: 1.13.0
>
>





[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked

2021-05-08 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren updated FLINK-21160:
--
Fix Version/s: 1.13.0

> ValueDeserializerWrapper throws NullPointerException when getProducedType is 
> invoked
> 
>
> Key: FLINK-21160
> URL: https://issues.apache.org/jira/browse/FLINK-21160
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: Qingsheng Ren
> Priority: Major
> Labels: pull-request-available, stale-major
> Fix For: 1.13.0
>
>





[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked

2021-05-08 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren updated FLINK-21160:
--
Affects Version/s: 1.13.0
   1.12.3

> ValueDeserializerWrapper throws NullPointerException when getProducedType is 
> invoked
> 
>
> Key: FLINK-21160
> URL: https://issues.apache.org/jira/browse/FLINK-21160
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.0, 1.12.3
> Reporter: Qingsheng Ren
> Priority: Major
> Labels: pull-request-available, stale-major
> Fix For: 1.13.0
>
>





[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked

2021-04-22 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-21160:
---
Labels: pull-request-available stale-major  (was: pull-request-available)

> ValueDeserializerWrapper throws NullPointerException when getProducedType is 
> invoked
> 
>
> Key: FLINK-21160
> URL: https://issues.apache.org/jira/browse/FLINK-21160
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: Qingsheng Ren
> Priority: Major
> Labels: pull-request-available, stale-major
>





[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked

2021-03-10 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-21160:
-
Description: 
The field {{deserializer}} in class {{ValueDeserializerWrapper}} is not 
instantiated until {{deserialize()}} is invoked at runtime. When 
{{getProducedType()}} is invoked during job compilation, it dereferences the 
still-null field and throws a {{NullPointerException}}.

A user reported that the new {{KafkaSource}} fails with a 
{{NullPointerException}}:

{code}
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
    at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
{code}

when setting it up like this:

{code}
val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setGroupId(params.get("groupId"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setTopics("topic")
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))

    if (params.getBoolean("boundedSource", false)) {
        builder.setBounded(OffsetsInitializer.latest())
    }

    return builder.build()
}
{code}

The problem seems to be that the {{ValueDeserializerWrapper}} does not set the 
deserializer until the {{deserialize}} method is called, but {{getProducedType}} 
is actually called first, resulting in the {{NullPointerException}}.

https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E

  was:
The field {{deserializer}} in class {{ValueDeserializerWrapper}} is not 
instantiated until {{deserialize()}} is invoked at runtime. When 
{{getProducedType()}} is invoked during job compilation, it dereferences the 
still-null field and throws a {{NullPointerException}}.

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=14364=logs=72d4811f-9f0d-5fd0-014a-0bc26b72b642=c1d93a6a-ba91-515d-3196-2ee8019fbda7

A user reported that the new {{KafkaSource}} fails with a 
{{NullPointerException}}:

{code}
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
    at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
{code}

when setting it up like this:

{code}
val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setGroupId(params.get("groupId"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setTopics("topic")
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))

    if (params.getBoolean("boundedSource", false)) {
        builder.setBounded(OffsetsInitializer.latest())
    }

    return builder.build()
}
{code}

The problem seems to be that the {{ValueDeserializerWrapper}} does not set the 
deserializer until the {{deserialize}} method is called, but {{getProducedType}} 
is actually called first, resulting in the {{NullPointerException}}.

https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E


> ValueDeserializerWrapper throws NullPointerException when getProducedType is 
> invoked
> 
>
> Key: FLINK-21160
> URL: https://issues.apache.org/jira/browse/FLINK-21160
> Project: Flink
>  Issue Type: Bug
>  Components: 

[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked

2021-03-10 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-21160:
-
Description: 
The field {{deserializer}} in class {{ValueDeserializerWrapper}} is not 
instantiated until {{deserialize()}} is invoked at runtime. When 
{{getProducedType()}} is invoked during job compilation, it dereferences the 
still-null field and throws a {{NullPointerException}}.

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=14364=logs=72d4811f-9f0d-5fd0-014a-0bc26b72b642=c1d93a6a-ba91-515d-3196-2ee8019fbda7

A user reported that the new {{KafkaSource}} fails with a 
{{NullPointerException}}:

{code}
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
    at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
{code}

when setting it up like this:

{code}
val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setGroupId(params.get("groupId"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setTopics("topic")
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))

    if (params.getBoolean("boundedSource", false)) {
        builder.setBounded(OffsetsInitializer.latest())
    }

    return builder.build()
}
{code}

The problem seems to be that the {{ValueDeserializerWrapper}} does not set the 
deserializer until the {{deserialize}} method is called, but {{getProducedType}} 
is actually called first, resulting in the {{NullPointerException}}.

https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E

  was:
The field {{deserializer}} in class {{ValueDeserializerWrapper}} is not 
instantiated until {{deserialize()}} is invoked at runtime. When 
{{getProducedType()}} is invoked during job compilation, it dereferences the 
still-null field and throws a {{NullPointerException}}.

A user reported that the new {{KafkaSource}} fails with a 
{{NullPointerException}}:

{code}
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
    at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
{code}

when setting it up like this:

{code}
val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setGroupId(params.get("groupId"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setTopics("topic")
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))

    if (params.getBoolean("boundedSource", false)) {
        builder.setBounded(OffsetsInitializer.latest())
    }

    return builder.build()
}
{code}

The problem seems to be that the {{ValueDeserializerWrapper}} does not set the 
deserializer until the {{deserialize}} method is called, but {{getProducedType}} 
is actually called first, resulting in the {{NullPointerException}}.

https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E


> ValueDeserializerWrapper throws NullPointerException when getProducedType is 
> invoked
> 
>
> Key: FLINK-21160
> URL: https://issues.apache.org/jira/browse/FLINK-21160
> Project: Flink
>  Issue Type: Bug
>  Components: 

[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked

2021-03-10 Thread Dong Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-21160:
-
Description: 
The field {{deserializer}} in class {{ValueDeserializerWrapper}} is not 
instantiated until {{deserialize()}} is invoked at runtime. When 
{{getProducedType()}} is invoked during job compilation, it dereferences the 
still-null field and throws a {{NullPointerException}}.

A user reported that the new {{KafkaSource}} fails with a 
{{NullPointerException}}:

{code}
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79)
    at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715)
{code}

when setting it up like this:

{code}
val kafkaSource = buildKafkaSource(params)
val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")

private fun buildKafkaSource(params: ParameterTool): KafkaSource<String> {
    val builder = KafkaSource.builder<String>()
        .setBootstrapServers(params.get("bootstrapServers"))
        .setGroupId(params.get("groupId"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setTopics("topic")
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java))

    if (params.getBoolean("boundedSource", false)) {
        builder.setBounded(OffsetsInitializer.latest())
    }

    return builder.build()
}
{code}

The problem seems to be that the {{ValueDeserializerWrapper}} does not set the 
deserializer until the {{deserialize}} method is called, but {{getProducedType}} 
is actually called first, resulting in the {{NullPointerException}}.

https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E

  was: The field {{deserializer}} in class {{ValueDeserializerWrapper}} is not 
instantiated until {{deserialize()}} is invoked at runtime. When 
{{getProducedType()}} is invoked during job compilation, it dereferences the 
still-null field and throws a {{NullPointerException}}.


> ValueDeserializerWrapper throws NullPointerException when getProducedType is 
> invoked
> 
>
> Key: FLINK-21160
> URL: https://issues.apache.org/jira/browse/FLINK-21160
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: Qingsheng Ren
> Priority: Major
> Labels: pull-request-available
>

[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked

2021-01-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-21160:
---
Labels: pull-request-available  (was: )

> ValueDeserializerWrapper throws NullPointerException when getProducedType is 
> invoked
> 
>
> Key: FLINK-21160
> URL: https://issues.apache.org/jira/browse/FLINK-21160
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: Qingsheng Ren
> Priority: Major
> Labels: pull-request-available
>
> The field {{deserializer}} in class {{ValueDeserializerWrapper}} is not 
> instantiated until {{deserialize()}} is invoked at runtime. When 
> {{getProducedType()}} is invoked during job compilation, it dereferences the 
> still-null field and throws a {{NullPointerException}}.
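
One possible way to avoid this ordering problem is to answer the type query from the deserializer class itself, so that no instance is needed. The Java below is a sketch under stated assumptions and is not claimed to be the change made in the linked pull request: TypedDeserializer, Utf8StringDeserializer and EagerTypeWrapper are hypothetical names, and the reflection only handles a class that directly implements the interface with a concrete type argument.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

/** Demo entry point: the type query works before any record is deserialized. */
class EagerTypeDemo {
    public static void main(String[] args) {
        EagerTypeWrapper<String> wrapper = new EagerTypeWrapper<>(Utf8StringDeserializer.class);
        System.out.println("produced type: " + wrapper.getProducedType());
    }
}

/** Hypothetical stand-in for a value deserializer interface. */
interface TypedDeserializer<T> {
    T deserialize(byte[] data);
}

/** Hypothetical string deserializer used only for this sketch. */
class Utf8StringDeserializer implements TypedDeserializer<String> {
    @Override
    public String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

/** Keeps lazy instantiation for deserialize() but resolves the type from the class. */
class EagerTypeWrapper<T> {
    private final Class<? extends TypedDeserializer<T>> deserializerClass;
    private TypedDeserializer<T> deserializer; // still created lazily

    EagerTypeWrapper(Class<? extends TypedDeserializer<T>> deserializerClass) {
        this.deserializerClass = deserializerClass;
    }

    T deserialize(byte[] data) {
        if (deserializer == null) {
            try {
                deserializer = deserializerClass.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot instantiate " + deserializerClass, e);
            }
        }
        return deserializer.deserialize(data);
    }

    Class<?> getProducedType() {
        // Inspect the generic interfaces of the class; no instance is touched.
        for (Type implemented : deserializerClass.getGenericInterfaces()) {
            if (implemented instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) implemented;
                if (TypedDeserializer.class.equals(parameterized.getRawType())) {
                    Type argument = parameterized.getActualTypeArguments()[0];
                    if (argument instanceof Class) {
                        return (Class<?>) argument;
                    }
                }
            }
        }
        throw new IllegalStateException(
                "cannot resolve value type of " + deserializerClass.getName());
    }
}
```

The design choice here is to separate the two concerns: instantiation can stay deferred until the first record arrives, while the type query depends only on information available when the wrapper is constructed.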


