HI Michael;
Sorry , after setting "auto.offset.reset"  to 'earliest' , I see messages
in my 'targetTopic'.
But still I get my class cast exception issue, when I consume message from
the 'targetTopic'. (To consume message I use KafkaConsumer highlevel API)

*ConsumerRecords<?, ?> records = consumer.poll(Long.MAX_VALUE);*


*java.lang.ClassCastException: java.lang.String cannot be cast to
xxx.core.kafkamodels.KafkaPayload at



at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_66]


at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_66]



at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]

On 12 October 2016 at 13:19, Ratha v <vijayara...@gmail.com> wrote:

> HI Michael;
> Really appreciate for the clear explanation..
> I modified my code as you mentioned. I have written custom, Serde,
> serializer,deserializer.
> But now the problem i see is, both topics are not merged. Means, Messages
> in the 'sourcetopic' not to passed to 'targetTopic' . ('targetTopic has '0'
> messages)
> I do not see any exceptions.
> Here is my custom serde, serializer/deserializer and the logic; Also I
> have properties file where i defined  following parameters;
> *bootstrap.servers=xx.com <http://xx.com>\:9092,xx.com
> <http://xx.com>\:9092,xx.com <http://xx.com>\:9092*
> *key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde*
> *value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde*
> *application.id <http://application.id>=stream-pipe*
> Do you see any issue here? Why messages are not written to ' targetTopic'?
> /**
> * create stream from source topics and write it to the target topic
> * @param sourceTopics
> * @param targetTopic
> */
> public void write(String[] sourceTopics, String targetTopic) {
>      KafkaStreams streams = null;
>      KStreamBuilder builder = new KStreamBuilder();
>   try {
>            KStream<String, KafkaPayload> kafkaPayloadStream = builder
> .stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>            kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde,
> targetTopic);
>            streams = new KafkaStreams(builder, properties);
>            streams.start();
>            Thread.sleep(5000L);
>       } catch (InterruptedException e) {
>           log.warn(e);
>      } catch (Exception e) {
>          log.error("Topic merge failed. ",e);
>       } finally {
>            if (streams != null) {
>            streams.close();
>      }
> }
> }
> public class KafkaPayloadSerdes {
> static private class WrapperSerde<KafkaPayload> implements
> Serde<KafkaPayload> {
> final private Serializer<KafkaPayload> serializer;
> final private Deserializer<KafkaPayload> deserializer;
> public WrapperSerde(Serializer<KafkaPayload> serializer,
> Deserializer<KafkaPayload> deserializer) {
> this.serializer = serializer;
> this.deserializer = deserializer;
> }
> @Override
> public void configure(Map<String, ?> configs, boolean isKey) {
> serializer.configure(configs, isKey);
> deserializer.configure(configs, isKey);
> }
> @Override
> public void close() {
> serializer.close();
> deserializer.close();
> }
> @Override
> public Serializer<KafkaPayload> serializer() {
> return serializer;
> }
> @Override
> public Deserializer<KafkaPayload> deserializer() {
> return deserializer;
> }
> }
> static public final class KafkaPayloadSerde extends
> WrapperSerde<KafkaPayload> {
> public KafkaPayloadSerde() {
> super(new KafkaPayloadSerializer(), new KafkaPayloadSerializer());
> }
> }
> /**
> * A serde for nullable < KafkaPayload> type.
> */
> static public Serde<KafkaPayload> KafkaPayload() {
> return new KafkaPayloadSerde();
> }
> }
> *Serilizer/Deserializer*
> public class KafkaPayloadSerializer implements Serializer<KafkaPayload>,
> Deserializer<KafkaPayload> {
> private static final Logger log = org.apache.logging.log4j.LogManager
> .getLogger(MethodHandles.lookup().lookupClass().getCanonicalName());
> @Override
> public KafkaPayload deserialize(String topic, byte[] arg1) {
> ByteArrayInputStream bis = new ByteArrayInputStream(arg1);
> ObjectInput in = null;
> Object obj = null;
> try {
> in = new ObjectInputStream(bis);
> obj = in.readObject();
> } catch (IOException e) {
> log.error(e);
> } catch (ClassNotFoundException e) {
> log.error(e);
> } finally {
> try {
> bis.close();
> if (in != null) {
> in.close();
> }
> } catch (IOException ex) {
> log.error(ex);
> }
> }
> return (KafkaPayload) obj;
> }
> @Override
> public void close() {
> // TODO Auto-generated method stub
> }
> @Override
> public byte[] serialize(String topic, KafkaPayload kpayload) {
> ByteArrayOutputStream bos = new ByteArrayOutputStream();
> ObjectOutput out = null;
> byte[] payload = null;
> try {
> out = new ObjectOutputStream(bos);
> out.writeObject(kpayload);
> payload = bos.toByteArray();
> } catch (IOException e) {
> e.printStackTrace();
> } finally {
> try {
> if (out != null) {
> out.close();
> bos.close();
> }
> } catch (Exception ex) {
> log.error(ex);
> }
> }
> return payload;
> }
> @Override
> public void configure(Map configs, boolean isKey) {
> // TODO Auto-generated method stub
> }
> }
> On 11 October 2016 at 20:13, Michael Noll <mich...@confluent.io> wrote:
>> When I wrote:
>>     "If you haven't changed to default key and value serdes, then `to()`
>> will fail because [...]"
>> it should have read:
>>     "If you haven't changed the default key and value serdes, then `to()`
>> will fail because [...]"
>> On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll <mich...@confluent.io>
>> wrote:
>> > Ratha,
>> >
>> > if you based your problematic code on the PipeDemo example, then you
>> > should have these two lines in your code (which most probably you
>> haven't
>> > changed):
>> >
>> >     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> > Serdes.String().getClass());
>> >     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> > Serdes.String().getClass());
>> >
>> > This configures your application to interpret (= encode/decode), by
>> > default, the keys and values of any messages it reads from Kafka as
>> > strings.  This works for the PipeDemo example because the keys and
>> values
>> > are actually strings.
>> >
>> > In your application, however, you do:
>> >
>> >    KStream<String, KafkaPayload> kafkaPayloadStream =
>> > builder.stream(sourceTopics);
>> >
>> > This won't work, because `builder.stream()`, when calling it without
>> > explicit serdes, will use the default serdes configured for your
>> > application.  So `builder.stream(sourceTopics)` will give you
>> > `KStream<String, String>`, not `KStream<String, KafkaPayload>`.  Also,
>> you
>> > can't just cast a String to KafkaPayload to "fix" the problem;  if you
>> > attempt to do so you run into the ClassCastException that you reported
>> > below.
>> >
>> > What you need to do fix your problem is:
>> >
>> > 1. Provide a proper serde for `KafkaPayload`.  See
>> > http://docs.confluent.io/current/streams/developer-
>> > guide.html#implementing-custom-serializers-deserializers-serdes.  There
>> > are also example implementations of such custom serdes at [1] and [2].
>> >
>> > Once you have that, you can e.g. write:
>> >
>> >     final Serde<String> stringSerde = Serdes.String(); // provided by
>> Kafka
>> >     final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be
>> provided
>> > by you!
>> >
>> > 2.  Call `builder.stream()` with explicit serdes to overrides the
>> default
>> > serdes.  stringSerde is for the keys, kafkaPayloadSerde is for the
>> values.
>> >
>> >     KStream<String, KafkaPayload> kafkaPayloadStream =
>> > builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>> >
>> > That should do it.
>> >
>> > Lastly, you must think about serialization also when calling `to()` or
>> > `through()`:
>> >
>> >     kafkaPayloadStream.to(targetTopic);
>> >
>> > If you haven't changed to default key and value serdes, then `to()` will
>> > fail because it will by default (in your app configuration) interpret
>> > message values still as strings rather than KafkaPayload.  To fix this
>> you
>> > should call:
>> >
>> >     kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
>> >
>> > You need to override the default serdes whenever the data must be
>> written
>> > with, well, non-default serdes.
>> >
>> > I'd recommend reading http://docs.confluent.io/curre
>> nt/streams/developer-
>> > guide.html#data-types-and-serialization to better understand how this
>> > works.
>> >
>> >
>> > Hope this helps,
>> > Michael
>> >
>> >
>> >
>> > [1] http://docs.confluent.io/current/streams/developer-
>> > guide.html#available-serializers-deserializers-serdes
>> > [2] https://github.com/confluentinc/examples/tree/
>> > kafka-
>> > confluent/examples/streams/utils
>> >
>> >
>> >
>> >
>> > On Tue, Oct 11, 2016 at 7:38 AM, Ratha v <vijayara...@gmail.com> wrote:
>> >
>> >> I checked my target topic and I see few messages than the source topic.
>> >> (If
>> >> source topic have 5 messages, I see 2 messages in my target topic) What
>> >> settings I need to do ?
>> >>
>> >> And, when I try to consume message from the target topic, I get
>> ClassCast
>> >> Exception.
>> >>
>> >> java.lang.ClassCastException: java.lang.String cannot be cast to
>> >> xx.yy.core.kafkamodels.KafkaPayload;
>> >>
>> >> * receivedPayload = (KafkaPayload) consumerRecord.value();*
>> >>
>> >>
>> >> I Merge two topics like;
>> >>
>> >> * KStreamBuilder builder = new KStreamBuilder();*
>> >>
>> >> * KStream<String, KafkaPayload> kafkaPayloadStream =
>> >> builder.stream(sourceTopics);*
>> >>
>> >> * kafkaPayloadStream.to(targetTopic);*
>> >>
>> >> * streams = new KafkaStreams(builder, properties);*
>> >>
>> >> * streams.start();*
>> >>
>> >>
>> >> Why do I see classcast exception when consuming the message?
>> >>
>> >>
>> >> On 11 October 2016 at 15:19, Ratha v <vijayara...@gmail.com> wrote:
>> >>
>> >> > Hi all;
>> >> > I have custom datatype defined (a pojo class).
>> >> > I copy  messages from one topic to another topic.
>> >> > I do not see any messages in my target topic.
>> >> > This works fro string messages, but not for my custom message.
>> >> > Waht might be the cause?
>> >> > I followed this sample [1]
>> >> > [1]
>> >> > https://github.com/apache/kafka/blob/trunk/streams/
>> >> > examples/src/main/java/org/apache/kafka/streams/examples/
>> >> > pipe/PipeDemo.java
>> >> >
>> >> >
>> >> >
>> >>
>> >>
>> >>
>> >>
>> >
>> >
>> >
