I modified the code in KafkaProducer.java to capture the exception's stack
trace, and finally found the real cause:

java.lang.ClassCastException:
org.apache.storm.shade.org.json.simple.JSONObject cannot be cast to
org.json.simple.JSONObject
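
This also explains why the exception appeared before serialize() was ever
entered, even though no cast is written in the source. As far as I understand
it, for a class that implements a generic interface such as
Serializer<JSONObject>, the compiler generates a bridge method taking Object
that casts the argument and then calls the typed method. The sketch below
reproduces that without Kafka or Storm. The Serializer interface, PlainJson
and ShadedJson are hypothetical stand-ins (for Kafka's Serializer and the two
JSONObject classes), not the real types.

```java
import java.nio.charset.StandardCharsets;

public class ErasureCastDemo {
    // Hypothetical stand-in for the serialize method of Kafka's Serializer<T>.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for org.json.simple.JSONObject.
    static class PlainJson {
        @Override public String toString() { return "{}"; }
    }

    // Stand-in for org.apache.storm.shade.org.json.simple.JSONObject.
    static class ShadedJson {
        @Override public String toString() { return "{}"; }
    }

    // Set to true only if the body of serialize() actually runs.
    static boolean bodyReached = false;

    static class PlainJsonSerializer implements Serializer<PlainJson> {
        @Override
        public byte[] serialize(String topic, PlainJson data) {
            bodyReached = true;
            return data.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Returns true if the call fails with ClassCastException before the body runs. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean castFailsBeforeBody() {
        bodyReached = false;
        // The caller only sees the erased interface, as a generic producer would.
        Serializer raw = new PlainJsonSerializer();
        try {
            // Dispatches to the compiler-generated bridge method
            // serialize(String, Object), which casts to PlainJson.
            raw.serialize("topic", new ShadedJson());
            return false;
        } catch (ClassCastException expected) {
            return !bodyReached;
        }
    }

    public static void main(String[] args) {
        System.out.println("cast failed before body: " + castFailsBeforeBody());
    }
}
```

The log statement inside serialize() never fires in this situation because
the cast in the bridge method fails first.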


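So the problem is related to the Maven Shade plugin: the JSONObject carried in
the tuple is the relocated class org.apache.storm.shade.org.json.simple.JSONObject,
which is a different class from the org.json.simple.JSONObject that my
serializer is declared for.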
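
One possible workaround, sketched here under assumptions rather than tested
against Storm, is a serializer that does not depend on either JSONObject class
and simply serializes whatever toString() returns (for json-simple's
JSONObject that should be the JSON text). The Serializer interface below is a
hypothetical stand-in so the example is self-contained. A real version would
implement org.apache.kafka.common.serialization.Serializer<Object> and also
provide configure() and close().

```java
import java.nio.charset.StandardCharsets;

public class ToStringSerializerSketch {
    // Hypothetical stand-in for the serialize method of Kafka's Serializer<T>.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Accepts any value type, so shaded and unshaded JSONObject both work.
    static class ToStringSerializer implements Serializer<Object> {
        @Override
        public byte[] serialize(String topic, Object data) {
            if (data == null) {
                return null;
            }
            return data.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    // Helper for the demo: serialize a value and decode the bytes again.
    static String roundTrip(Object value) {
        byte[] bytes = new ToStringSerializer().serialize("topic", value);
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A StringBuilder plays the role of an arbitrary object with a JSON toString().
        System.out.println(roundTrip(new StringBuilder("{\"conn\":1}")));
    }
}
```

The other obvious option is to declare the serializer against the shaded
class directly, at the cost of tying it to Storm's relocation scheme.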

2016-11-16 19:20 GMT+08:00 Zhechao Ma <mazhechaomaill...@gmail.com>:

> Hi Amber,
>
> Here is the code.
>
> Properties prop = new Properties();
> prop.put("bootstrap.servers", kafkaBrokers);
> prop.put("acks", ack);
> prop.put("key.serializer", keySerializer);
> prop.put("value.serializer", valueSerializer);
>
> KafkaBolt kafkaBolt = new KafkaBolt<String, JSONObject>()
>         .withProducerProperties(prop)
>         .withTopicSelector(new DefaultTopicSelector(outputTopic))
>         .withTupleToKafkaMapper(
>                 new FieldNameBasedTupleToKafkaMapper("conn", "httpstream"));
>
> builder.setBolt("kafkabolt", kafkaBolt, kafkaBoltParallelism)
>         .shuffleGrouping("normalizeLog", "origin");
>
>
>
>
> I have found where the exception is thrown: it is in *KafkaProducer.java*.
> It is a ClassCastException. I am confused by this, because there does not
> appear to be any explicit cast. My own serializer was posted in my last
> mail.
>
> byte[] serializedValue;
> try {
>     serializedValue = valueSerializer.serialize(record.topic(), record.value());
> } catch (ClassCastException cce) {
>     throw new SerializationException("Can't convert value of class "
>             + record.value().getClass().getName()
>             + " to class "
>             + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName()
>             + " specified in value.serializer");
> }
>
> Any suggestion?
>
> Thanks.
>
>
> 2016-11-16 12:53 GMT+08:00 Amber Kulkarni <amber.kulkarn...@gmail.com>:
>
>> Hey,
>>
>> You want to post a JSON string to Kafka, right?
>> Could you also post the code you are using to write to Kafka?
>>
>> On Tue, Nov 15, 2016 at 12:18 PM, Zhechao Ma
>> <mazhechaomaill...@gmail.com> wrote:
>> > Here is the code. I only see log output from the constructor and the
>> > configure method.
>> >
>> > import org.json.simple.JSONObject;
>> > import org.apache.kafka.common.errors.SerializationException;
>> > import org.apache.kafka.common.serialization.Serializer;
>> >
>> > import java.util.Map;
>> > import org.slf4j.Logger;
>> > import org.slf4j.LoggerFactory;
>> >
>> > public class JsonSerializer implements Serializer<JSONObject> {
>> >     private static final Logger LOG = LoggerFactory.getLogger(JsonSerializer.class);
>> >     /**
>> >      * Default constructor needed by Kafka
>> >      */
>> >     public JsonSerializer() {
>> >         LOG.info("===> JsonSerializer constructor !!");
>> >     }
>> >
>> >     @Override
>> >     public void configure(Map<String, ?> config, boolean isKey) {
>> >         LOG.info("===> JsonSerializer configure");
>> >     }
>> >
>> >     @Override
>> >     public byte[] serialize(String topic, JSONObject data) {
>> >         LOG.info("===> JsonSerializer serialize !!");
>> >         if (data == null)
>> >             return null;
>> >         try {
>> >             return data.toString().getBytes("utf-8");
>> >         } catch (Exception e) {
>> >             LOG.error("===> JsonSerializer serialize EXCEPTION");
>> >             throw new SerializationException("Error serializing JSON message", e);
>> >         }
>> >     }
>> >
>> >     @Override
>> >     public void close() {
>> >         LOG.info("===> JsonSerializer close");
>> >     }
>> > }
>> >
>> >
>> >
>> >
>> > 2016-11-14 20:01 GMT+08:00 Andrew Xor <andreas.gramme...@gmail.com>:
>> >>
>> >> Hi,
>> >>
>> >>  Since one type cannot be cast to the other, and you are not getting a
>> >> NullPointerException, could you share the implementation of your
>> >> serializer so that we are better able to help you?
>> >>
>> >> Cheers,
>> >>
>> >> A.
>> >>
>> >> On Mon, Nov 14, 2016 at 9:17 AM, Zhechao Ma
>> >> <mazhechaomaill...@gmail.com> wrote:
>> >>>
>> >>> Even when I implement my own JSON serializer, it still throws a similar
>> >>> exception, with no further details for debugging:
>> >>>
>> >>> org.apache.kafka.common.errors.SerializationException: Can't convert
>> >>> value of class org.apache.storm.shade.org.json.simple.JSONObject to
>> >>> xxxxxx specified in value.serializer
>> >>>
>> >>> I added a log statement to the overridden method byte[] serialize(String
>> >>> topic, JSONObject data) and found no output in worker.log. That is to
>> >>> say, the exception is thrown before serialize is called.
>> >>>
>> >>> 2016-11-07 16:50 GMT+08:00 Zhechao Ma <mazhechaomaill...@gmail.com>:
>> >>>>
>> >>>> I'm using KafkaBolt to write data to Kafka. The tuple-to-Kafka mapping
>> >>>> is <String, JSONObject>.
>> >>>> I set both key.serializer and value.serializer to
>> >>>> "org.apache.kafka.common.serialization.StringSerializer". I get the
>> >>>> following exception:
>> >>>>
>> >>>> org.apache.kafka.common.errors.SerializationException: Can't convert
>> >>>> value of class org.apache.storm.shade.org.json.simple.JSONObject to
>> >>>> class org.apache.kafka.common.serialization.StringSerializer specified
>> >>>> in value.serializer
>> >>>>
>> >>>>
>> >>>> I cannot find any other serializers related to JSON. I'm using Storm
>> >>>> 1.0.2 and Kafka 0.8.1.1.
>> >>>>
>> >>>> Could anyone help?
>> >>>>
>> >>>> --
>> >>>> Thanks
>> >>>> Zhechao Ma
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Thanks
>> >>> Zhechao Ma
>> >>
>> >>
>> >
>> >
>> >
>> > --
>> > Thanks
>> > Zhechao Ma
>>
>>
>>
>> --
>> Regards,
>> Amber Kulkarni
>>
>
>
>
> --
> Thanks
> Zhechao Ma
>



-- 
Thanks
Zhechao Ma
