[ https://issues.apache.org/jira/browse/FLINK-9627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-9627:
----------------------------------
    Labels: pull-request-available  (was: )

> Extending 'KafkaJsonTableSource' according to comments will result in NPE
> -------------------------------------------------------------------------
>
>                 Key: FLINK-9627
>                 URL: https://issues.apache.org/jira/browse/FLINK-9627
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.5.0
>            Reporter: Dominik Wosiński
>            Assignee: vinoyang
>            Priority: Major
>              Labels: pull-request-available
>
> According to the comments what is needed to extend the 'KafkaJsonTableSource'
> looks as follows:
>
> {code:java}
> A version-agnostic Kafka JSON {@link StreamTableSource}.
> *
> * <p>The version-specific Kafka consumers need to extend this class and
> * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}}.
> *
> * <p>The field names are used to parse the JSON file and so are the types.{code}
> This will cause an NPE, since there is no default value for startupMode in
> the abstract class itself only in the builder of this class.
> For the 'getKafkaConsumer' method the switch statement will be executed on
> non-initialized 'startupMode' field:
> {code:java}
> switch (startupMode) {
>     case EARLIEST:
>         kafkaConsumer.setStartFromEarliest();
>         break;
>     case LATEST:
>         kafkaConsumer.setStartFromLatest();
>         break;
>     case GROUP_OFFSETS:
>         kafkaConsumer.setStartFromGroupOffsets();
>         break;
>     case SPECIFIC_OFFSETS:
>         kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
>         break;
> }{code}
>
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
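
The failure mode described in the issue can be reproduced without Flink. Below is a minimal sketch, not the actual Flink code: `StartupModeDemo`, `BaseSource`, `NaiveSource`, `DefaultedSource`, and `describeStart` are hypothetical stand-ins, and the local `StartupMode` enum only mirrors the four constants quoted above. It shows that a Java `switch` on an enum field that was never assigned throws a `NullPointerException`, and that giving the field a default avoids it. Defaulting to `GROUP_OFFSETS` is an assumption made for illustration and is not necessarily the fix adopted for FLINK-9627.

```java
public class StartupModeDemo {

    // Hypothetical stand-in for the startup mode enum; constants taken from the issue text.
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS }

    // Hypothetical stand-in for the abstract table source.
    static abstract class BaseSource {
        // No initializer: the field stays null unless a builder or subclass sets it.
        protected StartupMode startupMode;

        String describeStart() {
            // A switch statement on a null enum reference throws NullPointerException.
            switch (startupMode) {
                case EARLIEST:
                    return "earliest";
                case LATEST:
                    return "latest";
                case GROUP_OFFSETS:
                    return "group offsets";
                case SPECIFIC_OFFSETS:
                    return "specific offsets";
                default:
                    return "unknown";
            }
        }
    }

    // Subclass written only from the class comment: it never sets startupMode.
    static class NaiveSource extends BaseSource { }

    // One possible mitigation (an assumption): assign a default in the constructor.
    static class DefaultedSource extends BaseSource {
        DefaultedSource() {
            this.startupMode = StartupMode.GROUP_OFFSETS;
        }
    }

    // Returns true if describeStart() fails with a NullPointerException.
    static boolean throwsNpe(BaseSource source) {
        try {
            source.describeStart();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsNpe(new NaiveSource()));     // prints "true"
        System.out.println(throwsNpe(new DefaultedSource())); // prints "false"
    }
}
```

An explicit null check before the switch, with a descriptive exception message, would be an alternative if a silent default is not wanted.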