I figured out my mistake: the serializer.class property was set to StringEncoder instead of DefaultEncoder, which takes a byte[]. The setting should be:

    props.put("serializer.class", "kafka.serializer.DefaultEncoder");

However, the key also needs to be a byte[], so I had to change that too.

From: ele...@msn.com
To: user@storm.apache.org
Subject: Kafka Bolt - pushing byte array
Date: Thu, 19 Mar 2015 19:58:23 +0000

Hello,

I'm reading data from Kafka formatted as a Protobuf object (it comes out as a byte[]). This works fine and I can read and decode the data. But when I try to push back to the queue with a KafkaBolt declared without any type parameters, it seems to require a String object that then gets encoded.

    KafkaBolt kafkaBolt = new KafkaBolt()
        .withTopicSelector(new DefaultTopicSelector("topic"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

If I try to apply typing to the constructor like this:

    KafkaBolt kafkaBolt = new KafkaBolt<String, byte[]>()
        .withTopicSelector(new DefaultTopicSelector("topic"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, byte[]>());

it still seems to expect a String.

I tried to encode as a string in the previous bolt with:

    byte[] byteArray = input.getBinary(0);
    String output;
    try {
        output = new String(byteArray, "ISO-8859-1");
        _collector.emit(new Values(this.topic, output, "key"));
    } catch (...) {}

and now the KafkaBolt is able to send to the queue, but it fails on some objects, and the Protobuf decoder on the other side spits out lots of errors. I used ISO-8859-1 as I read it is a 1-1 mapping from binary, but I have no idea what encoding is used by the KafkaBolt on the other side, and that could certainly be the problem.

Ideally I want to avoid the String encoding / decoding, so how do I specify the type of the 'message' to the KafkaBolt? What am I doing wrong?

Thanks for help.

Regards
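
For anyone finding this thread later, here is a minimal sketch of the fix from the reply at the top, plus a small demonstration of why the String workaround likely corrupted some messages. It makes several assumptions: the property names are those of the old (0.8-era) Kafka producer, where key.serializer.class controls the key encoder; the class name ByteArrayProducerConfig, the helper names and the broker address localhost:9092 are made up for illustration; and the UTF-8 re-encoding stands in for what StringEncoder does by default, which I believe is UTF-8 but have not verified against the version used here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Properties;

// Hypothetical helper class, not part of Storm or Kafka.
public class ByteArrayProducerConfig {

    // Builds producer properties that pass byte[] keys and messages through unchanged.
    // Property names assume the old Kafka 0.8 producer configuration.
    static Properties producerProps(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        // DefaultEncoder takes a byte[] and sends it as-is.
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        // The key must be a byte[] as well, so it gets the same encoder.
        props.put("key.serializer.class", "kafka.serializer.DefaultEncoder");
        return props;
    }

    // Mimics the workaround from the question: decode the payload as ISO-8859-1,
    // then let a string encoder (assumed UTF-8 here) turn it back into bytes.
    static byte[] viaLatin1ThenUtf8(byte[] payload) {
        String asText = new String(payload, StandardCharsets.ISO_8859_1);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092"); // placeholder broker address
        System.out.println(props);

        // A tiny Protobuf-style payload containing a byte above 0x7F.
        byte[] payload = {0x08, (byte) 0x96, 0x01};
        byte[] mangled = viaLatin1ThenUtf8(payload);

        // Bytes above 0x7F become two bytes in UTF-8, so the payload no longer matches.
        System.out.println("unchanged: " + Arrays.equals(payload, mangled));
        System.out.println(payload.length + " bytes in, " + mangled.length + " bytes out");
    }
}
```

If that assumption about the encoder holds, it would explain why only some objects failed: payloads made up entirely of bytes below 0x80 survive the round trip, while any payload with a higher byte changes length and content before it reaches the Protobuf decoder.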