[ https://issues.apache.org/jira/browse/HADOOP-14623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095937#comment-16095937 ]
Hongyuan Li edited comment on HADOOP-14623 at 7/21/17 8:54 AM:
---------------------------------------------------------------
Hi, [~bharatviswa], I found that the Kafka client must be the same version as the Kafka server, or the new producer API does not function well. The old {{kafka.javaapi.producer}} Producer works, but it will be removed in a future Kafka version. The following is the stack trace when using the 0.10.0 Kafka client to write to a 0.9.0 Kafka server.
{code}
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 1702065152, only 29 bytes available
	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
	at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
	at java.lang.Thread.run(Thread.java:745)
{code}
Any good ideas?
*Update* A 0.9.x Kafka client can write to Kafka servers of versions 0.9.x through 0.10.x.
*Update* It seems the key type can be set to byte[] instead of Integer.

> fixed some bugs in KafkaSink
> -----------------------------
>
>                 Key: HADOOP-14623
>                 URL: https://issues.apache.org/jira/browse/HADOOP-14623
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: common, tools
>    Affects Versions: 3.0.0-alpha3
>            Reporter: Hongyuan Li
>            Assignee: Hongyuan Li
>         Attachments: HADOOP-14623-001.patch, HADOOP-14623-002.patch
>
>
> {{KafkaSink}}#{{init}} should set acks to *1* to make sure the message has at least been written to the broker.
> Current code is listed below:
> {code}
>     props.put("request.required.acks", "0");
> {code}
> *Update*
> Found another bug in this class: {{key.serializer}} is set to {{org.apache.kafka.common.serialization.ByteArraySerializer}}; however, the key type of the Producer is Integer. Code is listed below:
> {code}
>     props.put("key.serializer",
>         "org.apache.kafka.common.serialization.ByteArraySerializer");
>     ……………
>     producer = new KafkaProducer<Integer, byte[]>(props);
> {code}
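For illustration, a minimal sketch of a producer setup consistent with the fixes discussed above: the key type and {{key.serializer}} agree (both byte[] / {{ByteArraySerializer}}), and acks is set to 1 so the leader must acknowledge each write. This uses the new {{org.apache.kafka.clients.producer}} API; the broker address, topic name, class name, and sample record are placeholders for illustration, not taken from the actual KafkaSink code.
{code}
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSinkSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Placeholder broker list; KafkaSink would read this from the metrics configuration.
    props.put("bootstrap.servers", "localhost:9092");
    // acks=1: the leader writes the record to its local log before acknowledging.
    props.put("acks", "1");
    // Key and value types are both byte[], matching ByteArraySerializer,
    // instead of declaring KafkaProducer<Integer, byte[]> with a byte-array key serializer.
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");

    Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
    byte[] key = "hostname".getBytes(StandardCharsets.UTF_8);
    byte[] value = "sample metrics record".getBytes(StandardCharsets.UTF_8);
    producer.send(new ProducerRecord<>("metrics-topic", key, value));
    producer.close();
  }
}
{code}
As noted in the comment, the client jar should also match the broker version (a 0.9.x client against 0.9.x-0.10.x brokers), otherwise the SchemaException above can occur regardless of the serializer settings.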