I modified the code in KafkaProducer.java to capture the exception's stack trace, and finally found the real cause of the exception:
java.lang.ClassCastException: org.apache.storm.shade.org.json.simple.JSONObject cannot be cast to org.json.simple.JSONObject

So this problem is related to the maven shade plugin.

2016-11-16 19:20 GMT+08:00 Zhechao Ma <mazhechaomaill...@gmail.com>:

> Hi Amber,
>
> Here is the code.
>
> Properties prop = new Properties();
> prop.put("bootstrap.servers", kafkaBrokers);
> prop.put("acks", ack);
> prop.put("key.serializer", keySerializer);
> prop.put("value.serializer", valueSerializer);
>
> KafkaBolt kafkaBolt = new KafkaBolt<String, JSONObject>()
>         .withProducerProperties(prop)
>         .withTopicSelector(new DefaultTopicSelector(outputTopic))
>         .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("conn", "httpstream"));
>
> builder.setBolt("kafkabolt", kafkaBolt, kafkaBoltParallelism)
>         .shuffleGrouping("normalizeLog", "origin");
>
> And I have found where the exception is thrown: in KafkaProducer.java.
> It is a ClassCastException. I am confused about the cast, because there
> seems to be no class casting. My own serializer was posted in my last
> mail.
>
> byte[] serializedValue;
> try {
>     serializedValue = valueSerializer.serialize(record.topic(), record.value());
> } catch (ClassCastException cce) {
>     throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
>             " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
>             " specified in value.serializer");
> }
>
> Any suggestions?
>
> Thanks.
>
> 2016-11-16 12:53 GMT+08:00 Amber Kulkarni <amber.kulkarn...@gmail.com>:
>
>> Hey,
>>
>> You want to post a JSON string to Kafka, right?
>> Also, can you post the code you are using to post to Kafka?
>>
>> On Tue, Nov 15, 2016 at 12:18 PM, Zhechao Ma
>> <mazhechaomaill...@gmail.com> wrote:
>> > Here is the code. I only get log output from the constructor and the
>> > configure method.
>> >
>> > import org.json.simple.JSONObject;
>> > import org.apache.kafka.common.errors.SerializationException;
>> > import org.apache.kafka.common.serialization.Serializer;
>> >
>> > import java.util.Map;
>> > import org.slf4j.Logger;
>> > import org.slf4j.LoggerFactory;
>> >
>> > public class JsonSerializer implements Serializer<JSONObject> {
>> >     private static final Logger LOG = LoggerFactory.getLogger(JsonSerializer.class);
>> >
>> >     /**
>> >      * Default constructor needed by Kafka
>> >      */
>> >     public JsonSerializer() {
>> >         LOG.info("===> JsonSerializer constructor !!");
>> >     }
>> >
>> >     @Override
>> >     public void configure(Map<String, ?> config, boolean isKey) {
>> >         LOG.info("===> JsonSerializer configure");
>> >     }
>> >
>> >     @Override
>> >     public byte[] serialize(String topic, JSONObject data) {
>> >         LOG.info("===> JsonSerializer serialize !!");
>> >         if (data == null)
>> >             return null;
>> >         try {
>> >             return data.toString().getBytes("utf-8");
>> >         } catch (Exception e) {
>> >             LOG.error("===> JsonSerializer serialize EXCEPTION");
>> >             throw new SerializationException("Error serializing JSON message", e);
>> >         }
>> >     }
>> >
>> >     @Override
>> >     public void close() {
>> >         LOG.error("===> JsonSerializer close");
>> >     }
>> > }
>> >
>> > 2016-11-14 20:01 GMT+08:00 Andrew Xor <andreas.gramme...@gmail.com>:
>> >>
>> >> Hi,
>> >>
>> >> Since you can't cast one type to another and you are not getting a
>> >> null exception, could you give us the implementation of your
>> >> serializer so that we are better able to help you?
>> >>
>> >> Cheers,
>> >>
>> >> A.
>> >>
>> >> On Mon, Nov 14, 2016 at 9:17 AM, Zhechao Ma
>> >> <mazhechaomaill...@gmail.com> wrote:
>> >>>
>> >>> Even when I implement my own JSON serializer, it still throws a
>> >>> similar exception, with no more details for debugging:
>> >>>
>> >>> org.apache.kafka.common.errors.SerializationException: Can't convert
>> >>> value of class org.apache.storm.shade.org.json.simple.JSONObject to
>> >>> xxxxxx specified in value.serializer
>> >>>
>> >>> I added a LOG call in the overriding method byte[] serialize(String
>> >>> topic, JSONObject data), and found no logs in worker.log. That is to
>> >>> say, this exception is thrown before the serialize method is called.
>> >>>
>> >>> 2016-11-07 16:50 GMT+08:00 Zhechao Ma <mazhechaomaill...@gmail.com>:
>> >>>>
>> >>>> I'm using KafkaBolt to write data to Kafka. The tuple-to-Kafka
>> >>>> mapping is <String, JSONObject>.
>> >>>> I set both key.serializer and value.serializer to
>> >>>> "org.apache.kafka.common.serialization.StringSerializer". I get the
>> >>>> following exception:
>> >>>>
>> >>>> org.apache.kafka.common.errors.SerializationException: Can't convert
>> >>>> value of class org.apache.storm.shade.org.json.simple.JSONObject to
>> >>>> class org.apache.kafka.common.serialization.StringSerializer
>> >>>> specified in value.serializer
>> >>>>
>> >>>> I cannot find other serializers related to JSON, and I'm using Storm
>> >>>> 1.0.2 and Kafka 0.8.1.1.
>> >>>>
>> >>>> Could anyone help?
>> >>>>
>> >>>> --
>> >>>> Thanks
>> >>>> Zhechao Ma
>> >>>
>> >>> --
>> >>> Thanks
>> >>> Zhechao Ma
>> >>
>> >
>> > --
>> > Thanks
>> > Zhechao Ma
>>
>> --
>> Regards,
>> Amber Kulkarni
>
> --
> Thanks
> Zhechao Ma

--
Thanks
Zhechao Ma
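
A minimal, stdlib-only sketch of why a ClassCastException can appear even though the serializer contains no explicit cast, and why nothing is logged from serialize. Because of type erasure, the Java compiler adds a bridge method serialize(String, Object) to a class that implements Serializer<JSONObject>; that bridge casts its argument to JSONObject before the hand-written body runs. The Serializer interface and the two JSONObject classes below are hypothetical stand-ins (assumptions for illustration) for Kafka's interface, org.json.simple.JSONObject, and the relocated org.apache.storm.shade copy; they are not the real classes.

```java
import java.nio.charset.StandardCharsets;

public class BridgeCastDemo {
    // Hypothetical stand-in for Kafka's Serializer<T>, reduced to one method.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Hypothetical stand-ins for org.json.simple.JSONObject and the shaded
    // copy relocated under org.apache.storm.shade: same shape, unrelated types.
    static class JSONObject {
        @Override public String toString() { return "{}"; }
    }
    static class ShadedJSONObject {
        @Override public String toString() { return "{}"; }
    }

    static class JsonSerializer implements Serializer<JSONObject> {
        static int calls = 0; // counts how often the hand-written body runs

        @Override
        public byte[] serialize(String topic, JSONObject data) {
            calls++;
            return data.toString().getBytes(StandardCharsets.UTF_8);
        }
        // The compiler also generates a bridge method, roughly:
        //   byte[] serialize(String topic, Object data) {
        //       return serialize(topic, (JSONObject) data);
        //   }
    }

    // Calls the serializer through its erased (raw) type, the way a caller
    // that only knows the value as Object would.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String trySerialize(Object value) {
        Serializer raw = new JsonSerializer();
        try {
            raw.serialize("topic", value);
            return "ok";
        } catch (ClassCastException cce) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // The "shaded" value fails in the bridge cast; the body never runs.
        System.out.println(trySerialize(new ShadedJSONObject())); // prints "ClassCastException"
        System.out.println(JsonSerializer.calls);                 // prints "0"
        // A value of the expected class passes the cast and reaches the body.
        System.out.println(trySerialize(new JSONObject()));       // prints "ok"
        System.out.println(JsonSerializer.calls);                 // prints "1"
    }
}
```

In the topology above, the value handed to the producer is presumably the shaded class, so the same implicit cast would fail inside KafkaProducer, which then wraps it in the SerializationException quoted in this thread.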