[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked
[ https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-21160: --- Labels: auto-deprioritized-major pull-request-available (was: pull-request-available stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > ValueDeserializerWrapper throws NullPointerException when getProducedType is > invoked > > > Key: FLINK-21160 > URL: https://issues.apache.org/jira/browse/FLINK-21160 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.12.3 >Reporter: Qingsheng Ren >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > Fix For: 1.13.0 > > > The variable {{deserializer}} in class {{ValueDeserializerWrapper}} won't be > instantiated until method {{deserialize()}} is invoked in runtime, so in the > job compiling stage when invoking {{getProducedType()}}, NPE will be thrown > because of referencing the uninstantiated variable {{deserializer}}. > A user reported that the new {{KafkaSource}} fails with a > {{NullPointerException}}: > {code} > Exception in thread "main" java.lang.NullPointerException > at > org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79) > at > org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715) > {code} > when setting it up like this: > {code} > val kafkaSource = buildKafkaSource(params) > val datastream = env.fromSource(kafkaSource, > WatermarkStrategy.noWatermarks(), "kafka") > private fun buildKafkaSource(params: ParameterTool): KafkaSource { > val builder = KafkaSource.builder() > .setBootstrapServers(params.get("bootstrapServers")) > .setGroupId(params.get("groupId")) > .setStartingOffsets(OffsetsInitializer.earliest()) > .setTopics("topic") > > .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java)) > if (params.getBoolean("boundedSource", false)) { > builder.setBounded(OffsetsInitializer.latest()) > } > return builder.build() > } > {code} > The problem seems to be that the {{ValueDeserializerWrapper}} does not set > the deserializer the deserialize method is called, but {{getProducedType}} is > actually called first resulting in the {{NullPointerException}}. > https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked
[ https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren updated FLINK-21160: -- Affects Version/s: (was: 1.13.0) > ValueDeserializerWrapper throws NullPointerException when getProducedType is > invoked > > > Key: FLINK-21160 > URL: https://issues.apache.org/jira/browse/FLINK-21160 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.12.3 >Reporter: Qingsheng Ren >Priority: Major > Labels: pull-request-available, stale-major > Fix For: 1.13.0 > > > The variable {{deserializer}} in class {{ValueDeserializerWrapper}} won't be > instantiated until method {{deserialize()}} is invoked in runtime, so in the > job compiling stage when invoking {{getProducedType()}}, NPE will be thrown > because of referencing the uninstantiated variable {{deserializer}}. > A user reported that the new {{KafkaSource}} fails with a > {{NullPointerException}}: > {code} > Exception in thread "main" java.lang.NullPointerException > at > org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79) > at > org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715) > {code} > when setting it up like this: > {code} > val kafkaSource = buildKafkaSource(params) > val datastream = env.fromSource(kafkaSource, > WatermarkStrategy.noWatermarks(), "kafka") > private fun buildKafkaSource(params: ParameterTool): KafkaSource { > val builder = KafkaSource.builder() > .setBootstrapServers(params.get("bootstrapServers")) > .setGroupId(params.get("groupId")) > .setStartingOffsets(OffsetsInitializer.earliest()) > .setTopics("topic") > > .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java)) > if (params.getBoolean("boundedSource", false)) { > builder.setBounded(OffsetsInitializer.latest()) > } > return builder.build() > } > {code} > The problem seems to be that the {{ValueDeserializerWrapper}} does not set > the deserializer the deserialize method is called, but {{getProducedType}} is > actually called first resulting in the {{NullPointerException}}. > https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked
[ https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren updated FLINK-21160: -- Fix Version/s: 1.13.0 > ValueDeserializerWrapper throws NullPointerException when getProducedType is > invoked > > > Key: FLINK-21160 > URL: https://issues.apache.org/jira/browse/FLINK-21160 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Reporter: Qingsheng Ren >Priority: Major > Labels: pull-request-available, stale-major > Fix For: 1.13.0 > > > The variable {{deserializer}} in class {{ValueDeserializerWrapper}} won't be > instantiated until method {{deserialize()}} is invoked in runtime, so in the > job compiling stage when invoking {{getProducedType()}}, NPE will be thrown > because of referencing the uninstantiated variable {{deserializer}}. > A user reported that the new {{KafkaSource}} fails with a > {{NullPointerException}}: > {code} > Exception in thread "main" java.lang.NullPointerException > at > org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79) > at > org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715) > {code} > when setting it up like this: > {code} > val kafkaSource = buildKafkaSource(params) > val datastream = env.fromSource(kafkaSource, > WatermarkStrategy.noWatermarks(), "kafka") > private fun buildKafkaSource(params: ParameterTool): KafkaSource { > val builder = KafkaSource.builder() > .setBootstrapServers(params.get("bootstrapServers")) > .setGroupId(params.get("groupId")) > .setStartingOffsets(OffsetsInitializer.earliest()) > .setTopics("topic") > > .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java)) > if (params.getBoolean("boundedSource", false)) { > builder.setBounded(OffsetsInitializer.latest()) > } > return builder.build() > } > {code} > The problem seems to be that the {{ValueDeserializerWrapper}} does not set > the deserializer the deserialize method is called, but {{getProducedType}} is > actually called first resulting in the {{NullPointerException}}. > https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked
[ https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren updated FLINK-21160: -- Affects Version/s: 1.13.0 1.12.3 > ValueDeserializerWrapper throws NullPointerException when getProducedType is > invoked > > > Key: FLINK-21160 > URL: https://issues.apache.org/jira/browse/FLINK-21160 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.13.0, 1.12.3 >Reporter: Qingsheng Ren >Priority: Major > Labels: pull-request-available, stale-major > Fix For: 1.13.0 > > > The variable {{deserializer}} in class {{ValueDeserializerWrapper}} won't be > instantiated until method {{deserialize()}} is invoked in runtime, so in the > job compiling stage when invoking {{getProducedType()}}, NPE will be thrown > because of referencing the uninstantiated variable {{deserializer}}. > A user reported that the new {{KafkaSource}} fails with a > {{NullPointerException}}: > {code} > Exception in thread "main" java.lang.NullPointerException > at > org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79) > at > org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715) > {code} > when setting it up like this: > {code} > val kafkaSource = buildKafkaSource(params) > val datastream = env.fromSource(kafkaSource, > WatermarkStrategy.noWatermarks(), "kafka") > private fun buildKafkaSource(params: ParameterTool): KafkaSource { > val builder = KafkaSource.builder() > .setBootstrapServers(params.get("bootstrapServers")) > .setGroupId(params.get("groupId")) > .setStartingOffsets(OffsetsInitializer.earliest()) > .setTopics("topic") > > .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java)) > if (params.getBoolean("boundedSource", false)) { > builder.setBounded(OffsetsInitializer.latest()) > } > return builder.build() > } > {code} > The problem seems to be that the {{ValueDeserializerWrapper}} does not set > the deserializer the deserialize method is called, but {{getProducedType}} is > actually called first resulting in the {{NullPointerException}}. > https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked
[ https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-21160: --- Labels: pull-request-available stale-major (was: pull-request-available) > ValueDeserializerWrapper throws NullPointerException when getProducedType is > invoked > > > Key: FLINK-21160 > URL: https://issues.apache.org/jira/browse/FLINK-21160 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Reporter: Qingsheng Ren >Priority: Major > Labels: pull-request-available, stale-major > > The variable {{deserializer}} in class {{ValueDeserializerWrapper}} won't be > instantiated until method {{deserialize()}} is invoked in runtime, so in the > job compiling stage when invoking {{getProducedType()}}, NPE will be thrown > because of referencing the uninstantiated variable {{deserializer}}. > A user reported that the new {{KafkaSource}} fails with a > {{NullPointerException}}: > {code} > Exception in thread "main" java.lang.NullPointerException > at > org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79) > at > org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715) > {code} > when setting it up like this: > {code} > val kafkaSource = buildKafkaSource(params) > val datastream = env.fromSource(kafkaSource, > WatermarkStrategy.noWatermarks(), "kafka") > private fun buildKafkaSource(params: ParameterTool): KafkaSource { > val builder = KafkaSource.builder() > .setBootstrapServers(params.get("bootstrapServers")) > .setGroupId(params.get("groupId")) > .setStartingOffsets(OffsetsInitializer.earliest()) > .setTopics("topic") > > .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java)) > if (params.getBoolean("boundedSource", false)) { > builder.setBounded(OffsetsInitializer.latest()) > } > return builder.build() > } > {code} > The problem seems to be that the {{ValueDeserializerWrapper}} does not set > the deserializer the deserialize method is called, but {{getProducedType}} is > actually called first resulting in the {{NullPointerException}}. > https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked
[ https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-21160: - Description: The variable {{deserializer}} in class {{ValueDeserializerWrapper}} won't be instantiated until method {{deserialize()}} is invoked in runtime, so in the job compiling stage when invoking {{getProducedType()}}, NPE will be thrown because of referencing the uninstantiated variable {{deserializer}}. A user reported that the new {{KafkaSource}} fails with a {{NullPointerException}}: {code} Exception in thread "main" java.lang.NullPointerException at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79) at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715) {code} when setting it up like this: {code} val kafkaSource = buildKafkaSource(params) val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka") private fun buildKafkaSource(params: ParameterTool): KafkaSource { val builder = KafkaSource.builder() .setBootstrapServers(params.get("bootstrapServers")) .setGroupId(params.get("groupId")) .setStartingOffsets(OffsetsInitializer.earliest()) .setTopics("topic") .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java)) if (params.getBoolean("boundedSource", false)) { builder.setBounded(OffsetsInitializer.latest()) } return builder.build() } {code} The problem seems to be that the {{ValueDeserializerWrapper}} does not set the deserializer the deserialize method is called, but {{getProducedType}} is actually called first resulting in the {{NullPointerException}}. https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E was: The variable {{deserializer}} in class {{ValueDeserializerWrapper}} won't be instantiated until method {{deserialize()}} is invoked in runtime, so in the job compiling stage when invoking {{getProducedType()}}, NPE will be thrown because of referencing the uninstantiated variable {{deserializer}}. https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=14364=logs=72d4811f-9f0d-5fd0-014a-0bc26b72b642=c1d93a6a-ba91-515d-3196-2ee8019fbda7 A user reported that the new {{KafkaSource}} fails with a {{NullPointerException}}: {code} Exception in thread "main" java.lang.NullPointerException at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79) at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715) {code} when setting it up like this: {code} val kafkaSource = buildKafkaSource(params) val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka") private fun buildKafkaSource(params: ParameterTool): KafkaSource { val builder = KafkaSource.builder() .setBootstrapServers(params.get("bootstrapServers")) .setGroupId(params.get("groupId")) .setStartingOffsets(OffsetsInitializer.earliest()) .setTopics("topic") .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java)) if (params.getBoolean("boundedSource", false)) { builder.setBounded(OffsetsInitializer.latest()) } return builder.build() } {code} The problem seems to be that the {{ValueDeserializerWrapper}} does not set the deserializer the deserialize method is called, but {{getProducedType}} is actually called first resulting in the {{NullPointerException}}. https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E > ValueDeserializerWrapper throws NullPointerException when getProducedType is > invoked > > > Key: FLINK-21160 > URL: https://issues.apache.org/jira/browse/FLINK-21160 > Project: Flink > Issue Type: Bug > Components:
[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked
[ https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-21160: - Description: The variable {{deserializer}} in class {{ValueDeserializerWrapper}} won't be instantiated until method {{deserialize()}} is invoked in runtime, so in the job compiling stage when invoking {{getProducedType()}}, NPE will be thrown because of referencing the uninstantiated variable {{deserializer}}. https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=14364=logs=72d4811f-9f0d-5fd0-014a-0bc26b72b642=c1d93a6a-ba91-515d-3196-2ee8019fbda7 A user reported that the new {{KafkaSource}} fails with a {{NullPointerException}}: {code} Exception in thread "main" java.lang.NullPointerException at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79) at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715) {code} when setting it up like this: {code} val kafkaSource = buildKafkaSource(params) val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka") private fun buildKafkaSource(params: ParameterTool): KafkaSource { val builder = KafkaSource.builder() .setBootstrapServers(params.get("bootstrapServers")) .setGroupId(params.get("groupId")) .setStartingOffsets(OffsetsInitializer.earliest()) .setTopics("topic") .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java)) if (params.getBoolean("boundedSource", false)) { builder.setBounded(OffsetsInitializer.latest()) } return builder.build() } {code} The problem seems to be that the {{ValueDeserializerWrapper}} does not set the deserializer the deserialize method is called, but {{getProducedType}} is actually called first resulting in the {{NullPointerException}}. https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E was: The variable {{deserializer}} in class {{ValueDeserializerWrapper}} won't be instantiated until method {{deserialize()}} is invoked in runtime, so in the job compiling stage when invoking {{getProducedType()}}, NPE will be thrown because of referencing the uninstantiated variable {{deserializer}}. A user reported that the new {{KafkaSource}} fails with a {{NullPointerException}}: {code} Exception in thread "main" java.lang.NullPointerException at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79) at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715) {code} when setting it up like this: {code} val kafkaSource = buildKafkaSource(params) val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka") private fun buildKafkaSource(params: ParameterTool): KafkaSource { val builder = KafkaSource.builder() .setBootstrapServers(params.get("bootstrapServers")) .setGroupId(params.get("groupId")) .setStartingOffsets(OffsetsInitializer.earliest()) .setTopics("topic") .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java)) if (params.getBoolean("boundedSource", false)) { builder.setBounded(OffsetsInitializer.latest()) } return builder.build() } {code} The problem seems to be that the {{ValueDeserializerWrapper}} does not set the deserializer the deserialize method is called, but {{getProducedType}} is actually called first resulting in the {{NullPointerException}}. https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E > ValueDeserializerWrapper throws NullPointerException when getProducedType is > invoked > > > Key: FLINK-21160 > URL: https://issues.apache.org/jira/browse/FLINK-21160 > Project: Flink > Issue Type: Bug > Components:
[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked
[ https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated FLINK-21160: - Description: The variable {{deserializer}} in class {{ValueDeserializerWrapper}} won't be instantiated until method {{deserialize()}} is invoked in runtime, so in the job compiling stage when invoking {{getProducedType()}}, NPE will be thrown because of referencing the uninstantiated variable {{deserializer}}. A user reported that the new {{KafkaSource}} fails with a {{NullPointerException}}: {code} Exception in thread "main" java.lang.NullPointerException at org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79) at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715) {code} when setting it up like this: {code} val kafkaSource = buildKafkaSource(params) val datastream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka") private fun buildKafkaSource(params: ParameterTool): KafkaSource { val builder = KafkaSource.builder() .setBootstrapServers(params.get("bootstrapServers")) .setGroupId(params.get("groupId")) .setStartingOffsets(OffsetsInitializer.earliest()) .setTopics("topic") .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java)) if (params.getBoolean("boundedSource", false)) { builder.setBounded(OffsetsInitializer.latest()) } return builder.build() } {code} The problem seems to be that the {{ValueDeserializerWrapper}} does not set the deserializer the deserialize method is called, but {{getProducedType}} is actually called first resulting in the {{NullPointerException}}. https://lists.apache.org/x/thread.html/r8734f9a18c25fd5996fc2edf9889277c185ee9a0b79280938b1cb792@%3Cuser.flink.apache.org%3E was:The variable {{deserializer}} in class {{ValueDeserializerWrapper}} won't be instantiated until method {{deserialize()}} is invoked in runtime, so in the job compiling stage when invoking {{getProducedType()}}, NPE will be thrown because of referencing the uninstantiated variable {{deserializer}}. > ValueDeserializerWrapper throws NullPointerException when getProducedType is > invoked > > > Key: FLINK-21160 > URL: https://issues.apache.org/jira/browse/FLINK-21160 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Reporter: Qingsheng Ren >Priority: Major > Labels: pull-request-available > > The variable {{deserializer}} in class {{ValueDeserializerWrapper}} won't be > instantiated until method {{deserialize()}} is invoked in runtime, so in the > job compiling stage when invoking {{getProducedType()}}, NPE will be thrown > because of referencing the uninstantiated variable {{deserializer}}. > A user reported that the new {{KafkaSource}} fails with a > {{NullPointerException}}: > {code} > Exception in thread "main" java.lang.NullPointerException > at > org.apache.flink.connector.kafka.source.reader.deserializer.ValueDeserializerWrapper.getProducedType(ValueDeserializerWrapper.java:79) > at > org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:171) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2282) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1744) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1715) > {code} > when setting it up like this: > {code} > val kafkaSource = buildKafkaSource(params) > val datastream = env.fromSource(kafkaSource, > WatermarkStrategy.noWatermarks(), "kafka") > private fun buildKafkaSource(params: ParameterTool): KafkaSource { > val builder = KafkaSource.builder() > .setBootstrapServers(params.get("bootstrapServers")) > .setGroupId(params.get("groupId")) > .setStartingOffsets(OffsetsInitializer.earliest()) > .setTopics("topic") > > .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer::class.java)) > if (params.getBoolean("boundedSource", false)) { > builder.setBounded(OffsetsInitializer.latest()) > } > return builder.build() > } > {code} >
[jira] [Updated] (FLINK-21160) ValueDeserializerWrapper throws NullPointerException when getProducedType is invoked
[ https://issues.apache.org/jira/browse/FLINK-21160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-21160: --- Labels: pull-request-available (was: ) > ValueDeserializerWrapper throws NullPointerException when getProducedType is > invoked > > > Key: FLINK-21160 > URL: https://issues.apache.org/jira/browse/FLINK-21160 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Reporter: Qingsheng Ren >Priority: Major > Labels: pull-request-available > > The variable {{deserializer}} in class {{ValueDeserializerWrapper}} won't be > instantiated until method {{deserialize()}} is invoked in runtime, so in the > job compiling stage when invoking {{getProducedType()}}, NPE will be thrown > because of referencing the uninstantiated variable {{deserializer}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)