Sorry, there was a code issue, where I was creating a kafka 10 consumer. Problem solved.
From: Oliver Buckley-Salmon Sent: 07 September 2018 15:04 To: user@flink.apache.org Subject: Flink Failing to Connect to Kafka org.apache.kafka.common.protocol.types.SchemaException: Error computing size for field 'topics': java.lang.NullPointerException Hi, I have a Flink 1.4.0 cluster running on OpenShift with a job that connects to a Kafka 0.11.0.1 cluster in the same Openshift project. The job reads from one topic and writes to two others. The job deploys OK but when it starts up it immediately crashes with the following exception org.apache.kafka.common.protocol.types.SchemaException: Error computing size for field 'topics': java.lang.NullPointerException at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:93) at org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:258) at org.apache.kafka.common.requests.AbstractRequestResponse.serialize(AbstractRequestResponse.java:28) at org.apache.kafka.common.requests.AbstractRequest.serialize(AbstractRequest.java:81) at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:74) at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:396) at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:370) at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:332) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:409) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184) at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:314) at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1386) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75) at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:748) The version of the Flink Kafka Connector I'm using is <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>1.4.0</version> </dependency> I can write and consume from the Kafka cluster and can see the brokers in Zookeeper, can anyone tell me what the exception means and what I can do to resolve it? Thanks very much in advance for your help. Kind regards, Oliver Buckley-Salmon --- This e-mail may contain confidential and/or privileged information. If you are not the intended recipient (or have received this e-mail in error) please notify the sender immediately and delete this e-mail. Any unauthorized copying, disclosure or distribution of the material in this e-mail is strictly forbidden. Please refer to https://www.db.com/disclosures for additional EU corporate and regulatory disclosures and to http://www.db.com/unitedkingdom/content/privacy.htm for information about privacy.