HI Michael; Sorry , after setting "auto.offset.reset" to 'earliest' , I see messages in my 'targetTopic'. But still I get my class cast exception issue, when I consume message from the 'targetTopic'. (To consume message I use KafkaConsumer highlevel API)
*ConsumerRecords<?, ?> records = consumer.poll(Long.MAX_VALUE);* *Exception* *java.lang.ClassCastException: java.lang.String cannot be cast to xxx.core.kafkamodels.KafkaPayload at xx.core.listener.KafkaMessageListener.receiveData(KafkaMessageListener.java:108) ~[classes/:?]* at xx.core.listener.KafkaMessageListenerThread.process(KafkaMessageListenerThread.java:68) ~[classes/:?] at xx.core.listener.KafkaMessageListenerThread.lambda$run$1(KafkaMessageListenerThread.java:50) ~[classes/:?] at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_66] at com.leightonobrien.core.listener.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:50) [classes/:?] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_66] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_66] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_66] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_66] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66] On 12 October 2016 at 13:19, Ratha v <vijayara...@gmail.com> wrote: > HI Michael; > > Really appreciate for the clear explanation.. > I modified my code as you mentioned. I have written custom, Serde, > serializer,deserializer. > But now the problem i see is, both topics are not merged. Means, Messages > in the 'sourcetopic' not to passed to 'targetTopic' . ('targetTopic has '0' > messages) > I do not see any exceptions. > > Here is my custom serde, serializer/deserializer and the logic; Also I > have properties file where i defined following parameters; > > *bootstrap.servers=xx.com <http://xx.com>\:9092,xx.com > <http://xx.com>\:9092,xx.com <http://xx.com>\:9092* > > *key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde* > > *value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde* > > *application.id <http://application.id>=stream-pipe* > > > Do you see any issue here? Why messages are not written to ' targetTopic'? > > > > *LOGIC* > > /** > > * create stream from source topics and write it to the target topic > > * @param sourceTopics > > * @param targetTopic > > */ > > public void write(String[] sourceTopics, String targetTopic) { > > KafkaStreams streams = null; > > KStreamBuilder builder = new KStreamBuilder(); > > try { > > KStream<String, KafkaPayload> kafkaPayloadStream = builder > .stream(stringSerde, kafkaPayloadSerde, sourceTopics); > > kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, > targetTopic); > > streams = new KafkaStreams(builder, properties); > > streams.start(); > > Thread.sleep(5000L); > > } catch (InterruptedException e) { > > log.warn(e); > > } catch (Exception e) { > > log.error("Topic merge failed. ",e); > > } finally { > > if (streams != null) { > > streams.close(); > > } > > } > > } > > > > > *SERDE* > > > public class KafkaPayloadSerdes { > > static private class WrapperSerde<KafkaPayload> implements > Serde<KafkaPayload> { > final private Serializer<KafkaPayload> serializer; > final private Deserializer<KafkaPayload> deserializer; > > public WrapperSerde(Serializer<KafkaPayload> serializer, > Deserializer<KafkaPayload> deserializer) { > this.serializer = serializer; > this.deserializer = deserializer; > } > > @Override > public void configure(Map<String, ?> configs, boolean isKey) { > serializer.configure(configs, isKey); > deserializer.configure(configs, isKey); > } > > @Override > public void close() { > serializer.close(); > deserializer.close(); > } > > @Override > public Serializer<KafkaPayload> serializer() { > return serializer; > } > > @Override > public Deserializer<KafkaPayload> deserializer() { > return deserializer; > } > } > > static public final class KafkaPayloadSerde extends > WrapperSerde<KafkaPayload> { > public KafkaPayloadSerde() { > super(new KafkaPayloadSerializer(), new KafkaPayloadSerializer()); > } > } > > /** > * A serde for nullable < KafkaPayload> type. > */ > static public Serde<KafkaPayload> KafkaPayload() { > return new KafkaPayloadSerde(); > } > > } > > > *Serilizer/Deserializer* > > > > public class KafkaPayloadSerializer implements Serializer<KafkaPayload>, > Deserializer<KafkaPayload> { > > private static final Logger log = org.apache.logging.log4j.LogManager > .getLogger(MethodHandles.lookup().lookupClass().getCanonicalName()); > > @Override > public KafkaPayload deserialize(String topic, byte[] arg1) { > ByteArrayInputStream bis = new ByteArrayInputStream(arg1); > ObjectInput in = null; > Object obj = null; > try { > in = new ObjectInputStream(bis); > obj = in.readObject(); > } catch (IOException e) { > log.error(e); > } catch (ClassNotFoundException e) { > log.error(e); > } finally { > try { > bis.close(); > if (in != null) { > in.close(); > } > } catch (IOException ex) { > log.error(ex); > } > } > return (KafkaPayload) obj; > } > > @Override > public void close() { > // TODO Auto-generated method stub > > } > > @Override > public byte[] serialize(String topic, KafkaPayload kpayload) { > ByteArrayOutputStream bos = new ByteArrayOutputStream(); > ObjectOutput out = null; > byte[] payload = null; > try { > out = new ObjectOutputStream(bos); > out.writeObject(kpayload); > payload = bos.toByteArray(); > > } catch (IOException e) { > e.printStackTrace(); > } finally { > try { > if (out != null) { > out.close(); > bos.close(); > } > } catch (Exception ex) { > log.error(ex); > } > } > return payload; > } > > @Override > public void configure(Map configs, boolean isKey) { > // TODO Auto-generated method stub > > } > > } > > > > On 11 October 2016 at 20:13, Michael Noll <mich...@confluent.io> wrote: > >> When I wrote: >> >> "If you haven't changed to default key and value serdes, then `to()` >> will fail because [...]" >> >> it should have read: >> >> "If you haven't changed the default key and value serdes, then `to()` >> will fail because [...]" >> >> >> >> On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll <mich...@confluent.io> >> wrote: >> >> > Ratha, >> > >> > if you based your problematic code on the PipeDemo example, then you >> > should have these two lines in your code (which most probably you >> haven't >> > changed): >> > >> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, >> > Serdes.String().getClass()); >> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, >> > Serdes.String().getClass()); >> > >> > This configures your application to interpret (= encode/decode), by >> > default, the keys and values of any messages it reads from Kafka as >> > strings. This works for the PipeDemo example because the keys and >> values >> > are actually strings. >> > >> > In your application, however, you do: >> > >> > KStream<String, KafkaPayload> kafkaPayloadStream = >> > builder.stream(sourceTopics); >> > >> > This won't work, because `builder.stream()`, when calling it without >> > explicit serdes, will use the default serdes configured for your >> > application. So `builder.stream(sourceTopics)` will give you >> > `KStream<String, String>`, not `KStream<String, KafkaPayload>`. Also, >> you >> > can't just cast a String to KafkaPayload to "fix" the problem; if you >> > attempt to do so you run into the ClassCastException that you reported >> > below. >> > >> > What you need to do fix your problem is: >> > >> > 1. Provide a proper serde for `KafkaPayload`. See >> > http://docs.confluent.io/current/streams/developer- >> > guide.html#implementing-custom-serializers-deserializers-serdes. There >> > are also example implementations of such custom serdes at [1] and [2]. >> > >> > Once you have that, you can e.g. write: >> > >> > final Serde<String> stringSerde = Serdes.String(); // provided by >> Kafka >> > final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be >> provided >> > by you! >> > >> > 2. Call `builder.stream()` with explicit serdes to overrides the >> default >> > serdes. stringSerde is for the keys, kafkaPayloadSerde is for the >> values. >> > >> > KStream<String, KafkaPayload> kafkaPayloadStream = >> > builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics); >> > >> > That should do it. >> > >> > Lastly, you must think about serialization also when calling `to()` or >> > `through()`: >> > >> > kafkaPayloadStream.to(targetTopic); >> > >> > If you haven't changed to default key and value serdes, then `to()` will >> > fail because it will by default (in your app configuration) interpret >> > message values still as strings rather than KafkaPayload. To fix this >> you >> > should call: >> > >> > kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic); >> > >> > You need to override the default serdes whenever the data must be >> written >> > with, well, non-default serdes. >> > >> > I'd recommend reading http://docs.confluent.io/curre >> nt/streams/developer- >> > guide.html#data-types-and-serialization to better understand how this >> > works. >> > >> > >> > Hope this helps, >> > Michael >> > >> > >> > >> > [1] http://docs.confluent.io/current/streams/developer- >> > guide.html#available-serializers-deserializers-serdes >> > [2] https://github.com/confluentinc/examples/tree/ >> > kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/ >> > confluent/examples/streams/utils >> > >> > >> > >> > >> > On Tue, Oct 11, 2016 at 7:38 AM, Ratha v <vijayara...@gmail.com> wrote: >> > >> >> I checked my target topic and I see few messages than the source topic. >> >> (If >> >> source topic have 5 messages, I see 2 messages in my target topic) What >> >> settings I need to do ? >> >> >> >> And, when I try to consume message from the target topic, I get >> ClassCast >> >> Exception. >> >> >> >> java.lang.ClassCastException: java.lang.String cannot be cast to >> >> xx.yy.core.kafkamodels.KafkaPayload; >> >> >> >> * receivedPayload = (KafkaPayload) consumerRecord.value();* >> >> >> >> >> >> I Merge two topics like; >> >> >> >> * KStreamBuilder builder = new KStreamBuilder();* >> >> >> >> * KStream<String, KafkaPayload> kafkaPayloadStream = >> >> builder.stream(sourceTopics);* >> >> >> >> * kafkaPayloadStream.to(targetTopic);* >> >> >> >> * streams = new KafkaStreams(builder, properties);* >> >> >> >> * streams.start();* >> >> >> >> >> >> Why do I see classcast exception when consuming the message? >> >> >> >> >> >> On 11 October 2016 at 15:19, Ratha v <vijayara...@gmail.com> wrote: >> >> >> >> > Hi all; >> >> > I have custom datatype defined (a pojo class). >> >> > I copy messages from one topic to another topic. >> >> > I do not see any messages in my target topic. >> >> > This works fro string messages, but not for my custom message. >> >> > Waht might be the cause? >> >> > I followed this sample [1] >> >> > [1] >> >> > https://github.com/apache/kafka/blob/trunk/streams/ >> >> > examples/src/main/java/org/apache/kafka/streams/examples/ >> >> > pipe/PipeDemo.java >> >> > >> >> > >> >> > -- >> >> > -Ratha >> >> > http://vvratha.blogspot.com/ >> >> > >> >> >> >> >> >> >> >> -- >> >> -Ratha >> >> http://vvratha.blogspot.com/ >> >> >> > >> > >> > >> > > > > -- > -Ratha > http://vvratha.blogspot.com/ > -- -Ratha http://vvratha.blogspot.com/