Sorry, there was a code issue, where I was creating a kafka 10 consumer.
Problem solved.
From: Oliver Buckley-Salmon
Sent: 07 September 2018 15:04
To: user@flink.apache.org
Subject: Flink Failing to Connect to Kafka
org.apache.kafka.common.protocol.types.SchemaException: Error computing size
for field 'topics': java.lang.NullPointerException
Hi,
I have a Flink 1.4.0 cluster running on OpenShift with a job that connects to a
Kafka 0.11.0.1 cluster in the same Openshift project. The job reads from one
topic and writes to two others.
The job deploys OK but when it starts up it immediately crashes with the
following exception
org.apache.kafka.common.protocol.types.SchemaException: Error computing size
for field 'topics': java.lang.NullPointerException
at org.apache.kafka.common.protocol.types.Schema.sizeOf(Schema.java:93)
at
org.apache.kafka.common.protocol.types.Struct.sizeOf(Struct.java:258)
at
org.apache.kafka.common.requests.AbstractRequestResponse.serialize(AbstractRequestResponse.java:28)
at
org.apache.kafka.common.requests.AbstractRequest.serialize(AbstractRequest.java:81)
at
org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:74)
at
org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:396)
at
org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:370)
at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:332)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:409)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
at
org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:314)
at
org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1386)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:75)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:128)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:415)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
The version of the Flink Kafka Connector I'm using is
org.apache.flink
flink-connector-kafka-0.11_2.11
1.4.0
I can write and consume from the Kafka cluster and can see the brokers in
Zookeeper, can anyone tell me what the exception means and what I can do to
resolve it?
Thanks very much in advance for your help.
Kind regards,
Oliver Buckley-Salmon
---
This e-mail may contain confidential and/or privileged information. If you are
not the intended recipient (or have received this e-mail in error) please
notify the sender immediately and delete this e-mail. Any unauthorized copying,
disclosure or distribution of the material in this e-mail is strictly forbidden.
Please refer to https://www.db.com/disclosures for additional EU corporate and
regulatory disclosures and to
http://www.db.com/unitedkingdom/content/privacy.htm for information about
privacy.