Re: Need suggestion on sending XML files via kafka

2017-02-13 Thread Ratha v
This sample program may help you:
http://vvratha.blogspot.com.au/2016/07/sample-kafka-producer-and-consumer.html
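
For completeness, a minimal sketch of one common approach: send the XML content as a string using the StringSerializer (broker address and topic name are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class XmlProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The XML payload, e.g. read from a file or serialized from a Document.
        String xml = "<order><id>1</id></order>";

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("xml-topic", xml)); // placeholder topic
        }
    }
}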

On 14 February 2017 at 06:36, Prashanth Venkatesan <
prashanth.181...@gmail.com> wrote:

> Hi Team,
>
> I just started using Kafka. I have a use case to send an XML file or Document
> object via a Kafka topic using Java. Can you enlighten me with guidance on
> the steps to achieve it?
>
> Please accept my apologies and ignore this if I am posting to an inappropriate mail address.
>
> Thanks
> Prashanth
> +91-9677103475
> India
>



-- 
-Ratha
http://vvratha.blogspot.com/


{Kafka V 0.10.1} Producing a message takes approximately 1 minute if any error occurs at the async producer

2016-10-26 Thread Ratha v
Hi;
I'm using an asynchronous producer to send messages to the broker. When
there is no error in my callback, producing a message takes approximately
0.3 seconds. But when I get the error below [1], producing a message takes
60 seconds.
However, I do not see any message loss; all messages are available in the
broker.
What causes this error? I see this delay roughly every 50 messages I produce.
How can I improve the performance of the producer when I get this error?

*Code;*

producer.send(new ProducerRecord(topic, this), new ProducerCallback());

private class ProducerCallback implements Callback {

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception ex) {
        if (ex != null) {
            // NOTE: per the Callback contract, recordMetadata can be null
            // when ex is non-null; dereferencing it here is what throws
            // the NullPointerException shown in [1].
            log.error("Error when publishing messages to the topic. Topic :"
                    + recordMetadata.topic(), ex);
        }
    }
}

*Producer properties*

acks=1

linger.ms=10

batch.size=51200

bootstrap.servers=aukk1.xx.com\:9092,aukk2.xx.com\:9092,aukk3.xx.com\:9092

key.serializer=org.apache.kafka.common.serialization.StringSerializer

value.serializer=com.xx.KafkaPayloadSerializer

[1]


04:51:20,025 ERROR
[org.apache.kafka.clients.producer.internals.RecordBatch]
(kafka-producer-network-thread | producer-673) Error executing
user-provided callback on message for topic-partition RAW_XML1harveyzhu-1::
java.lang.NullPointerException
   at
com.lxx.kafkamodels.KafkaPayload$ProducerCallback.onCompletion(KafkaPayload.java:204)
   at
org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
   at
org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:155)
   at
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
   at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:205)
   at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
   at java.lang.Thread.run(Thread.java:745)
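
Incidentally, a null-safe version of the callback (a sketch; it is a drop-in replacement for the class above, with `log` as before) avoids the NullPointerException in [1] by not dereferencing the metadata on failure:

private class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception ex) {
        if (ex != null) {
            // recordMetadata is null on failure, so log only the exception.
            log.error("Error when publishing message to Kafka.", ex);
        }
    }
}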


Thanks.
-- 
-Ratha
http://vvratha.blogspot.com/


Re: kafka-topics.sh --delete --topic 'x' is not working for kafka V 0.10.1

2016-10-13 Thread Ratha v
I overcame the issue.

The problem was that I had "/chroot" at the end of the ZooKeeper connection
string; that is how it is shown in the documentation
(kafka.apache.org/documentation.html#basic_ops_modify_topic), but my cluster
does not use a chroot. The command worked once I dropped it:

./kafka-topics.sh --zookeeper zookeeper.xx.com:2181 --delete --topic testTopic

Thank you.

On 14 October 2016 at 12:25, Ratha v <vijayara...@gmail.com> wrote:

> HI all;
>
> I am trying to delete an existing topic (I verified it exists using the Kafka
> management console) using the following command:
>
> #./kafka-topics.sh --zookeeper zookeeper.xx.com:2181/chroot --delete
> --topic testTopic
>
> But it says the topic is not available in ZooKeeper [1].
> I create those topics at runtime (I think they are created in the Kafka
> cluster?).
>
> How can I delete the topic using this bash script?
>
> [1] Error while executing topic command : Topic targettopic does not exist
> on ZK path zookeeper.xx.com:2181/chroot
>
> [2016-10-14 11:58:59,919] ERROR java.lang.IllegalArgumentException: Topic
> streamtargettopic does not exist on ZK path zookeeper.xx.com:2181/chroot
>
> at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:169)
>
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:69)
>
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


kafka-topics.sh --delete --topic 'x' is not working for kafka V 0.10.1

2016-10-13 Thread Ratha v
HI all;

I am trying to delete an existing topic (I verified it exists using the Kafka
management console) using the following command:

#./kafka-topics.sh --zookeeper zookeeper.xx.com:2181/chroot --delete
--topic testTopic

But it says the topic is not available in ZooKeeper [1].
I create those topics at runtime (I think they are created in the Kafka
cluster?).

How can I delete the topic using this bash script?

[1] Error while executing topic command : Topic targettopic does not exist
on ZK path zookeeper.xx.com:2181/chroot

[2016-10-14 11:58:59,919] ERROR java.lang.IllegalArgumentException: Topic
streamtargettopic does not exist on ZK path zookeeper.xx.com:2181/chroot

at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:169)

at kafka.admin.TopicCommand$.main(TopicCommand.scala:69)

at kafka.admin.TopicCommand.main(TopicCommand.scala)

-- 
-Ratha
http://vvratha.blogspot.com/


Re: How can I delete a topic programmatically?

2016-10-13 Thread Ratha v
Hi Jianbin;
I tried it like this, providing my ZooKeeper host, but it fails with [1]. I
use Kafka 0.10, and I can see that my topic exists using the Kafka management
tool.

How can I overcome this issue?

[1] Error while executing topic command : Topic targettopic does not exist
on ZK path zookeeper.xx.com:2181/chroot

[2016-10-14 11:58:59,919] ERROR java.lang.IllegalArgumentException: Topic
streamtargettopic does not exist on ZK path zookeeper.xx.com:2181/chroot

at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:169)

at kafka.admin.TopicCommand$.main(TopicCommand.scala:69)

at kafka.admin.TopicCommand.main(TopicCommand.scala)

 bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic
my_topic_name


On 12 October 2016 at 17:02, Ratha v <vijayara...@gmail.com> wrote:

> Thank you..
>
> On 12 October 2016 at 16:30, Jianbin Wei <jianbin@netskope.com> wrote:
>
>> You can check this
>>
>> http://kafka.apache.org/documentation.html#basic_ops_add_topic
>>
>> But from our experience it is best to delete topics one by one, i.e.,
>> make sure Kafka is in good shape before and after deleting a topic,
>> before working on the next one.
>>
>> Regards,
>>
>> -- Jianbin
>>
>> > On Oct 11, 2016, at 9:26 PM, Ratha v <vijayara...@gmail.com> wrote:
>> >
>> > Thanks. Which bash script do I need to run?
>> >
>> >> On 12 October 2016 at 15:20, Ali Akhtar <ali.rac...@gmail.com> wrote:
>> >>
>> >> The last time I tried, I couldn't find a way to do it, other than to
>> >> trigger the bash script for topic deletion programmatically.
>> >>
>> >>> On Wed, Oct 12, 2016 at 9:18 AM, Ratha v <vijayara...@gmail.com>
>> wrote:
>> >>>
>> >>> Hi all;
>> >>>
>> >>> I have two topics(source and target). I do some processing on the
>> message
>> >>> available in the source topic and i merge both topic.
>> >>> That is;
>> >>>
>> >>> builder.stream(sourceTopic).to(targetTopic)
>> >>>
>> >>> Once merged I no longer require the sourceTopic. I want to delete it.
>> >>>
>> >>> How can I do that programmatically in Java? I use the high-level client
>> APIs,
>> >>> kafka v 0.10.0.1
>> >>>
>> >>>
>> >>> Thanks
>> >>> --
>> >>> -Ratha
>> >>> http://vvratha.blogspot.com/
>> >>>
>> >>
>> >
>> >
>> >
>> > --
>> > -Ratha
>> > http://vvratha.blogspot.com/
>>
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: How can I delete a topic programmatically?

2016-10-12 Thread Ratha v
Thank you..

On 12 October 2016 at 16:30, Jianbin Wei <jianbin@netskope.com> wrote:

> You can check this
>
> http://kafka.apache.org/documentation.html#basic_ops_add_topic
>
> But from our experience it is best to delete topics one by one, i.e., make
> sure Kafka is in good shape before and after deleting a topic, before
> working on the next one.
>
> Regards,
>
> -- Jianbin
>
> > On Oct 11, 2016, at 9:26 PM, Ratha v <vijayara...@gmail.com> wrote:
> >
> > Thanks. Which bash script do I need to run?
> >
> >> On 12 October 2016 at 15:20, Ali Akhtar <ali.rac...@gmail.com> wrote:
> >>
> >> The last time I tried, I couldn't find a way to do it, other than to
> >> trigger the bash script for topic deletion programmatically.
> >>
> >>> On Wed, Oct 12, 2016 at 9:18 AM, Ratha v <vijayara...@gmail.com>
> wrote:
> >>>
> >>> Hi all;
> >>>
> >>> I have two topics(source and target). I do some processing on the
> message
> >>> available in the source topic and i merge both topic.
> >>> That is;
> >>>
> >>> builder.stream(sourceTopic).to(targetTopic)
> >>>
> >>> Once merged I no longer require the sourceTopic. I want to delete it.
> >>>
> >>> How can I do that programmatically in Java? I use the high-level client
> APIs,
> >>> kafka v 0.10.0.1
> >>>
> >>>
> >>> Thanks
> >>> --
> >>> -Ratha
> >>> http://vvratha.blogspot.com/
> >>>
> >>
> >
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: How can I delete a topic programmatically?

2016-10-11 Thread Ratha v
Thanks. Which bash script do I need to run?

On 12 October 2016 at 15:20, Ali Akhtar <ali.rac...@gmail.com> wrote:

> The last time I tried, I couldn't find a way to do it, other than to
> trigger the bash script for topic deletion programmatically.
>
> On Wed, Oct 12, 2016 at 9:18 AM, Ratha v <vijayara...@gmail.com> wrote:
>
> > Hi all;
> >
> > I have two topics (source and target). I do some processing on the messages
> > available in the source topic and then I merge both topics.
> > That is;
> >
> > builder.stream(sourceTopic).to(targetTopic)
> >
> > Once merged I no longer require the sourceTopic. I want to delete it.
> >
> > How can I do that programmatically in Java? I use the high-level client APIs,
> > kafka v 0.10.0.1
> >
> >
> > Thanks
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>



-- 
-Ratha
http://vvratha.blogspot.com/


How can I delete a topic programmatically?

2016-10-11 Thread Ratha v
Hi all;

I have two topics (source and target). I do some processing on the messages
available in the source topic and then I merge both topics.
That is;

builder.stream(sourceTopic).to(targetTopic)

Once merged, I no longer require the sourceTopic. I want to delete it.

How can I do that programmatically in Java? I use the high-level client APIs,
Kafka v0.10.0.1.
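
As of Kafka 0.10 there is no public Java admin API; one workaround is the internal kafka.admin.AdminUtils API. A sketch (this is not a public API and may change between versions, and the brokers must have delete.topic.enable=true):

import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;

public class TopicDeleter {
    public static void deleteTopic(String zkConnect, String topic) {
        // zkConnect is your ZooKeeper connection string, e.g. "zk1:2181".
        ZkUtils zkUtils = ZkUtils.apply(zkConnect, 30000, 30000, false);
        try {
            // Marks the topic for deletion; the brokers complete the
            // deletion asynchronously.
            AdminUtils.deleteTopic(zkUtils, topic);
        } finally {
            zkUtils.close();
        }
    }
}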


Thanks
-- 
-Ratha
http://vvratha.blogspot.com/


Re: KafkaStream Merging two topics is not working for custom datatypes

2016-10-11 Thread Ratha v
Sorry, my fault: in the KafkaConsumer I had messed up the 'value.deserializer'
property.
Now things are working fine.
Thanks a lot.
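
For reference, a sketch of the relevant consumer settings (group.id and broker address are placeholders; the class names follow the earlier snippets in this thread, where KafkaPayloadSerializer implements both Serializer and Deserializer):

bootstrap.servers=xx.com:9092
group.id=my-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.xx.KafkaPayloadSerializer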

On 12 October 2016 at 14:10, Ratha v <vijayara...@gmail.com> wrote:

> HI Michael;
> Sorry, after setting "auto.offset.reset" to 'earliest', I see messages
> in my 'targetTopic'.
> But I still get my class cast exception issue when I consume messages from
> the 'targetTopic'. (To consume messages I use the KafkaConsumer high-level
> API.)
>
> ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
>
> *Exception*
>
> java.lang.ClassCastException: java.lang.String cannot be cast to
> xxx.core.kafkamodels.KafkaPayload
> at xx.core.listener.KafkaMessageListener.receiveData(KafkaMessageListener.java:108) ~[classes/:?]
> at xx.core.listener.KafkaMessageListenerThread.process(KafkaMessageListenerThread.java:68) ~[classes/:?]
> at xx.core.listener.KafkaMessageListenerThread.lambda$run$1(KafkaMessageListenerThread.java:50) ~[classes/:?]
> at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_66]
> at com.leightonobrien.core.listener.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:50) [classes/:?]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_66]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_66]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_66]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_66]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]
>
>
>
> On 12 October 2016 at 13:19, Ratha v <vijayara...@gmail.com> wrote:
>
>> HI Michael;
>>
>> I really appreciate the clear explanation.
>> I modified my code as you suggested: I have written a custom Serde,
>> serializer and deserializer.
>> But the problem I now see is that the two topics are not merged; messages
>> in the 'sourceTopic' are not passed to the 'targetTopic' ('targetTopic' has
>> 0 messages).
>> I do not see any exceptions.
>>
>> Here are my custom serde, serializer/deserializer and the logic. I also
>> have a properties file where I defined the following parameters:
>>
>> bootstrap.servers=xx.com\:9092,xx.com\:9092,xx.com\:9092
>>
>> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
>>
>> value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde
>>
>> application.id=stream-pipe
>>
>>
>> Do you see any issue here? Why are messages not written to 'targetTopic'?
>>
>>
>>
>> *LOGIC*
>>
>> /**
>>  * Create a stream from the source topics and write it to the target topic.
>>  * @param sourceTopics
>>  * @param targetTopic
>>  */
>> public void write(String[] sourceTopics, String targetTopic) {
>>     KafkaStreams streams = null;
>>     KStreamBuilder builder = new KStreamBuilder();
>>     try {
>>         KStream<String, KafkaPayload> kafkaPayloadStream =
>>             builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>>         kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
>>         streams = new KafkaStreams(builder, properties);
>>         streams.start();
>>         Thread.sleep(5000L);
>>     } catch (InterruptedException e) {
>>         log.warn(e);
>>     } catch (Exception e) {
>>         log.error("Topic merge failed. ", e);
>>     } finally {
>>         if (streams != null) {
>>             streams.close();
>>         }
>>     }
>> }
>>
>>
>>
>>
>> *SERDE*
>>
>> public class KafkaPayloadSerdes {
>>
>>     static private class WrapperSerde<T> implements Serde<T> {
>>         final private Serializer<T> serializer;
>>         final private Deserializer<T> deserializer;
>>
>>         public WrapperSerde(Serializer<T> serializer,
>>                 Deserializer<T> deserializer) {
>>             this.serializer = serializer;
>>             this.deserializer = deserializer;
>>         }
>>
>>         @Override
>>         public void configure(Map<String, ?> configs, boolean isKey) {
>>             serializer.configure(configs, isKey);
>>             deserializer.configure(conf

Re: KafkaStream Merging two topics is not working for custom datatypes

2016-10-11 Thread Ratha v
HI Michael;
Sorry, after setting "auto.offset.reset" to 'earliest', I see messages
in my 'targetTopic'.
But I still get my class cast exception issue when I consume messages from
the 'targetTopic'. (To consume messages I use the KafkaConsumer high-level API.)

ConsumerRecords records = consumer.poll(Long.MAX_VALUE);

*Exception*

java.lang.ClassCastException: java.lang.String cannot be cast to
xxx.core.kafkamodels.KafkaPayload
at xx.core.listener.KafkaMessageListener.receiveData(KafkaMessageListener.java:108) ~[classes/:?]
at xx.core.listener.KafkaMessageListenerThread.process(KafkaMessageListenerThread.java:68) ~[classes/:?]
at xx.core.listener.KafkaMessageListenerThread.lambda$run$1(KafkaMessageListenerThread.java:50) ~[classes/:?]
at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_66]
at com.leightonobrien.core.listener.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:50) [classes/:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_66]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_66]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]



On 12 October 2016 at 13:19, Ratha v <vijayara...@gmail.com> wrote:

> HI Michael;
>
> I really appreciate the clear explanation.
> I modified my code as you suggested: I have written a custom Serde,
> serializer and deserializer.
> But the problem I now see is that the two topics are not merged; messages
> in the 'sourceTopic' are not passed to the 'targetTopic' ('targetTopic' has
> 0 messages).
> I do not see any exceptions.
>
> Here are my custom serde, serializer/deserializer and the logic. I also
> have a properties file where I defined the following parameters:
>
> bootstrap.servers=xx.com\:9092,xx.com\:9092,xx.com\:9092
>
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
>
> value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde
>
> application.id=stream-pipe
>
>
> Do you see any issue here? Why are messages not written to 'targetTopic'?
>
>
>
> *LOGIC*
>
> /**
>  * Create a stream from the source topics and write it to the target topic.
>  * @param sourceTopics
>  * @param targetTopic
>  */
> public void write(String[] sourceTopics, String targetTopic) {
>     KafkaStreams streams = null;
>     KStreamBuilder builder = new KStreamBuilder();
>     try {
>         KStream<String, KafkaPayload> kafkaPayloadStream =
>             builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>         kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
>         streams = new KafkaStreams(builder, properties);
>         streams.start();
>         Thread.sleep(5000L);
>     } catch (InterruptedException e) {
>         log.warn(e);
>     } catch (Exception e) {
>         log.error("Topic merge failed. ", e);
>     } finally {
>         if (streams != null) {
>             streams.close();
>         }
>     }
> }
>
>
>
>
> *SERDE*
>
> public class KafkaPayloadSerdes {
>
>     static private class WrapperSerde<T> implements Serde<T> {
>         final private Serializer<T> serializer;
>         final private Deserializer<T> deserializer;
>
>         public WrapperSerde(Serializer<T> serializer,
>                 Deserializer<T> deserializer) {
>             this.serializer = serializer;
>             this.deserializer = deserializer;
>         }
>
>         @Override
>         public void configure(Map<String, ?> configs, boolean isKey) {
>             serializer.configure(configs, isKey);
>             deserializer.configure(configs, isKey);
>         }
>
>         @Override
>         public void close() {
>             serializer.close();
>             deserializer.close();
>         }
>
>         @Override
>         public Serializer<T> serializer() {
>             return serializer;
>         }
>
>         @Override
>         public Deserializer<T> deserializer() {
>             return deserializer;
>         }
>     }
>
>     static public final class KafkaPayloadSerde extends
>             WrapperSerde<KafkaPayload> {
>         public KafkaPayloadSerde() {
>             super(new KafkaPayloadSerializer(), new KafkaPayloadSerializer());
>         }
>     }
>
>     /**
>      * A serde for the nullable <KafkaPayload> type.
>      */
>     static public Serde<KafkaPayload> KafkaPayload() {
>         return new KafkaPayloadSerde();
>     }
>
> }
>
>
> *Serializer/Deserializer*
>
> public class KafkaPayloadSerializer implements Serializer<KafkaPayload>,
>         Deserializer<KafkaPayload> {
>
>     private static final Logger log = org.a

Re: KafkaStream Merging two topics is not working for custom datatypes

2016-10-11 Thread Ratha v
> >
> > This configures your application to interpret (= encode/decode), by
> > default, the keys and values of any messages it reads from Kafka as
> > strings.  This works for the PipeDemo example because the keys and values
> > are actually strings.
> >
> > In your application, however, you do:
> >
> >KStream<String, KafkaPayload> kafkaPayloadStream =
> > builder.stream(sourceTopics);
> >
> > This won't work, because `builder.stream()`, when calling it without
> > explicit serdes, will use the default serdes configured for your
> > application.  So `builder.stream(sourceTopics)` will give you
> > `KStream<String, String>`, not `KStream<String, KafkaPayload>`.  Also,
> you
> > can't just cast a String to KafkaPayload to "fix" the problem;  if you
> > attempt to do so you run into the ClassCastException that you reported
> > below.
> >
> > What you need to do fix your problem is:
> >
> > 1. Provide a proper serde for `KafkaPayload`.  See
> > http://docs.confluent.io/current/streams/developer-
> > guide.html#implementing-custom-serializers-deserializers-serdes.  There
> > are also example implementations of such custom serdes at [1] and [2].
> >
> > Once you have that, you can e.g. write:
> >
> > final Serde<String> stringSerde = Serdes.String(); // provided by Kafka
> > final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be provided
> > by you!
> >
> > 2. Call `builder.stream()` with explicit serdes to override the default
> > serdes. stringSerde is for the keys, kafkaPayloadSerde is for the
> > values.
> >
> > KStream<String, KafkaPayload> kafkaPayloadStream =
> > builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
> >
> > That should do it.
> >
> > Lastly, you must think about serialization also when calling `to()` or
> > `through()`:
> >
> > kafkaPayloadStream.to(targetTopic);
> >
> > If you haven't changed the default key and value serdes, then `to()` will
> > fail because it will by default (in your app configuration) still
> > interpret message values as strings rather than KafkaPayload. To fix this
> > you should call:
> >
> > kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
> >
> > You need to override the default serdes whenever the data must be written
> > with, well, non-default serdes.
> >
> > I'd recommend reading http://docs.confluent.io/
> current/streams/developer-
> > guide.html#data-types-and-serialization to better understand how this
> > works.
> >
> >
> > Hope this helps,
> > Michael
> >
> >
> >
> > [1] http://docs.confluent.io/current/streams/developer-
> > guide.html#available-serializers-deserializers-serdes
> > [2] https://github.com/confluentinc/examples/tree/
> > kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/
> > confluent/examples/streams/utils
> >
> >
> >
> >
> > On Tue, Oct 11, 2016 at 7:38 AM, Ratha v <vijayara...@gmail.com> wrote:
> >
> >> I checked my target topic and I see few messages than the source topic.
> >> (If
> >> source topic have 5 messages, I see 2 messages in my target topic) What
> >> settings I need to do ?
> >>
> >> And, when I try to consume message from the target topic, I get
> ClassCast
> >> Exception.
> >>
> >> java.lang.ClassCastException: java.lang.String cannot be cast to
> >> xx.yy.core.kafkamodels.KafkaPayload;
> >>
> >> * receivedPayload = (KafkaPayload) consumerRecord.value();*
> >>
> >>
> >> I Merge two topics like;
> >>
> >> * KStreamBuilder builder = new KStreamBuilder();*
> >>
> >> * KStream<String, KafkaPayload> kafkaPayloadStream =
> >> builder.stream(sourceTopics);*
> >>
> >> * kafkaPayloadStream.to(targetTopic);*
> >>
> >> * streams = new KafkaStreams(builder, properties);*
> >>
> >> * streams.start();*
> >>
> >>
> >> Why do I see classcast exception when consuming the message?
> >>
> >>
> >> On 11 October 2016 at 15:19, Ratha v <vijayara...@gmail.com> wrote:
> >>
> >> > Hi all;
> >> > I have custom datatype defined (a pojo class).
> >> > I copy  messages from one topic to another topic.
> >> > I do not see any messages in my target topic.
> >> > This works for string messages, but not for my custom message.
> >> > What might be the cause?
> >> > I followed this sample [1]
> >> > [1]
> >> > https://github.com/apache/kafka/blob/trunk/streams/
> >> > examples/src/main/java/org/apache/kafka/streams/examples/
> >> > pipe/PipeDemo.java
> >> >
> >> >
> >> > --
> >> > -Ratha
> >> > http://vvratha.blogspot.com/
> >> >
> >>
> >>
> >>
> >> --
> >> -Ratha
> >> http://vvratha.blogspot.com/
> >>
> >
> >
> >
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: KafkaStream Merging two topics is not working for custom datatypes

2016-10-10 Thread Ratha v
I checked my target topic and I see fewer messages than in the source topic.
(If the source topic has 5 messages, I see 2 messages in my target topic.)
What settings do I need to change?

And when I try to consume a message from the target topic, I get a
ClassCastException:

java.lang.ClassCastException: java.lang.String cannot be cast to
xx.yy.core.kafkamodels.KafkaPayload;

receivedPayload = (KafkaPayload) consumerRecord.value();

I merge the two topics like this:

KStreamBuilder builder = new KStreamBuilder();

KStream<String, KafkaPayload> kafkaPayloadStream =
    builder.stream(sourceTopics);

kafkaPayloadStream.to(targetTopic);

streams = new KafkaStreams(builder, properties);

streams.start();

Why do I see a ClassCastException when consuming the message?


On 11 October 2016 at 15:19, Ratha v <vijayara...@gmail.com> wrote:

> Hi all;
> I have a custom datatype defined (a POJO class).
> I copy messages from one topic to another topic.
> I do not see any messages in my target topic.
> This works for string messages, but not for my custom message.
> What might be the cause?
> I followed this sample [1]
> [1]
> https://github.com/apache/kafka/blob/trunk/streams/
> examples/src/main/java/org/apache/kafka/streams/examples/
> pipe/PipeDemo.java
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


KafkaStream Merging two topics is not working for custom datatypes

2016-10-10 Thread Ratha v
Hi all;
I have a custom datatype defined (a POJO class).
I copy messages from one topic to another topic.
I do not see any messages in my target topic.
This works for string messages, but not for my custom message.
What might be the cause?
I followed this sample [1]
[1]
https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
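
Following the explicit-serde advice in Michael's reply in this thread, a minimal runnable sketch (KafkaPayload and KafkaPayloadSerdes are the classes from this thread; the broker address and topic names are placeholders):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class PipeWithCustomSerde {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "stream-pipe");
        props.put("bootstrap.servers", "localhost:9092"); // placeholder

        Serde<String> stringSerde = Serdes.String();
        Serde<KafkaPayload> payloadSerde = new KafkaPayloadSerdes.KafkaPayloadSerde();

        KStreamBuilder builder = new KStreamBuilder();
        // Explicit serdes override the application-wide defaults, so the
        // values are read and written as KafkaPayload, not String.
        KStream<String, KafkaPayload> stream =
                builder.stream(stringSerde, payloadSerde, "sourceTopic");
        stream.to(stringSerde, payloadSerde, "targetTopic");

        new KafkaStreams(builder, props).start();
    }
}

Consumers of "targetTopic" must then also configure the matching value deserializer, which turned out to be the fix reported in the replies.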


-- 
-Ratha
http://vvratha.blogspot.com/


Re: How to merge two topics in kafka?

2016-10-10 Thread Ratha v
I checked my targetTopic for available messages, but it says '0'. What might
be causing the issue when merging two topics with custom-type messages?

On 11 October 2016 at 14:44, Ratha v <vijayara...@gmail.com> wrote:

> Thanks, this demo works perfectly for string messages.
>
> I have custom messageType defined( a java pojo class). And i have SerDe
> implemented for that.
> Now after merging sourceTopic-->Target Topic,
> I could not consume the messages..Means, Consumer does not return any
> messages.
>
> What might be the cause?
>
> On 10 October 2016 at 17:54, Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Check this example
>> https://github.com/apache/kafka/blob/trunk/streams/examples/
>> src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
>>
>> On Mon, Oct 10, 2016 at 11:34 AM, Ratha v <vijayara...@gmail.com> wrote:
>>
>> > Hi Sachin;
>> > I went through the KStream/KTable Documentation. My scenario is very
>> > simple..I want to merge two topics( ie: Send messages available in the
>> > topic A -->topic B , in my case i'll be having only single message in
>> that
>> > topicA)
>> >
>> > Do I need Stateful processing (KStream)?
>> >
>> > Thanks.
>> >
>> > On 10 October 2016 at 13:10, Ratha v <vijayara...@gmail.com> wrote:
>> >
>> > > Thank you.I'll try the solution.
>> > > But in the highlevel consumer API, topics will be created
>> automatically?
>> > ,
>> > > We are not using zookeeper?
>> > >
>> > > On 10 October 2016 at 12:34, Sachin Mittal <sjmit...@gmail.com>
>> wrote:
>> > >
>> > >> You can use topicA.leftJoin (topicB).to (new-topic).
>> > >>
>> > >> You can the consume message from that new topic via second process.
>> Note
>> > >> you need to create all three topics in zookeeper first.
>> > >>
>> > >> On 10 Oct 2016 5:19 a.m., "Ratha v" <vijayara...@gmail.com> wrote:
>> > >>
>> > >> Hi all;
>> > >>
>> > >> I have two topics in the broker. Without consuming from one topic, I
>> > want
>> > >> to merge both topics, and will consume messages from the second
>> topic.
>> > >>
>> > >> It is because, I have two processes, one process, pushes messages to
>> > topic
>> > >> A. And the second process once finished processing, it wants to merge
>> > both
>> > >> topicA and TopicB. Then another process will consume messages from
>> the
>> > >> merged topic.
>> > >>
>> > >> How can I merge both topics in high level kafka APIs. I use Kafka
>> > >> 0.10.0.1.
>> > >>
>> > >>
>> > >> Thanks.
>> > >> --
>> > >> -Ratha
>> > >> http://vvratha.blogspot.com/
>> > >>
>> > >
>> > >
>> > >
>> > > --
>> > > -Ratha
>> > > http://vvratha.blogspot.com/
>> > >
>> >
>> >
>> >
>> > --
>> > -Ratha
>> > http://vvratha.blogspot.com/
>> >
>>
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: How to merge two topics in kafka?

2016-10-10 Thread Ratha v
Thanks, this demo works perfectly for string messages.

I have a custom message type defined (a Java POJO class), and I have a SerDe
implemented for it.
Now, after merging sourceTopic --> targetTopic,
I cannot consume the messages; the consumer does not return any messages.

What might be the cause?

On 10 October 2016 at 17:54, Sachin Mittal <sjmit...@gmail.com> wrote:

> Check this example
> https://github.com/apache/kafka/blob/trunk/streams/
> examples/src/main/java/org/apache/kafka/streams/examples/
> pipe/PipeDemo.java
>
> On Mon, Oct 10, 2016 at 11:34 AM, Ratha v <vijayara...@gmail.com> wrote:
>
> > Hi Sachin;
> > I went through the KStream/KTable Documentation. My scenario is very
> > simple..I want to merge two topics( ie: Send messages available in the
> > topic A -->topic B , in my case i'll be having only single message in
> that
> > topicA)
> >
> > Do I need Stateful processing (KStream)?
> >
> > Thanks.
> >
> > On 10 October 2016 at 13:10, Ratha v <vijayara...@gmail.com> wrote:
> >
> > > Thank you.I'll try the solution.
> > > But in the highlevel consumer API, topics will be created
> automatically?
> > ,
> > > We are not using zookeeper?
> > >
> > > On 10 October 2016 at 12:34, Sachin Mittal <sjmit...@gmail.com> wrote:
> > >
> > >> You can use topicA.leftJoin (topicB).to (new-topic).
> > >>
> > >> You can the consume message from that new topic via second process.
> Note
> > >> you need to create all three topics in zookeeper first.
> > >>
> > >> On 10 Oct 2016 5:19 a.m., "Ratha v" <vijayara...@gmail.com> wrote:
> > >>
> > >> Hi all;
> > >>
> > >> I have two topics in the broker. Without consuming from one topic, I
> > want
> > >> to merge both topics, and will consume messages from the second topic.
> > >>
> > >> It is because, I have two processes, one process, pushes messages to
> > topic
> > >> A. And the second process once finished processing, it wants to merge
> > both
> > >> topicA and TopicB. Then another process will consume messages from the
> > >> merged topic.
> > >>
> > >> How can I merge both topics in high level kafka APIs. I use Kafka
> > >> 0.10.0.1.
> > >>
> > >>
> > >> Thanks.
> > >> --
> > >> -Ratha
> > >> http://vvratha.blogspot.com/
> > >>
> > >
> > >
> > >
> > > --
> > > -Ratha
> > > http://vvratha.blogspot.com/
> > >
> >
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: How to merge two topics in kafka?

2016-10-10 Thread Ratha v
Hi Sachin;
I went through the KStream/KTable Documentation. My scenario is very
simple..I want to merge two topics( ie: Send messages available in the
topic A -->topic B , in my case i'll be having only single message in that
topicA)

Do I need Stateful processing (KStream)?

Thanks.

On 10 October 2016 at 13:10, Ratha v <vijayara...@gmail.com> wrote:

> Thank you.I'll try the solution.
> But in the highlevel consumer API, topics will be created automatically? ,
> We are not using zookeeper?
>
> On 10 October 2016 at 12:34, Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> You can use topicA.leftJoin(topicB).to(new-topic).
>>
>> You can then consume messages from that new topic via the second process.
>> Note you need to create all three topics in ZooKeeper first.
>>
>> On 10 Oct 2016 5:19 a.m., "Ratha v" <vijayara...@gmail.com> wrote:
>>
>> Hi all;
>>
>> I have two topics in the broker. Without consuming from one topic, I want
>> to merge both topics, and will consume messages from the second topic.
>>
>> It is because, I have two processes, one process, pushes messages to topic
>> A. And the second process once finished processing, it wants to merge both
>> topicA and TopicB. Then another process will consume messages from the
>> merged topic.
>>
>> How can I merge both topics in high level kafka APIs. I use Kafka
>> 0.10.0.1.
>>
>>
>> Thanks.
>> --
>> -Ratha
>> http://vvratha.blogspot.com/
>>
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: How to merge two topics in kafka?

2016-10-09 Thread Ratha v
Thank you. I'll try the solution.
But with the high-level consumer API, will the topics be created
automatically? We are not using ZooKeeper directly.
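
(For reference: whether topics are created on first use is governed by the broker-side server.properties setting below, which defaults to true; this describes standard broker behaviour, not anything visible from this thread.)

auto.create.topics.enable=true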

On 10 October 2016 at 12:34, Sachin Mittal <sjmit...@gmail.com> wrote:

> You can use topicA.leftJoin(topicB).to(new-topic).
>
> You can then consume messages from that new topic via the second process.
> Note you need to create all three topics in ZooKeeper first.
>
> On 10 Oct 2016 5:19 a.m., "Ratha v" <vijayara...@gmail.com> wrote:
>
> Hi all;
>
> I have two topics in the broker. Without consuming from one topic, I want
> to merge both topics, and will consume messages from the second topic.
>
> It is because, I have two processes, one process, pushes messages to topic
> A. And the second process once finished processing, it wants to merge both
> topicA and TopicB. Then another process will consume messages from the
> merged topic.
>
> How can I merge both topics in high level kafka APIs. I use Kafka 0.10.0.1.
>
>
> Thanks.
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


How to merge two topics in kafka?

2016-10-09 Thread Ratha v
Hi all;

I have two topics in the broker. Without consuming from one topic, I want
to merge both topics, and then consume messages from the second topic.

This is because I have two processes: one process pushes messages to topic
A, and once the second process has finished processing, it wants to merge
both topicA and topicB. Then another process will consume messages from the
merged topic.

How can I merge both topics with the high-level Kafka APIs? I use Kafka 0.10.0.1.
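
For what it's worth, a minimal sketch of one way to do this with Kafka Streams in 0.10.0.1 (topic names and broker address are placeholders; this continuously copies records from both source topics into the merged topic):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class MergeTopics {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "topic-merger");
        props.put("bootstrap.servers", "localhost:9092"); // placeholder

        KStreamBuilder builder = new KStreamBuilder();
        // stream() accepts multiple source topics; every record from either
        // topic is forwarded, unchanged, to the merged topic.
        builder.stream("topicA", "topicB").to("mergedTopic");

        new KafkaStreams(builder, props).start();
    }
}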


Thanks.
-- 
-Ratha
http://vvratha.blogspot.com/


kafka.common.ConsumerRebalanceFailedException: can't rebalance after 4

2016-08-30 Thread Ratha v
HI all;
I'm using the 0.8 consumer with Kafka 0.10.0.1.
When I run my consumer app (within WildFly 10.0.x) I get the following
exception [1], and the consumer is not listening for any messages.
I tried increasing the "rebalance.backoff.ms", "zookeeper.session.timeout.ms"
and "rebalance.max.retries" values, but still no luck.
If I write a simple Java program with a main method, it works for the same
topic and group ID.

Can anyone point me to the fix? (I could not update to the latest version
(0.9.x) of the consumer APIs.)

*Consumer.properties*

auto.commit.enable=true

rebalance.max.retries=4

auto.commit.interval.ms=101

zookeeper.connect=zk1.abc.com\:2181,zk2.abc.com\:2181,zk3.abc.com\:2181

auto.offset.reset=largest

rebalance.backoff.ms=1

zookeeper.session.timeout.ms=6000

group.id=lob1

consumer.timeout.ms=-1

fetch.min.bytes=1


[1]

05:24:05,492 INFO  [stdout] (ServerService Thread Pool -- 75)
kafka.common.ConsumerRebalanceFailedException:
lob1_18bd15ac0c9c-1472621001050-5764721e can't rebalance after 4 retries

filerouter_1  | 05:24:05,492 INFO  [stdout] (ServerService Thread Pool --
75) at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
~[kafka_2.11-0.10.0.1.jar:?]

filerouter_1  | 05:24:05,492 INFO  [stdout] (ServerService Thread Pool --
75) at
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:977)
~[kafka_2.11-0.10.0.1.jar:?]

filerouter_1  | 05:24:05,493 INFO  [stdout] (ServerService Thread Pool --
75) at
kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:264)
~[kafka_2.11-0.10.0.1.jar:?]

filerouter_1  | 05:24:05,493 INFO  [stdout] (ServerService Thread Pool --
75) at
kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
~[kafka_2.11-0.10.0.1.jar:?]

filerouter_1  | 05:24:05,494 INFO  [stdout] (ServerService Thread Pool --
75) at
kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)
~[kafka_2.11-0.10.0.1.jar:?]

filerouter_1  | 05:24:05,494 INFO  [stdout] (ServerService Thread Pool --
75) at
com.abc.core.listener.KafkaMessageListener.start(KafkaMessageListener.java:126)
[core-0.0.1-SNAPSHOT.jar:?]

filerouter_1  | 05:24:05,494 INFO  [stdout] (ServerService Thread Pool --
75) at com.labc.bean.KafkaServiceBean.initialize(KafkaServiceBean.java:47)
[classes:?]

filerouter_1  | 05:24:05,494 INFO  [stdout] (ServerService Thread Pool --
75) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_91]

filerouter_1  | 05:24:05,495 INFO  [stdout] (ServerService Thread Pool --
75) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_91]

filerouter_1  | 05:24:05,495 INFO  [stdout] (ServerService Thread Pool --
75) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_91]

filerouter_1  | 05:24:05,495 INFO  [stdout] (ServerService Thread Pool --
75) at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_91]

filerouter_1  | 05:24:05,496 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.as.ee.component.ManagedReferenceLifecycleMethodInterceptor.processInvocation(ManagedReferenceLifecycleMethodInterceptor.java:96)
[wildfly-ee-10.1.0.Final.jar!/:10.1.0.Final]

filerouter_1  | 05:24:05,496 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
[jboss-invocation-1.4.1.Final.jar!/:1.4.1.Final]

filerouter_1  | 05:24:05,496 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.doLifecycleInterception(Jsr299BindingsInterceptor.java:114)
[wildfly-weld-10.1.0.Final.jar!/:10.1.0.Final]

filerouter_1  | 05:24:05,497 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.processInvocation(Jsr299BindingsInterceptor.java:103)
[wildfly-weld-10.1.0.Final.jar!/:10.1.0.Final]

filerouter_1  | 05:24:05,497 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
[jboss-invocation-1.4.1.Final.jar!/:1.4.1.Final]

filerouter_1  | 05:24:05,497 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:437)
[jboss-invocation-1.4.1.Final.jar!/:1.4.1.Final]

filerouter_1  | 05:24:05,498 INFO  [stdout] (ServerService Thread Pool --
75) at
org.jboss.weld.ejb.AbstractEJBRequestScopeActivationInterceptor.aroundInvoke(AbstractEJBRequestScopeActivationInterceptor.java:73)
[weld-core-impl-2.3.5.Final.jar!/:2.3.5.Final]

filerouter_1  | 05:24:05,498 INFO  [stdout] (ServerService Thread Pool --
75) at

Re: How to define multiple serializers in kafka?

2016-05-03 Thread Ratha v
Thanks. I'll go through the schema registry.

On 3 May 2016 at 22:46, Gerard Klijs <gerard.kl...@dizzit.com> wrote:

> Then you're probably best off using the Confluent schema registry. You can
> then use the io.confluent.kafka.serializers.KafkaAvroDeserializer for the
> client with KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG="true"
> to get back the object, deserialized with the same version of the schema
> the object was sent with.
>
> On Tue, May 3, 2016 at 12:06 PM Ratha v <vijayara...@gmail.com> wrote:
>
> > I plan to use different topics for each type of object. (number of object
> > types= number of topics)..
> > So, I need deserializers/serializers= topics = number of objects.
> >
> > What would be the better way to achieve this?
> >
> > On 3 May 2016 at 18:20, Gerard Klijs <gerard.kl...@dizzit.com> wrote:
> >
> > > If you put them in one topic, you will need one
> > > 'master' serializer/deserializers which can handle all the formats.
> > > I don't know how you would like to use Avro schemas, the confluent
> schema
> > > registry is by default configured to handle one schema at a time for
> one
> > > topic, but you could configure it to use multiple non-compatible
> schema's
> > > in one topic. Each object will be saved with a schema id, making it
> > > possible to get back the original object.
> > >
> > > On Tue, May 3, 2016 at 1:52 AM Ratha v <vijayara...@gmail.com> wrote:
> > >
> > > > What is the best way for this? Do we need to have common
> > > > serializer/deserializer for all type of the objects we publish? OR
> > > seperate
> > > > for each objects?
> > > > If we have seperate serializer/deserializers, then how can I
> configure
> > > > kafka?
> > > > Or Is it recommended to use Avro schemas?
> > > >
> > > > Thanks
> > > >
> > > > On 2 May 2016 at 18:43, Gerard Klijs <gerard.kl...@dizzit.com>
> wrote:
> > > >
> > > > > I think by design it would be better to put different kind of
> > messages
> > > > in a
> > > > > different topic. But if you would want to mix you can make your own
> > > > > serializer/deserializer you could append a 'magic byte' to the byes
> > you
> > > > get
> > > > > after you serialize, to be able to deserialize using the correct
> > > methods.
> > > > > The custom serializer would always return an Object, which you
> could
> > > cast
> > > > > when needed in the poll loop of the consumer. I think this is de
> > > > > cleanest/best way, but maybe someone has a different idea?
> > > > >
> > > > > On Mon, May 2, 2016 at 7:54 AM Ratha v <vijayara...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi all;
> > > > > >
> > > > > > Say, I publish and consume different type of java objects.For
> each
> > I
> > > > have
> > > > > > to define own serializer implementations. How can we provide all
> > > > > > implementations in the kafka consumer/producer properties file
> > under
> > > > the
> > > > > > "serializer.class" property?
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -Ratha
> > > > > > http://vvratha.blogspot.com/
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -Ratha
> > > > http://vvratha.blogspot.com/
> > > >
> > >
> >
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: How to define multiple serializers in kafka?

2016-05-03 Thread Ratha v
I plan to use a different topic for each type of object (number of object
types = number of topics).
So I need as many deserializers/serializers as topics, i.e. as object types.

What would be the best way to achieve this?

On 3 May 2016 at 18:20, Gerard Klijs <gerard.kl...@dizzit.com> wrote:

> If you put them in one topic, you will need one
> 'master' serializer/deserializers which can handle all the formats.
> I don't know how you would like to use Avro schemas, the confluent schema
> registry is by default configured to handle one schema at a time for one
> topic, but you could configure it to use multiple non-compatible schema's
> in one topic. Each object will be saved with a schema id, making it
> possible to get back the original object.
>
> On Tue, May 3, 2016 at 1:52 AM Ratha v <vijayara...@gmail.com> wrote:
>
> > What is the best way for this? Do we need to have common
> > serializer/deserializer for all type of the objects we publish? OR
> seperate
> > for each objects?
> > If we have seperate serializer/deserializers, then how can I configure
> > kafka?
> > Or Is it recommended to use Avro schemas?
> >
> > Thanks
> >
> > On 2 May 2016 at 18:43, Gerard Klijs <gerard.kl...@dizzit.com> wrote:
> >
> > > I think by design it would be better to put different kind of messages
> > in a
> > > different topic. But if you would want to mix you can make your own
> > > serializer/deserializer you could append a 'magic byte' to the byes you
> > get
> > > after you serialize, to be able to deserialize using the correct
> methods.
> > > The custom serializer would always return an Object, which you could
> cast
> > > when needed in the poll loop of the consumer. I think this is de
> > > cleanest/best way, but maybe someone has a different idea?
> > >
> > > On Mon, May 2, 2016 at 7:54 AM Ratha v <vijayara...@gmail.com> wrote:
> > >
> > > > Hi all;
> > > >
> > > > Say, I publish and consume different type of java objects.For each I
> > have
> > > > to define own serializer implementations. How can we provide all
> > > > implementations in the kafka consumer/producer properties file under
> > the
> > > > "serializer.class" property?
> > > >
> > > >
> > > > --
> > > > -Ratha
> > > > http://vvratha.blogspot.com/
> > > >
> > >
> >
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: Why the consumer not listening to the first message?

2016-05-02 Thread Ratha v
I fixed this by setting the following property in my producer:

request.required.acks=1
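
For context, a minimal 0.8-style sync-producer configuration with this setting (the broker list is a placeholder). With the old producer's default of request.required.acks=0 the producer never waits for a broker acknowledgment, so messages sent just before the process exits can be silently dropped:

metadata.broker.list=localhost:9092
serializer.class=kafka.serializer.DefaultEncoder
producer.type=sync
request.required.acks=1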


On 3 May 2016 at 09:50, Ratha v <vijayara...@gmail.com> wrote:

> Hi all;
> In my test program, I start the listener, then send messages in a loop.
> If I send one message, it does not receive that message. If I send 2
> messages, it receives one message. If I send 3, it receives 2 messages. Why
> is that?
>
> *Producer*
>
> *KeyedMessage<String, byte[]> message = new KeyedMessage<String,
> byte[]>(topic, serializedBytes);*
>
> * if (log.isDebugEnabled()) {*
>
> * log.debug("producing messages to topic : " + topic + "file : " +
> payload.get("name"));*
>
> * }*
>
> * for (int i = 0; i < 3; i++) {*
>
> * producer.send(message);*
>
> * System.out.println("producing ..");*
>
> * }*
> *Consumer*
>
> *public void run() {*
>
> * try {*
>
> * ConsumerIterator<byte[], byte[]> itr = m_stream.iterator();*
>
> * log.info("Kafka listener is ready to listen..");*
>
> * System.out.println("listens");*
>
> * while (itr.hasNext()) {*
>
> * byte[] data = itr.next().message();*
>
> *System.out.println("Message received : " + data);*
> *}*
>
>
> *Consumer properties*
>
>
> enable.auto.commit=true
>
> auto.commit.interval.ms=101
>
> session.timeout.ms=7000
>
> key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
>
> zookeeper.connect=zk1.xx\:2181
>
> heartbeat.interval.ms=1000
>
> auto.offset.reset=smallest
>
> serializer.class=kafka.serializer.DefaultEncoder
>
> bootstrap.servers=kk1.xx\:9092
>
> group.id=test
>
> consumer.timeout.ms=-1
>
> fetch.min.bytes=1
>
> receive.buffer.bytes=262144
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: How to define multiple serializers in kafka?

2016-05-02 Thread Ratha v
What is the best way to do this? Do we need to have a common
serializer/deserializer for all types of objects we publish, or a separate
one for each object type?
If we have separate serializers/deserializers, then how can I configure
Kafka?
Or is it recommended to use Avro schemas?

Thanks

On 2 May 2016 at 18:43, Gerard Klijs <gerard.kl...@dizzit.com> wrote:

> I think by design it would be better to put different kind of messages in a
> different topic. But if you would want to mix you can make your own
> serializer/deserializer you could append a 'magic byte' to the byes you get
> after you serialize, to be able to deserialize using the correct methods.
> The custom serializer would always return an Object, which you could cast
> when needed in the poll loop of the consumer. I think this is de
> cleanest/best way, but maybe someone has a different idea?
>
> On Mon, May 2, 2016 at 7:54 AM Ratha v <vijayara...@gmail.com> wrote:
>
> > Hi all;
> >
> > Say, I publish and consume different type of java objects.For each I have
> > to define own serializer implementations. How can we provide all
> > implementations in the kafka consumer/producer properties file under the
> > "serializer.class" property?
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>
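
A sketch of the 'magic byte' dispatch Gerard describes above (the type tags and the per-type decode helpers are illustrative stubs, not an existing API):

import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class MagicByteDeserializer implements Deserializer<Object> {

    private static final byte TYPE_A = 0x01; // illustrative type tags
    private static final byte TYPE_B = 0x02;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public Object deserialize(String topic, byte[] data) {
        // The matching serializer prepends one byte identifying the concrete
        // type; the rest of the array is that type's serialized payload.
        byte[] payload = Arrays.copyOfRange(data, 1, data.length);
        switch (data[0]) {
            case TYPE_A: return decodeTypeA(payload);
            case TYPE_B: return decodeTypeB(payload);
            default: throw new IllegalArgumentException("Unknown type tag: " + data[0]);
        }
    }

    @Override
    public void close() { }

    private Object decodeTypeA(byte[] payload) { return null; /* your codec here */ }
    private Object decodeTypeB(byte[] payload) { return null; /* your codec here */ }
}

The consumer then casts the returned Object based on its runtime type, as Gerard suggests.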



-- 
-Ratha
http://vvratha.blogspot.com/


Why the consumer not listening to the first message?

2016-05-02 Thread Ratha v
Hi all;
In my test program, I start the listener, then send messages in a loop.
If I send one message, it does not receive that message. If I send 2
messages, it receives one message. If I send 3, it receives 2 messages. Why
is that?

*Producer*

KeyedMessage<String, byte[]> message =
    new KeyedMessage<String, byte[]>(topic, serializedBytes);

if (log.isDebugEnabled()) {
    log.debug("producing messages to topic : " + topic + " file : "
            + payload.get("name"));
}

for (int i = 0; i < 3; i++) {
    producer.send(message);
    System.out.println("producing ..");
}

*Consumer*

public void run() {
    try {
        ConsumerIterator<byte[], byte[]> itr = m_stream.iterator();
        log.info("Kafka listener is ready to listen..");
        System.out.println("listens");
        while (itr.hasNext()) {
            byte[] data = itr.next().message();
            System.out.println("Message received : " + data);
        }


*Consumer properties*


enable.auto.commit=true

auto.commit.interval.ms=101

session.timeout.ms=7000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

zookeeper.connect=zk1.xx\:2181

heartbeat.interval.ms=1000

auto.offset.reset=smallest

serializer.class=kafka.serializer.DefaultEncoder

bootstrap.servers=kk1.xx\:9092

group.id=test

consumer.timeout.ms=-1

fetch.min.bytes=1

receive.buffer.bytes=262144
-- 
-Ratha
http://vvratha.blogspot.com/


How to define multiple serializers in kafka?

2016-05-01 Thread Ratha v
Hi all;

Say I publish and consume different types of Java objects, and for each I
have to define my own serializer implementation. How can we provide all the
implementations in the Kafka consumer/producer properties file under the
"serializer.class" property?


-- 
-Ratha
http://vvratha.blogspot.com/


Iterator is in failed state -kafka 0.8x

2016-04-21 Thread Ratha v
Hi all;
I have deployed my consumer as a war file in WildFly 9.x. This simple
consumer works fine as a standalone jar application.
When deploying the war file, I get an

java.lang.IllegalStateException: Iterator is in failed state [1]

exception. Why is that?

*My consumer;*

public void run() {
    try {
        ConsumerIterator<byte[], byte[]> itr = stream.iterator();
        System.out.println("listens");
        while (itr.hasNext()) {
            byte[] data = itr.next().message();
            ...


[1]

15:29:38,938 INFO  [stdout] (ServerService Thread Pool -- 68) Kafka service
is started

15:29:38,938 ERROR [stderr] (pool-8-thread-1)
java.lang.IllegalStateException: Iterator is in failed state

15:29:38,938 INFO  [stdout] (pool-7-thread-1) listens

15:29:38,939 ERROR [stderr] (pool-8-thread-1) at
kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)

15:29:38,939 ERROR [stderr] (pool-8-thread-1) at
com.xx..listener.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:45)

15:29:38,939 ERROR [stderr] (pool-8-thread-1) at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

15:29:38,939 ERROR [stderr] (pool-8-thread-1) at
java.util.concurrent.FutureTask.run(FutureTask.java:266)

15:29:38,939 ERROR [stderr] (pool-8-thread-1) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

15:29:38,939 ERROR [stderr] (pool-8-thread-1) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

15:29:38,939 ERROR [stderr] (pool-8-thread-1) at
java.lang.Thread.run(Thread.java:745)

15:29:38,940 ERROR [stderr] (pool-7-thread-1)
java.lang.IllegalStateException: Iterator is in failed state

15:29:38,940 ERROR [stderr] (pool-7-thread-1) at
kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)

15:29:38,940 ERROR [stderr] (pool-7-thread-1) at
com.xxl.listener.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:45)

15:29:38,940 ERROR [stderr] (pool-7-thread-1) at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

15:29:38,940 ERROR [stderr] (pool-7-thread-1) at
java.util.concurrent.FutureTask.run(FutureTask.java:266)

15:29:38,940 ERROR [stderr] (pool-7-thread-1) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

15:29:38,940 ERROR [stderr] (pool-7-thread-1) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

15:29:38,940 ERROR [stderr] (pool-7-thread-1) at
java.lang.Thread.run(Thread.java:745)

15:2
-- 
-Ratha
http://vvratha.blogspot.com/


JBOSS gives org.apache.kafka.common.KafkaException: auth.conf cannot be read

2016-04-21 Thread Ratha v
When I deploy the war of my simple Kafka project (which works fine as a jar)
in WildFly v10, I get a ZooKeeper connection exception [1]. This occurs
when the Kafka listener starts to connect to ZooKeeper.

[1]

15:21:58,531 ERROR [org.jboss.msc.service.fail] (ServerService Thread Pool
-- 82) MSC01: Failed to start service
jboss.deployment.unit."ratha.war".component.KafkaServiceBean.START:
org.jboss.msc.service.StartException in service
jboss.deployment.unit."ratha.war".component.KafkaServiceBean.START:
java.lang.IllegalStateException: WFLYEE0042: Failed to construct component
instance

at
org.jboss.as.ee.component.ComponentStartService$1.run(ComponentStartService.java:57)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

at org.jboss.threads.JBossThread.run(JBossThread.java:320)

Caused by: java.lang.IllegalStateException: WFLYEE0042: Failed to construct
component instance

at
org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:163)

at
org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:134)

at
org.jboss.as.ee.component.BasicComponent.createInstance(BasicComponent.java:88)

at
org.jboss.as.ejb3.component.singleton.SingletonComponent.getComponentInstance(SingletonComponent.java:124)

at
org.jboss.as.ejb3.component.singleton.SingletonComponent.start(SingletonComponent.java:138)

at
org.jboss.as.ee.component.ComponentStartService$1.run(ComponentStartService.java:54)

... 6 more

Caused by: javax.ejb.EJBException: org.apache.kafka.common.KafkaException:
File
jar:file:/Users/ratha/projects/wildfly-10.0.0.Final/modules/system/layers/base/org/picketbox/main/picketbox-4.9.4.Final.jar!/auth.conf
cannot be read.

at
org.jboss.as.ejb3.tx.CMTTxInterceptor.handleExceptionInOurTx(CMTTxInterceptor.java:187)

at
org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInOurTx(CMTTxInterceptor.java:277)

at
org.jboss.as.ejb3.tx.CMTTxInterceptor.requiresNew(CMTTxInterceptor.java:349)

at
org.jboss.as.ejb3.tx.LifecycleCMTTxInterceptor.processInvocation(LifecycleCMTTxInterceptor.java:66)

at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

at
org.jboss.as.weld.injection.WeldInjectionContextInterceptor.processInvocation(WeldInjectionContextInterceptor.java:43)

at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

at
org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41)

at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

at
org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45)

at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

at
org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64)

at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:356)

at
org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80)

at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

at
org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)

at
org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:161)

... 11 more

Caused by: org.apache.kafka.common.KafkaException: File
jar:file:/Users/ratha/projects/wildfly-10.0.0.Final/modules/system/layers/base/org/picketbox/main/picketbox-4.9.4.Final.jar!/auth.conf
cannot be read.

at
org.apache.kafka.common.security.JaasUtils.isZkSecurityEnabled(JaasUtils.java:95)

at
kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:184)

at
kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:129)

at
kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)

at
kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69)

at
kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:120)

at
kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

-- 
-Ratha
http://vvratha.blogspot.com/


Re: How to deserialize the object without avro schema?

2016-04-19 Thread Ratha v
Thanks. I'll try to follow that trick.

On 19 April 2016 at 17:10, <jan.o...@wooga.net> wrote:

>
>
> Hi, avro Schemas imply a pretty big overhead, if you would include them in
> every message. It's good practice to include a schema id with the
> message... Then you need a schema repository to lookup the matching schema
> based on the id.
>
> Have a look at confluent.io. They offer a schema repo among other Kafka
> related tools.
>
> Regards
>
> Jan
>
>
>
> Sent from my iPhone
> > On 19 Apr 2016, at 08:02, Ratha v <vijayara...@gmail.com> wrote:
> >
> > Hi all;
> >
> > I try to publish/consume my java objects to kafka. I use Avro schema.
> >
> > My basic program works fine. In my program i use my schema in the
> producer
> > (for encoding) and consumer (decoding).
> >
> > If i publish different objects to different topics( eg: 100 topics)at the
> > receiver, i do not know, what type of message i received. I would like to
> > get the avro schema from the received byte and would like to use that for
> > decoding. Is that right? If so, how can i retrieve from the received
> object?
> > Or is there any better approach?
> >
> > Thanks.
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/
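
A minimal sketch of the id-plus-payload framing Jan describes, assuming each
message carries a 4-byte schema id in front of the Avro bytes. The class and
method names are illustrative, and the registry itself (mapping an id back to
a schema) is left to whatever repository you use:

import java.nio.ByteBuffer;

// Illustrative framing: [4-byte schema id][avro payload]. The consumer reads
// the id first, fetches the matching schema from a registry, then decodes.
public final class SchemaIdEnvelope {

    public static byte[] wrap(int schemaId, byte[] avroBytes) {
        return ByteBuffer.allocate(4 + avroBytes.length)
                .putInt(schemaId)
                .put(avroBytes)
                .array();
    }

    public static int schemaIdOf(byte[] message) {
        return ByteBuffer.wrap(message).getInt();
    }

    public static byte[] avroPayloadOf(byte[] message) {
        byte[] payload = new byte[message.length - 4];
        ByteBuffer.wrap(message, 4, payload.length).get(payload);
        return payload;
    }
}

With this framing the consumer never needs the schema inside the message
itself: it resolves schemaIdOf(message) against the repository and decodes
avroPayloadOf(message) with the schema that comes back.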


How to deserialize the object without avro schema?

2016-04-19 Thread Ratha v
Hi all;

I try to publish/consume my Java objects to Kafka. I use an Avro schema.

My basic program works fine. In my program I use my schema in the producer
(for encoding) and the consumer (for decoding).

If I publish different objects to different topics (e.g. 100 topics), then at
the receiver I do not know what type of message I received. I would like to
get the Avro schema from the received bytes and use that for decoding. Is
that right? If so, how can I retrieve it from the received object? Or is
there a better approach?

Thanks.
-- 
-Ratha
http://vvratha.blogspot.com/


What is the best way to publish and consume different type of messages?

2016-04-12 Thread Ratha v
Hi all;
Im using kafka 0.0.8V.

I want to publish /consume byte[] objects, java bean objects, serializable
objects and much more..

What is the best way to define a publisher and consumer for this type
scenario?
When I consume a message from the consumer iterator, I do not know what
type of the message it is.
Can anybody point me a guide on how to design such scenarios?

Thanks.

-- 
-Ratha
http://vvratha.blogspot.com/
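
The 0.8 wire format has no record headers, so one common pattern (a sketch,
not the only answer) is to carry a type tag in the message key and dispatch
on it at the consumer. All names below are illustrative:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch: route mixed message types by putting a type tag in the record key.
// The consumer looks the tag up in a map of deserializers.
public class TypedDispatch {

    private static final Map<String, Function<byte[], Object>> DESERIALIZERS = new HashMap<>();

    static {
        DESERIALIZERS.put("string", bytes -> new String(bytes, StandardCharsets.UTF_8));
        // register more tags here, e.g. an Avro or Java-serialization decoder
    }

    public static Object decode(String typeTag, byte[] payload) {
        Function<byte[], Object> f = DESERIALIZERS.get(typeTag);
        if (f == null) {
            throw new IllegalArgumentException("Unknown message type: " + typeTag);
        }
        return f.apply(payload);
    }
}

On the producer side the tag would go into the record key, e.g.
producer.send(new ProducerRecord<String, byte[]>(topic, "string", bytes)),
so that decode(record.key(), record.value()) picks the right decoder.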


Why my consumer sometimes not listening messages?

2016-04-11 Thread Ratha v
I use the older Kafka consumer (v0.8).
Steps:

   - Start the listener.
   - Send 10 messages. The listener receives only around 4 of them.
   - Send a single message. The listener receives nothing.
   - Publish another single message. The listener still receives nothing.

Can anyone explain this behaviour?
-- 
-Ratha
http://vvratha.blogspot.com/


Apache kafka listener listens less messages than produced

2016-04-10 Thread Ratha v
I use the old Kafka consumer and producer (v0.8.x).
When I produce 10 messages I get only 6 of them. Is there anything wrong
in my listener .properties?

I start the listener first, then produce.

*Listener properties*

enable.auto.commit=true
auto.commit.interval.ms=101
session.timeout.ms=7000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.xx.FileSerializer
zookeeper.connect=zookeeper.xx.com\:2181
heartbeat.interval.ms=1000
auto.offset.reset=smallest
serializer.class=com.xxx.FileEncoderDecoder
bootstrap.servers=kafka.xxx.com\:9092
group.id=test
consumer.timeout.ms=1
fetch.min.bytes=1
receive.buffer.bytes=262144

-- 
-Ratha
http://vvratha.blogspot.com/
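
One frequent cause of "sent 10, saw 6" is the producer process exiting before
its buffered batches drain; that is a guess from the symptoms, not a
diagnosis. A sketch of a producer that blocks on close() so everything
buffered is actually delivered (new-producer API shown; the host name is a
placeholder):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FlushBeforeExit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.example.com:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>("MY_TOPIC", "msg-" + i));
            }
        } finally {
            producer.close(); // blocks until buffered records are delivered
        }
    }
}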


Re: WARN Error while fetching metadata with correlation id 1 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)

2016-04-06 Thread Ratha v
Sorry, my bad. I should not have added "?" at the end of the topic name.

On 6 April 2016 at 11:01, Ratha v <vijayara...@gmail.com> wrote:

> Hi all;
> when I run the following command with kafka 0.9.0.1 i get this
> warnings[1]. Can you please tell me what is wrong with my topics? (I'm
> talking to the kafka broker which runs in ec2)
>
> #./kafka-console-consumer.sh --new-consumer --bootstrap-server
> kafka.xx.com:9092 --topic MY_TOPIC?
>
> [1]
> -- [2016-04-06 10:57:45,839] WARN Error while fetching metadata with
> correlation id 1 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 10:57:46,066] WARN Error while fetching metadata with
> correlation id 3 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 10:57:46,188] WARN Error while fetching metadata with
> correlation id 5 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 10:57:46,311] WARN Error while fetching metadata with
> correlation id 7 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/
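
For reference, a quick client-side check of what Kafka accepts in a topic
name: only ASCII letters, digits, '.', '_' and '-' are legal, which is why
the trailing '?' was rejected. This is a sketch; the length cap has changed
across versions, so treat the number below as illustrative:

import java.util.regex.Pattern;

public class TopicNameCheck {
    // legal topic characters; the 249-char cap matches recent Kafka versions
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    public static boolean isLegal(String topic) {
        return topic != null && !topic.isEmpty() && topic.length() <= 249
                && LEGAL.matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(isLegal("MY_TOPIC"));   // true
        System.out.println(isLegal("MY_TOPIC?"));  // false: '?' is illegal
    }
}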


Re: How new consumer get to know about zookeeper URL?

2016-04-05 Thread Ratha v
Thanks Ian

On 6 April 2016 at 13:26, Ian Wrigley <i...@confluent.io> wrote:

> Hi Ratha
>
> New Consumers don’t use ZooKeeper; all offsets are stored in a Kafka topic.
>
> Regards
>
> Ian.
>
> > On Apr 5, 2016, at 10:20 PM, Ratha v <vijayara...@gmail.com> wrote:
> >
> > Hi all;
> > Im using kafka 0.9.0.1 V with new consume APIs.
> >
> > I would like to know how new consumers get to know about the zookeeper's
> > URL?
> >
> > Thanks
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
>
>


-- 
-Ratha
http://vvratha.blogspot.com/
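
For completeness, a minimal new-consumer setup showing that no ZooKeeper
address appears anywhere: the broker list is all the client needs, and
offsets are committed to the internal __consumer_offsets topic. The host
name is a placeholder:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewConsumerBootstrap {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.example.com:9092"); // no zookeeper.connect
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("MY_TOPIC"));
            System.out.println("polled " + consumer.poll(1000).count() + " records");
        }
    }
}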


Re: Is there any behavioural change to connect local server and remote server?

2016-04-05 Thread Ratha v
Here is my current server.properties (do I need to change host.name too?):


listeners=PLAINTEXT://:9092


# The port the socket server listens on

port=9092


# Hostname the broker will bind to. If not set, the server will bind to all
interfaces

#host.name=localhost


# Hostname the broker will advertise to producers and consumers. If not
set, it uses the

# value for "host.name" if configured.  Otherwise, it will use the value
returned from

# java.net.InetAddress.getCanonicalHostName().

advertised.host.name=kafka.xx.com


# The port to publish to ZooKeeper for clients to use. If this is not set,

# it will publish the same port that the broker binds to.

advertised.port=9092


# The number of threads handling network requests

num.network.threads=3


# The number of threads doing disk I/O

num.io.threads=8

On 6 April 2016 at 10:08, Ratha v <vijayara...@gmail.com> wrote:

> Hi Ewen;
> Thanks ..Yes broker configuration has been set as you mentioned.
> But when i try following command, i see this exception..Do you know the
> reason?
>
> # kafka-console-consumer.sh --new-consumer --bootstrap-server
> kafka.xx.com:9092 --topic TEST_NPB4?
>
> [2016-04-06 09:45:26,855] WARN Error while fetching metadata with
> correlation id 1 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:27,080] WARN Error while fetching metadata with
> correlation id 3 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:27,203] WARN Error while fetching metadata with
> correlation id 5 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:27,326] WARN Error while fetching metadata with
> correlation id 7 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:27,449] WARN Error while fetching metadata with
> correlation id 9 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:27,573] WARN Error while fetching metadata with
> correlation id 11 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:27,699] WARN Error while fetching metadata with
> correlation id 13 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:27,825] WARN Error while fetching metadata with
> correlation id 15 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:27,949] WARN Error while fetching metadata with
> correlation id 17 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:28,075] WARN Error while fetching metadata with
> correlation id 19 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:28,200] WARN Error while fetching metadata with
> correlation id 21 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:28,326] WARN Error while fetching metadata with
> correlation id 23 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:28,450] WARN Error while fetching metadata with
> correlation id 25 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:28,572] WARN Error while fetching metadata with
> correlation id 27 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:28,696] WARN Error while fetching metadata with
> correlation id 29 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:28,820] WARN Error while fetching metadata with
> correlation id 31 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:28,945] WARN Error while fetching metadata with
> correlation id 33 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
> (org.apache.kafka.clients.NetworkClient)
>
> [2016-04-06 09:45:29,070]
>
> On 6 April 2016 at 07:09, Ewen Cheslack-Postava <e...@confluent.io> wrote:
>
>> Ratha,
>>
>> In EC2, you probably need to use the advertised.listeners setting (or
>> advertised.host and advertised.port on older brokers). This is because EC2
>> has internal and external addresses for each instance.
>>
>> -Ewen
>>
>> On Tue, Apr 5, 2016 at 5:13 AM, Ratha v <vijayara...@gmail.com> wrote:
>>
>> > Hi all;
>> > Is there any different connection mechanism for local and remote (ec2
>> > instance) server. Im asking because, my consumer is not working with
>> remote
>> > server.
>> > --
>> > -Ratha
>> > http://vvratha.blogspot.com/
>> >
>>
>>
>>
>> --
>> Thanks,
>> Ewen
>>
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/
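
As a side note: on 0.9+ brokers the same effect is usually achieved with the
listeners/advertised.listeners pair instead of the older
advertised.host.name/advertised.port settings shown above. A sketch for
server.properties (hostname is a placeholder):

# bind on all interfaces, but advertise the name resolvable from outside EC2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka.example.com:9092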


How new consumer get to know about zookeeper URL?

2016-04-05 Thread Ratha v
Hi all;
Im using kafka 0.9.0.1 V with new consume APIs.

I would like to know how new consumers get to know about the zookeeper's
URL?

Thanks

-- 
-Ratha
http://vvratha.blogspot.com/


WARN Error while fetching metadata with correlation id 1 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)

2016-04-05 Thread Ratha v
Hi all;
when I run the following command with kafka 0.9.0.1 i get this warnings[1].
Can you please tell me what is wrong with my topics? (I'm talking to the
kafka broker which runs in ec2)

#./kafka-console-consumer.sh --new-consumer --bootstrap-server
kafka.xx.com:9092 --topic MY_TOPIC?

[1]
-- [2016-04-06 10:57:45,839] WARN Error while fetching metadata with
correlation id 1 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 10:57:46,066] WARN Error while fetching metadata with
correlation id 3 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 10:57:46,188] WARN Error while fetching metadata with
correlation id 5 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 10:57:46,311] WARN Error while fetching metadata with
correlation id 7 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)
-Ratha
http://vvratha.blogspot.com/


Re: Is there any behavioural change to connect local server and remote server?

2016-04-05 Thread Ratha v
Hi Ewen;
Thanks. Yes, the broker configuration has been set as you mentioned.
But when I try the following command, I see these warnings. Do you know the
reason?

# kafka-console-consumer.sh --new-consumer --bootstrap-server
kafka.xx.com:9092 --topic TEST_NPB4?

[2016-04-06 09:45:26,855] WARN Error while fetching metadata with
correlation id 1 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:27,080] WARN Error while fetching metadata with
correlation id 3 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:27,203] WARN Error while fetching metadata with
correlation id 5 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:27,326] WARN Error while fetching metadata with
correlation id 7 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:27,449] WARN Error while fetching metadata with
correlation id 9 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:27,573] WARN Error while fetching metadata with
correlation id 11 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:27,699] WARN Error while fetching metadata with
correlation id 13 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:27,825] WARN Error while fetching metadata with
correlation id 15 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:27,949] WARN Error while fetching metadata with
correlation id 17 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:28,075] WARN Error while fetching metadata with
correlation id 19 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:28,200] WARN Error while fetching metadata with
correlation id 21 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:28,326] WARN Error while fetching metadata with
correlation id 23 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:28,450] WARN Error while fetching metadata with
correlation id 25 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:28,572] WARN Error while fetching metadata with
correlation id 27 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:28,696] WARN Error while fetching metadata with
correlation id 29 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:28,820] WARN Error while fetching metadata with
correlation id 31 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:28,945] WARN Error while fetching metadata with
correlation id 33 : {TEST_NPB4?=INVALID_TOPIC_EXCEPTION}
(org.apache.kafka.clients.NetworkClient)

[2016-04-06 09:45:29,070]

On 6 April 2016 at 07:09, Ewen Cheslack-Postava <e...@confluent.io> wrote:

> Ratha,
>
> In EC2, you probably need to use the advertised.listeners setting (or
> advertised.host and advertised.port on older brokers). This is because EC2
> has internal and external addresses for each instance.
>
> -Ewen
>
> On Tue, Apr 5, 2016 at 5:13 AM, Ratha v <vijayara...@gmail.com> wrote:
>
> > Hi all;
> > Is there any different connection mechanism for local and remote (ec2
> > instance) server. Im asking because, my consumer is not working with
> remote
> > server.
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>
>
>
> --
> Thanks,
> Ewen
>



-- 
-Ratha
http://vvratha.blogspot.com/


Is there any behavioural change to connect local server and remote server?

2016-04-05 Thread Ratha v
Hi all;
Is there any difference in the connection mechanism between a local and a
remote (EC2 instance) server? I'm asking because my consumer is not working
with the remote server.
-- 
-Ratha
http://vvratha.blogspot.com/


Re: New consumer API waits indefinitely

2016-04-04 Thread Ratha v
These are the same logs I get with my local Kafka server, which works fine.

On 5 April 2016 at 10:20, Ratha v <vijayara...@gmail.com> wrote:

> HI Niko;
> I face this issue with linux systems..
> I changed the logging level to debug and when I start and stop my consumer
> (stopping the program)
>  I get same exception. What is the cause here?
>
> [2016-04-05 00:01:08,784] DEBUG Connection with /192.xx.xx.248
> disconnected (org.apache.kafka.common.network.Selector)
>
> kafka_1 | java.io.EOFException
>
> kafka_1 | at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
>
> kafka_1 | at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>
> kafka_1 | at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
>
> kafka_1 | at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
>
> kafka_1 | at
> org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>
> kafka_1 | at kafka.network.Processor.run(SocketServer.scala:413)
>
> kafka_1 | at java.lang.Thread.run(Thread.java:745)
>
> kafka_1 | [2016-04-05 00:01:09,236] DEBUG Got ping response for
> sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)
>
> kafka_1 | [2016-04-05 00:01:11,236] DEBUG Got ping response for
> sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)
>
> kafka_1 | [2016-04-05 00:01:13,238] DEBUG Got ping response for
> sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)
>
> kafka_1 | [2016-04-05 00:01:14,078] DEBUG Connection with /192.168.0.248
> disconnected (org.apache.kafka.common.network.Selector)
>
> kafka_1 | java.io.EOFException
>
> kafka_1 | at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
>
> kafka_1 | at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>
> kafka_1 | at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
>
> kafka_1 | at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
>
> kafka_1 | at
> org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>
> kafka_1 | at kafka.network.Processor.run(SocketServer.scala:413)
>
> kafka_1 | at java.lang.Thread.run(Thread.java:745)
>
> kafka_1 | [2016-04-05 00:01:15,240] DEBUG Got ping response for
> sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)
>
> kafka_1 | [2016-04-05 00:01:17,240] DEBUG Got ping response for
> sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)
>
> kafka_1 | [2016-04-05 00:01:19,242] DEBUG Got ping response for
> sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)
>
> kafka_1 | [2016-04-05 00:01:19,558] DEBUG Connection with /192.xx.xx.248
> disconnected (org.apache.kafka.common.network.Selector)
>
> kafka_1 | java.io.EOFException
>
> kafka_1 | at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
>
> kafka_1 | at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>
> kafka_1 | at
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
>
> kafka_1 | at
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
>
> kafka_1 | at
> org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>
> kafka_1 | at kafka.network.Processor.run(SocketServer.scala:413)
>
> kafka_1 | at java.lang.Thread.run(Thread.java:745)
>
> kafka_1 | [2016-04-05 00:01:21,242] DEBUG Got ping response for
> sessionid: 0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnx
>
>
> On 5 April 2016 at 04:29, Niko Davor <nikoda...@gmail.com> wrote:
>
>> M. Lohith Samaga,
>>
>> Your Java code looks fine.
>>
>> Usually, if consumer.poll(100); doesn't return, there is probably a basic
>> connection error. If Kafka can't connect, it will internally go into an
>> infinite loop. To me, that doesn't seem like a good design, but that's a
>> separate tangent.
>>
>> Turn SLF4J root logging up to debug and you will probably see the
>> connection error messages.
>>
>> A second thought is it might be worth trying using Kafka on a small Linux
>> VM. The docs say, "Windows is not currently a well supported platform
>> though we would be happy to change that.". Even if you want to use Windows
>> as a server in the long run, at least as a development test option, I'd
>> want to be able to test with a Linux VM.
>>
>> FYI, I'm a Kafka newbie, and I've had no pr

Re: New consumer API waits indefinitely

2016-04-04 Thread Ratha v
HI Niko;
I face this issue on Linux systems.
I changed the logging level to debug, and when I start and stop my consumer
(stopping the program) I get the same exception. What is the cause here?

[2016-04-05 00:01:08,784] DEBUG Connection with /192.xx.xx.248 disconnected
(org.apache.kafka.common.network.Selector)

kafka_1 | java.io.EOFException

kafka_1 | at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)

kafka_1 | at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)

kafka_1 | at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)

kafka_1 | at
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)

kafka_1 | at
org.apache.kafka.common.network.Selector.poll(Selector.java:286)

kafka_1 | at kafka.network.Processor.run(SocketServer.scala:413)

kafka_1 | at java.lang.Thread.run(Thread.java:745)

kafka_1 | [2016-04-05 00:01:09,236] DEBUG Got ping response for sessionid:
0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)

kafka_1 | [2016-04-05 00:01:11,236] DEBUG Got ping response for sessionid:
0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)

kafka_1 | [2016-04-05 00:01:13,238] DEBUG Got ping response for sessionid:
0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)

kafka_1 | [2016-04-05 00:01:14,078] DEBUG Connection with /192.168.0.248
disconnected (org.apache.kafka.common.network.Selector)

kafka_1 | java.io.EOFException

kafka_1 | at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)

kafka_1 | at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)

kafka_1 | at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)

kafka_1 | at
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)

kafka_1 | at
org.apache.kafka.common.network.Selector.poll(Selector.java:286)

kafka_1 | at kafka.network.Processor.run(SocketServer.scala:413)

kafka_1 | at java.lang.Thread.run(Thread.java:745)

kafka_1 | [2016-04-05 00:01:15,240] DEBUG Got ping response for sessionid:
0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)

kafka_1 | [2016-04-05 00:01:17,240] DEBUG Got ping response for sessionid:
0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)

kafka_1 | [2016-04-05 00:01:19,242] DEBUG Got ping response for sessionid:
0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnxn)

kafka_1 | [2016-04-05 00:01:19,558] DEBUG Connection with /192.xx.xx.248
disconnected (org.apache.kafka.common.network.Selector)

kafka_1 | java.io.EOFException

kafka_1 | at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)

kafka_1 | at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)

kafka_1 | at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)

kafka_1 | at
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)

kafka_1 | at
org.apache.kafka.common.network.Selector.poll(Selector.java:286)

kafka_1 | at kafka.network.Processor.run(SocketServer.scala:413)

kafka_1 | at java.lang.Thread.run(Thread.java:745)

kafka_1 | [2016-04-05 00:01:21,242] DEBUG Got ping response for sessionid:
0x253405b88b300a4 after 0ms (org.apache.zookeeper.ClientCnx


On 5 April 2016 at 04:29, Niko Davor  wrote:

> M. Lohith Samaga,
>
> Your Java code looks fine.
>
> Usually, if consumer.poll(100); doesn't return, there is probably a basic
> connection error. If Kafka can't connect, it will internally go into an
> infinite loop. To me, that doesn't seem like a good design, but that's a
> separate tangent.
>
> Turn SLF4J root logging up to debug and you will probably see the
> connection error messages.
>
> A second thought is it might be worth trying using Kafka on a small Linux
> VM. The docs say, "Windows is not currently a well supported platform
> though we would be happy to change that.". Even if you want to use Windows
> as a server in the long run, at least as a development test option, I'd
> want to be able to test with a Linux VM.
>
> FYI, I'm a Kafka newbie, and I've had no problems getting working code
> samples up and running with Kafka 0.9.0.1 and the new Producer/Consumer
> APIs. I've gotten code samples running in Java, Scala, and Python, and
> everything works, including cross language tests.
>
> Lastly, as a mailing list question, how do I reply to a question like this
> if I see the original question in the web archives but it is not in my mail
> client? I suspect that this reply will show up as a different thread which
> is not what I want.
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: New consumer API waits indefinitely

2016-04-04 Thread Ratha v
Still struggling :)
Check the following threads;

   - If my producer producing, then why the consumer couldn't consume? it
   stuck @ poll()
   - Consumer thread is waiting forever, not returning any objects


I think new APIs are recommended.


On 4 April 2016 at 16:37, Lohith Samaga M <lohith.sam...@mphasis.com> wrote:

> Thanks for letting me know.
>
> Is there any work around? A fix?
>
> Which set of API is recommended for production use?
>
> Best regards / Mit freundlichen Grüßen / Sincères salutations
> M. Lohith Samaga
>
>
>
>
> -Original Message-
> From: Ratha v [mailto:vijayara...@gmail.com]
> Sent: Monday, April 04, 2016 11.27
> To: users@kafka.apache.org
> Subject: Re: New consumer API waits indefinitely
>
> I too face same issue:(
>
> On 4 April 2016 at 15:51, Lohith Samaga M <lohith.sam...@mphasis.com>
> wrote:
>
> > HI,
> > Good morning.
> >
> > I am new to Kafka. So, please bear with me.
> > I am using the new Producer and Consumer API with
> > Kafka
> > 0.9.0.1 running on Windows 7 laptop with zookeeper.
> >
> > I was able to send messages using the new Producer
> > API. I can see the messages in the Kafka data directory.
> >
> > However, when I run the consumer, it does not retrieve
> > the messages. It keeps waiting for the messages indefinitely.
> > My code (taken from Javadoc and modified)  is as below:
> >
> > props.put("bootstrap.servers", "localhost:9092");
> > props.put("group.id", "new01");
> > props.put("enable.auto.commit", "true");
> > props.put("auto.commit.interval.ms", "1000");
> > props.put("session.timeout.ms", "3");
> > props.put("key.deserializer",
> > "org.apache.kafka.common.serialization.StringDeserializer");
> > props.put("value.deserializer",
> > "org.apache.kafka.common.serialization.StringDeserializer");
> >
> > KafkaConsumer<String, String> consumer = new
> > KafkaConsumer<>(props);
> > consumer.subscribe(Arrays.asList("new-producer"));
> > while (true) {
> > ConsumerRecords<String, String> records =
> > consumer.poll(100);
> > for (ConsumerRecord<String, String> record : records)
> > System.out.printf("offset = %d, key = %s, value =
> > %s", record.offset(), record.key(), record.value());
> > }
> >
> > Can anybody please tell me what went wrong?
> >
> > Thanks & Regards,
> > M. Lohith Samaga
> >
> > Information transmitted by this e-mail is proprietary to Mphasis, its
> > associated companies and/ or its customers and is intended for use
> > only by the individual or entity to which it is addressed, and may
> > contain information that is privileged, confidential or exempt from
> > disclosure under applicable law. If you are not the intended recipient
> > or it appears that this mail has been forwarded to you without proper
> > authority, you are notified that any use or dissemination of this
> > information in any manner is strictly prohibited. In such cases,
> > please notify us immediately at mailmas...@mphasis.com and delete this
> > mail from your records.
> >
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
> Information transmitted by this e-mail is proprietary to Mphasis, its
> associated companies and/ or its customers and is intended
> for use only by the individual or entity to which it is addressed, and may
> contain information that is privileged, confidential or
> exempt from disclosure under applicable law. If you are not the intended
> recipient or it appears that this mail has been forwarded
> to you without proper authority, you are notified that any use or
> dissemination of this information in any manner is strictly
> prohibited. In such cases, please notify us immediately at
> mailmas...@mphasis.com and delete this mail from your records.
>



-- 
-Ratha
http://vvratha.blogspot.com/
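
Since poll() can spin internally on connection failures without ever
throwing, one way to make the hang visible is a watchdog thread that calls
consumer.wakeup(); the blocked poll() then raises WakeupException instead of
waiting forever. A sketch (host and timeout values are illustrative):

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class PollWithDeadline {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.example.com:9092"); // placeholder
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("MY_TOPIC"));

        // wakeup() is the only thread-safe KafkaConsumer method; schedule it
        // so a silent connection failure cannot block us indefinitely
        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
        watchdog.schedule(consumer::wakeup, 30, TimeUnit.SECONDS);

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                System.out.println("polled " + records.count() + " records");
            }
        } catch (WakeupException e) {
            System.out.println("no progress after 30s: check broker/advertised host");
        } finally {
            watchdog.shutdownNow();
            consumer.close();
        }
    }
}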


Re: If my producer producing, then why the consumer couldn't consume? it stuck @ poll()

2016-04-04 Thread Ratha v
When I list the partition info:

List<PartitionInfo> info = consumer.partitionsFor(topic);


it returns info like;

[Partition(topic = MY_TOPIC, partition = 0, leader = 1012, replicas =
[1012,], isr = [1012,]]


My communication with the broker looks fine. I see more than 1000 messages
for the topic using Kafka Tool (a UI tool to view topics, consumers, etc.).
But I cannot figure out what blocks my consumer from polling messages.
Can anyone spot the issue?



On 4 April 2016 at 10:58, Ratha v <vijayara...@gmail.com> wrote:

>
> Hi all;
> I'm publishing to a remote Kafka server and trying to consume messages from
> that remote server (Kafka v0.9.0.1).
> Publishing works fine, but not the consuming.
>
> Publisher:
>
> package org.test;
>
> import java.io.IOException;
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class Producer {
>
>     private void generateMessgaes() throws IOException {
>         String topic = "MY_TOPIC";
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "kafka.xx.com:9092");
>         props.put("acks", "all");
>         props.put("retries", 0);
>         props.put("batch.size", 16384);
>         props.put("linger.ms", 1);
>         props.put("buffer.memory", 33554432);
>         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("serializer.class", "org.apache.kafka.common.serialization.StringSerializer");
>
>         KafkaProducer<String, String> producer = null;
>         try {
>             producer = new KafkaProducer<>(props);
>             for (int i = 0; i < 10; i++) {
>                 producer.send(new ProducerRecord<String, String>(topic, "test msg"));
>                 System.out.println("producing---");
>             }
>         } catch (Throwable e) {
>             e.printStackTrace();
>             System.out.println("Error in publishing messages to the topic : " + topic);
>         } finally {
>             if (producer != null) { // guard: construction may have failed
>                 producer.close();
>             }
>         }
>     }
>
>     public static void main(String[] args) throws IOException {
>         Producer producer = new Producer();
>         producer.generateMessgaes();
>         System.out.println("$");
>     }
> }
>
>
> I can see the "producing---" prints. But when I try to consume, I do
> not see the "polling" print messages. It gets stuck at poll(timeout).
>
> Any clue? (I have asked this question before, asking again :( )
>
> Consumer:
>
> package org.test;
>
> import java.util.Arrays;
> import java.util.List;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> public class Listener {
>
>     public void start() throws CoreException {
>
>         String topic = "MY_TOPIC";
>         List<String> topics = Arrays.asList(topic);
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "kafka.xx.com:9092");
>         props.put("enable.auto.commit", true);
>         props.put("receive.buffer.bytes", 262144);
>         props.put("consumer.timeout.ms", 1);
>         props.put("session.timeout.ms", 7000);
>         props.put("heartbeat.interval.ms", 1000);
>         props.put("auto.offset.reset", "earliest");
>         props.put("group.id", "test");
>         props.put("fetch.min.bytes", 1);
>         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("serializer.class", "org.apache.kafka.common.serialization.StringDeserializer");
>
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>         consumer.subscribe(topics);
>
>         try {
>             while (true) {
>                 ConsumerRecords<String, String> records = consumer.poll(100);
>                 System.out.println("polling msges : " + records.count());
>                 for (ConsumerRecord<String, String> record : records) {
>                     System.out.println("kafka record : " + record.value());
>                 }
>             }
>         } catch (Throwable e) {
>             e.printStackTrace();
>             System.out.println("eror in polling");
>         } finally {
>             consumer.close();
>         }
>     }
>
>     public static void main(String args[]) throws CoreException {
>         Listener listener = new Listener();
>         listener.start();
>     }
> }
>
> Thanks
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/
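
One way to narrow this down (a sketch, using the 0.9 client signatures;
later clients take collections instead of varargs): skip group coordination
entirely with assign() and seekToBeginning(). If this prints records while
subscribe()/poll() hangs, the problem is coordinator discovery, not fetching.
The host name is a placeholder:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DirectFetchProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.example.com:9092"); // placeholder
        props.put("enable.auto.commit", "false"); // no group, so never commit
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("MY_TOPIC", 0);
            consumer.assign(Arrays.asList(tp)); // bypasses the group coordinator
            consumer.seekToBeginning(tp);       // 0.9 client signature (varargs)
            ConsumerRecords<String, String> records = consumer.poll(5000);
            System.out.println("fetched " + records.count() + " records directly");
        }
    }
}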


Re: Does kafka version 0.9.0x use zookeeper?

2016-04-03 Thread Ratha v
Thanks Gerard

On 4 April 2016 at 15:49, Gerard Klijs <gerard.kl...@dizzit.com> wrote:

> Yes, but only via the broker you connect to.
>
> On Mon, Apr 4, 2016, 07:10 Ratha v <vijayara...@gmail.com> wrote:
>
> > I'm not seeing such parameter as an input for consumer.
> >
> > Does version 0.9.x use zookeeper?
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: New consumer API waits indefinitely

2016-04-03 Thread Ratha v
I too face the same issue :(

On 4 April 2016 at 15:51, Lohith Samaga M  wrote:

> HI,
> Good morning.
>
> I am new to Kafka. So, please bear with me.
> I am using the new Producer and Consumer API with Kafka
> 0.9.0.1 running on Windows 7 laptop with zookeeper.
>
> I was able to send messages using the new Producer API. I
> can see the messages in the Kafka data directory.
>
> However, when I run the consumer, it does not retrieve the
> messages. It keeps waiting for the messages indefinitely.
> My code (taken from Javadoc and modified)  is as below:
>
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "new01");
> props.put("enable.auto.commit", "true");
> props.put("auto.commit.interval.ms", "1000");
> props.put("session.timeout.ms", "3");
> props.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>
> KafkaConsumer<String, String> consumer = new
> KafkaConsumer<>(props);
> consumer.subscribe(Arrays.asList("new-producer"));
> while (true) {
> ConsumerRecords<String, String> records =
> consumer.poll(100);
> for (ConsumerRecord<String, String> record : records)
> System.out.printf("offset = %d, key = %s, value = %s",
> record.offset(), record.key(), record.value());
> }
>
> Can anybody please tell me what went wrong?
>
> Thanks & Regards,
> M. Lohith Samaga
>
> Information transmitted by this e-mail is proprietary to Mphasis, its
> associated companies and/ or its customers and is intended
> for use only by the individual or entity to which it is addressed, and may
> contain information that is privileged, confidential or
> exempt from disclosure under applicable law. If you are not the intended
> recipient or it appears that this mail has been forwarded
> to you without proper authority, you are notified that any use or
> dissemination of this information in any manner is strictly
> prohibited. In such cases, please notify us immediately at
> mailmas...@mphasis.com and delete this mail from your records.
>



-- 
-Ratha
http://vvratha.blogspot.com/


Does kafka version 0.9.0x use zookeeper?

2016-04-03 Thread Ratha v
I'm not seeing such a parameter as an input for the consumer.

Does version 0.9.x use ZooKeeper?

-- 
-Ratha
http://vvratha.blogspot.com/


If my producer producing, then why the consumer couldn't consume? it stuck @ poll()

2016-04-03 Thread Ratha v
Hi all;
I'm publishing to a remote Kafka server and trying to consume messages from
that remote server (Kafka v0.9.0.1).
Publishing works fine, but not the consuming.

Publisher:

package org.test;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {

    private void generateMessgaes() throws IOException {
        String topic = "MY_TOPIC";

        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.xx.com:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("serializer.class", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = null;
        try {
            producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>(topic, "test msg"));
                System.out.println("producing---");
            }
        } catch (Throwable e) {
            e.printStackTrace();
            System.out.println("Error in publishing messages to the topic : " + topic);
        } finally {
            if (producer != null) { // guard: construction may have failed
                producer.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Producer producer = new Producer();
        producer.generateMessgaes();
        System.out.println("$");
    }
}


I can see the "producing---" prints. But when I try to consume, I do
not see the "polling" print messages. It gets stuck at poll(timeout).

Any clue? (I have asked this question before, asking again :( )

Consumer:

package org.test;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Listener {

    public void start() throws CoreException {

        String topic = "MY_TOPIC";
        List<String> topics = Arrays.asList(topic);

        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.xx.com:9092");
        props.put("enable.auto.commit", true);
        props.put("receive.buffer.bytes", 262144);
        props.put("consumer.timeout.ms", 1);
        props.put("session.timeout.ms", 7000);
        props.put("heartbeat.interval.ms", 1000);
        props.put("auto.offset.reset", "earliest");
        props.put("group.id", "test");
        props.put("fetch.min.bytes", 1);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("serializer.class", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(topics);

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                System.out.println("polling msges : " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("kafka record : " + record.value());
                }
            }
        } catch (Throwable e) {
            e.printStackTrace();
            System.out.println("eror in polling");
        } finally {
            consumer.close();
        }
    }

    public static void main(String args[]) throws CoreException {
        Listener listener = new Listener();
        listener.start();
    }
}

Thanks
-- 
-Ratha
http://vvratha.blogspot.com/


Re: ProducerConfig - The configuration serializer.class = org.apache.kafka.common.serialization.StringSerializer was supplied but isn't a known config

2016-04-03 Thread Ratha v
Sorry, my bad. My Kafka Docker instance was down.

Thanks

On 4 April 2016 at 10:09, Ratha v <vijayara...@gmail.com> wrote:

>
> Hi all;
> I'm getting above exception when i try to produce simple string messages
> using kafka 0.9.01 version. I could not produce messages now.
>
> The warning i get is;
>
> buffer.memory = 33554432
>
> timeout.ms = 3
>
> key.serializer = class
> org.apache.kafka.common.serialization.StringSerializer
>
> sasl.kerberos.service.name = null
>
> sasl.kerberos.ticket.renew.jitter = 0.05
>
> ssl.keystore.type = JKS
>
> ssl.trustmanager.algorithm = PKIX
>
> block.on.buffer.full = false
>
> ssl.key.password = null
>
> max.block.ms = 6
>
> sasl.kerberos.min.time.before.relogin = 6
>
> connections.max.idle.ms = 54
>
> ssl.truststore.password = null
>
> max.in.flight.requests.per.connection = 5
>
> metrics.num.samples = 2
>
> client.id =
>
> ssl.endpoint.identification.algorithm = null
>
> ssl.protocol = TLS
>
> request.timeout.ms = 3
>
> ssl.provider = null
>
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>
> acks = all
>
> batch.size = 16384
>
> ssl.keystore.location = null
>
> receive.buffer.bytes = 32768
>
> ssl.cipher.suites = null
>
> ssl.truststore.type = JKS
>
> security.protocol = PLAINTEXT
>
> retries = 0
>
> max.request.size = 1048576
>
> value.serializer = class
> org.apache.kafka.common.serialization.StringSerializer
>
> ssl.truststore.location = null
>
> ssl.keystore.password = null
>
> ssl.keymanager.algorithm = SunX509
>
> metrics.sample.window.ms = 3
>
> partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>
> send.buffer.bytes = 131072
>
> linger.ms = 1
>
>
> *2016-04-04_10:07:52.551 WARN  o.a.k.c.producer.ProducerConfig - The
> configuration serializer.class =
> org.apache.kafka.common.serialization.StringSerializer was supplied but
> isn't a known config.*
>
> 2016-04-04_10:07:52.553 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka
> version : 0.9.0.1
>
> 2016-04-04_10:07:52.553 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka
> commitId : 23c69d62a0cabf06
>
>
> Here is my sample producer;
>
> package org.test;
>
> import java.io.IOException;
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
>
> public class Producer {
>
> private void generateMessgaes() throws IOException {
> String topic = "LOB_TOPIC";
>
> Properties props = new Properties();
> props.put("bootstrap.servers", "xxx:9092");
> props.put("acks", "all");
> props.put("retries", 0);
> props.put("batch.size", 16384);
> props.put("linger.ms", 1);
> props.put("buffer.memory", 33554432);
> props.put("key.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("serializer.class",
> "org.apache.kafka.common.serialization.StringSerializer");
>
>
> KafkaProducer<String, String> producer = null;
> try {
> producer = new KafkaProducer<>(props);
> for (int i = 0; i < 10; i++) {
> producer.send(new ProducerRecord<String, String>(topic, "test msg"));
> }
>
> } catch (Exception e) {
> e.printStackTrace();
> System.out.println("Error in publishing messages to the topic : " + topic);
>
> } finally {
> if (producer != null) { // guard: construction may have failed
> producer.close();
> }
> }
> }
>
> public static void main(String[] args) throws IOException {
>
> Producer producer = new Producer();
> producer.generateMessgaes();
> System.out.println("$");
> }
> }
>
>
> Thanks
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/
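
For what it's worth, the warning is the new producer saying it does not
recognise the serializer.class key; that key belongs to the old Scala
producer, while the new org.apache.kafka.clients.producer.KafkaProducer only
reads key.serializer and value.serializer. A sketch of the props without the
stray entry (the host name is a placeholder):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CleanProducerProps {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.example.com:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // no "serializer.class" here: the new producer ignores it with a warning

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<String, String>("LOB_TOPIC", "test msg"));
        }
    }
}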


ProducerConfig - The configuration serializer.class = org.apache.kafka.common.serialization.StringSerializer was supplied but isn't a known config

2016-04-03 Thread Ratha v
Hi all;
I'm getting the above warning when I try to produce simple string messages
using Kafka 0.9.0.1. I cannot produce messages now.

The warning I get is;

buffer.memory = 33554432

timeout.ms = 3

key.serializer = class
org.apache.kafka.common.serialization.StringSerializer

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

ssl.keystore.type = JKS

ssl.trustmanager.algorithm = PKIX

block.on.buffer.full = false

ssl.key.password = null

max.block.ms = 6

sasl.kerberos.min.time.before.relogin = 6

connections.max.idle.ms = 54

ssl.truststore.password = null

max.in.flight.requests.per.connection = 5

metrics.num.samples = 2

client.id =

ssl.endpoint.identification.algorithm = null

ssl.protocol = TLS

request.timeout.ms = 3

ssl.provider = null

ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

acks = all

batch.size = 16384

ssl.keystore.location = null

receive.buffer.bytes = 32768

ssl.cipher.suites = null

ssl.truststore.type = JKS

security.protocol = PLAINTEXT

retries = 0

max.request.size = 1048576

value.serializer = class
org.apache.kafka.common.serialization.StringSerializer

ssl.truststore.location = null

ssl.keystore.password = null

ssl.keymanager.algorithm = SunX509

metrics.sample.window.ms = 3

partitioner.class = class
org.apache.kafka.clients.producer.internals.DefaultPartitioner

send.buffer.bytes = 131072

linger.ms = 1


*2016-04-04_10:07:52.551 WARN  o.a.k.c.producer.ProducerConfig - The
configuration serializer.class =
org.apache.kafka.common.serialization.StringSerializer was supplied but
isn't a known config.*

2016-04-04_10:07:52.553 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka
version : 0.9.0.1

2016-04-04_10:07:52.553 INFO  o.a.kafka.common.utils.AppInfoParser - Kafka
commitId : 23c69d62a0cabf06


Here is my sample producer;

package org.test;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;


public class Producer {

    private void generateMessgaes() throws IOException {
        String topic = "LOB_TOPIC";

        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("serializer.class", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = null;
        try {
            producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>(topic, "test msg"));
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Error in publishing messages to the topic : " + topic);
        } finally {
            if (producer != null) { // guard: construction may have failed
                producer.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Producer producer = new Producer();
        producer.generateMessgaes();
        System.out.println("$");
    }
}


Thanks
-- 
-Ratha
http://vvratha.blogspot.com/


Re: Consumer thread is waiting forever, not returning any objects

2016-03-29 Thread Ratha v
Not sure what was wrong in my remote Kafka cluster, as I haven't received
any exceptions on my consumer side.
I tried connecting to my local Kafka server and things are working (at least
I got some exceptions saying "The session timeout is not within an acceptable
range.", etc.).

Thanks all for the help..



On 30 March 2016 at 12:06, Ratha v <vijayara...@gmail.com> wrote:

> Looks like this particular block runs forever..
>
> @AbstractCoordinator.class
>
>     /**
>      * Block until the coordinator for this group is known.
>      */
>     public void ensureCoordinatorKnown() {
>         while (coordinatorUnknown()) {
>             RequestFuture<Void> future = sendGroupMetadataRequest();
>             client.poll(future);
>
>             if (future.failed()) {
>                 if (future.isRetriable())
>                     client.awaitMetadataUpdate();
>                 else
>                     throw future.exception();
>             }
>         }
>     }
>
> In ConsumerNetworkClient, requestsSent stays false in trySend(). Any idea?
>
> @ConsumerNetworkClient.class
>
>     private boolean trySend(long now) {
>         // send any requests that can be sent now
>         boolean requestsSent = false;
>         for (Map.Entry<Node, List<ClientRequest>> requestEntry : unsent.entrySet()) {
>             Node node = requestEntry.getKey();
>             Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
>             while (iterator.hasNext()) {
>                 ClientRequest request = iterator.next();
>                 if (client.ready(node, now)) {
>                     client.send(request, now);
>                     iterator.remove();
>                     requestsSent = true;
>                 }
>             }
>         }
>         return requestsSent;
>     }
>
> On 30 March 2016 at 11:41, Ratha v <vijayara...@gmail.com> wrote:
>
>> And no error logs at kafka server..I expect any exception from my end,
>> but I'm not getting any exception :(
>>
>> On 30 March 2016 at 11:39, Ratha v <vijayara...@gmail.com> wrote:
>>
>>> Hi Shrijeet ;
>>> I checked the kafka communication layer,( tcp dumps) and I can see
>>> some packets are received by kafka server..
>>> That means communication works fine.
>>>
>>> Only issue is in my polling:(
>>>
>>> *Thanks..*
>>>
>>> On 30 March 2016 at 10:54, Shrijeet Paliwal <shrijeet.pali...@gmail.com>
>>> wrote:
>>>
>>>> The consumer script is using old consumer which connects via zookeeper,
>>>> so
>>>> its not the same. The reason I asked because I recall poll(timeout)
>>>> getting
>>>> stuck forever if it can't reach broker.
>>>>
>>>> --
>>>> Shrijeet
>>>>
>>>> On Tue, Mar 29, 2016 at 4:51 PM, Ratha v <vijayara...@gmail.com> wrote:
>>>>
>>>> > Yes... (I checked, hostname and port, and i believe thats correct
>>>> because
>>>> >  consumer.sh script also working?)
>>>> > #* ./kafka-console-consumer.sh --zookeeper xxx:2181 --topic mytopic*
>>>> >
>>>> > On 30 March 2016 at 10:29, Shrijeet Paliwal <
>>>> shrijeet.pali...@gmail.com>
>>>> > wrote:
>>>> >
>>>> > > On Tue, Mar 29, 2016 at 4:19 PM, Ratha v <vijayara...@gmail.com>
>>>> wrote:
>>>> > >
>>>> > > > 9092
>>>> > >
>>>> > >
>>>> > > Are your brokers listening at this port?
>>>> > >
>>>> > > --
>>>> > > Shrijeet
>>>> > >
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> > -Ratha
>>>> > http://vvratha.blogspot.com/
>>>> >
>>>>
>>>
>>>
>>>
>>> --
>>> -Ratha
>>> http://vvratha.blogspot.com/
>>>
>>
>>
>>
>> --
>> -Ratha
>> http://vvratha.blogspot.com/
>>
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: org.apache.kafka.common.errors.ApiException: The session timeout is not within an acceptable range.

2016-03-29 Thread Ratha v
I overcame this issue by setting "session.timeout.ms" in the consumer
properties to a value greater than "group.min.session.timeout.ms" in
server.properties.

Thanks.

On 30 March 2016 at 13:41, Ratha v <vijayara...@gmail.com> wrote:

> I added that property  to the server.properties to a longer value than the
> session timeout value in the consumer.properties. But still i see same
> issue..
>
> Any help?
>
> On 30 March 2016 at 13:28, Ratha v <vijayara...@gmail.com> wrote:
>
>> Hi all;
>> I get above exception in my consumer with kafka 0.9.0.1..According to
>> this thread[1] i do not see the "group.max.session.timeout.ms" property
>> in the server.properties file.
>>
>> Any clue?
>>
>>
>> [1]http://comments.gmane.org/gmane.comp.apache.kafka.user/12426
>>
>> --
>> -Ratha
>> http://vvratha.blogspot.com/
>>
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/
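
Concretely, the broker only accepts a consumer whose session.timeout.ms lies
inside the window it advertises. A sketch of both sides; the numbers shown
are believed to be the 0.9 defaults, but treat them as illustrative:

# broker side (server.properties)
group.min.session.timeout.ms=6000
group.max.session.timeout.ms=30000

# consumer side: must satisfy min <= session.timeout.ms <= max
session.timeout.ms=10000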


Re: org.apache.kafka.common.errors.ApiException: The session timeout is not within an acceptable range.

2016-03-29 Thread Ratha v
I added that property to server.properties with a longer value than the
session timeout value in consumer.properties. But I still see the same
issue.

Any help?

On 30 March 2016 at 13:28, Ratha v <vijayara...@gmail.com> wrote:

> Hi all;
> I get above exception in my consumer with kafka 0.9.0.1..According to this
> thread[1] i do not see the "group.max.session.timeout.ms" property in the
> server.properties file.
>
> Any clue?
>
>
> [1]http://comments.gmane.org/gmane.comp.apache.kafka.user/12426
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


org.apache.kafka.common.errors.ApiException: The session timeout is not within an acceptable range.

2016-03-29 Thread Ratha v
Hi all;
I get the above exception in my consumer with Kafka 0.9.0.1. Following this
thread [1], I looked for the "group.max.session.timeout.ms" property, but I
do not see it in the server.properties file.

Any clue?


[1]http://comments.gmane.org/gmane.comp.apache.kafka.user/12426

-- 
-Ratha
http://vvratha.blogspot.com/


Re: Consumer thread is waiting forever, not returning any objects

2016-03-29 Thread Ratha v
Looks like this particular block runs forever..

@AbstractCoordinator.class

    /**
     * Block until the coordinator for this group is known.
     */
    public void ensureCoordinatorKnown() {
        while (coordinatorUnknown()) {
            RequestFuture<Void> future = sendGroupMetadataRequest();
            client.poll(future);

            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            }
        }
    }

In ConsumerNetworkClient, requestsSent stays false in trySend(). Any idea?

@ConsumerNetworkClient.class

    private boolean trySend(long now) {
        // send any requests that can be sent now
        boolean requestsSent = false;
        for (Map.Entry<Node, List<ClientRequest>> requestEntry : unsent.entrySet()) {
            Node node = requestEntry.getKey();
            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
            while (iterator.hasNext()) {
                ClientRequest request = iterator.next();
                if (client.ready(node, now)) {
                    client.send(request, now);
                    iterator.remove();
                    requestsSent = true;
                }
            }
        }
        return requestsSent;
    }

On 30 March 2016 at 11:41, Ratha v <vijayara...@gmail.com> wrote:

> And no error logs at kafka server..I expect any exception from my end, but
> I'm not getting any exception :(
>
> On 30 March 2016 at 11:39, Ratha v <vijayara...@gmail.com> wrote:
>
>> Hi Shrijeet ;
>> I checked the kafka communication layer,( tcp dumps) and I can see
>> some packets are received by kafka server..
>> That means communication works fine.
>>
>> Only issue is in my polling:(
>>
>> *Thanks..*
>>
>> On 30 March 2016 at 10:54, Shrijeet Paliwal <shrijeet.pali...@gmail.com>
>> wrote:
>>
>>> The consumer script is using old consumer which connects via zookeeper,
>>> so
>>> its not the same. The reason I asked because I recall poll(timeout)
>>> getting
>>> stuck forever if it can't reach broker.
>>>
>>> --
>>> Shrijeet
>>>
>>> On Tue, Mar 29, 2016 at 4:51 PM, Ratha v <vijayara...@gmail.com> wrote:
>>>
>>> > Yes... (I checked, hostname and port, and i believe thats correct
>>> because
>>> >  consumer.sh script also working?)
>>> > #* ./kafka-console-consumer.sh --zookeeper xxx:2181 --topic mytopic*
>>> >
>>> > On 30 March 2016 at 10:29, Shrijeet Paliwal <
>>> shrijeet.pali...@gmail.com>
>>> > wrote:
>>> >
>>> > > On Tue, Mar 29, 2016 at 4:19 PM, Ratha v <vijayara...@gmail.com>
>>> wrote:
>>> > >
>>> > > > 9092
>>> > >
>>> > >
>>> > > Are your brokers listening at this port?
>>> > >
>>> > > --
>>> > > Shrijeet
>>> > >
>>> >
>>> >
>>> >
>>> > --
>>> > -Ratha
>>> > http://vvratha.blogspot.com/
>>> >
>>>
>>
>>
>>
>> --
>> -Ratha
>> http://vvratha.blogspot.com/
>>
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: Consumer thread is waiting forever, not returning any objects

2016-03-29 Thread Ratha v
And no error logs at the kafka server. I expected some exception on my end, but
I'm not getting any exception :(

On 30 March 2016 at 11:39, Ratha v <vijayara...@gmail.com> wrote:

> Hi Shrijeet ;
> I checked the kafka communication layer,( tcp dumps) and I can see
> some packets are received by kafka server..
> That means communication works fine.
>
> Only issue is in my polling:(
>
> *Thanks..*
>
> On 30 March 2016 at 10:54, Shrijeet Paliwal <shrijeet.pali...@gmail.com>
> wrote:
>
>> The consumer script is using old consumer which connects via zookeeper, so
>> its not the same. The reason I asked because I recall poll(timeout)
>> getting
>> stuck forever if it can't reach broker.
>>
>> --
>> Shrijeet
>>
>> On Tue, Mar 29, 2016 at 4:51 PM, Ratha v <vijayara...@gmail.com> wrote:
>>
>> > Yes... (I checked, hostname and port, and i believe thats correct
>> because
>> >  consumer.sh script also working?)
>> > #* ./kafka-console-consumer.sh --zookeeper xxx:2181 --topic mytopic*
>> >
>> > On 30 March 2016 at 10:29, Shrijeet Paliwal <shrijeet.pali...@gmail.com
>> >
>> > wrote:
>> >
>> > > On Tue, Mar 29, 2016 at 4:19 PM, Ratha v <vijayara...@gmail.com>
>> wrote:
>> > >
>> > > > 9092
>> > >
>> > >
>> > > Are your brokers listening at this port?
>> > >
>> > > --
>> > > Shrijeet
>> > >
>> >
>> >
>> >
>> > --
>> > -Ratha
>> > http://vvratha.blogspot.com/
>> >
>>
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: Consumer thread is waiting forever, not returning any objects

2016-03-29 Thread Ratha v
Hi Shrijeet ;
I checked the kafka communication layer,( tcp dumps) and I can see
some packets are received by kafka server..
That means communication works fine.

Only issue is in my polling:(

*Thanks..*

On 30 March 2016 at 10:54, Shrijeet Paliwal <shrijeet.pali...@gmail.com>
wrote:

> The consumer script is using old consumer which connects via zookeeper, so
> its not the same. The reason I asked because I recall poll(timeout) getting
> stuck forever if it can't reach broker.
>
> --
> Shrijeet
>
> On Tue, Mar 29, 2016 at 4:51 PM, Ratha v <vijayara...@gmail.com> wrote:
>
> > Yes... (I checked, hostname and port, and i believe thats correct because
> >  consumer.sh script also working?)
> > #* ./kafka-console-consumer.sh --zookeeper xxx:2181 --topic mytopic*
> >
> > On 30 March 2016 at 10:29, Shrijeet Paliwal <shrijeet.pali...@gmail.com>
> > wrote:
> >
> > > On Tue, Mar 29, 2016 at 4:19 PM, Ratha v <vijayara...@gmail.com>
> wrote:
> > >
> > > > 9092
> > >
> > >
> > > Are your brokers listening at this port?
> > >
> > > --
> > > Shrijeet
> > >
> >
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: Consumer thread is waiting forever, not returning any objects

2016-03-29 Thread Ratha v
Yes... (I checked, hostname and port, and i believe thats correct because
 consumer.sh script also working?)
#* ./kafka-console-consumer.sh --zookeeper xxx:2181 --topic mytopic*

On 30 March 2016 at 10:29, Shrijeet Paliwal <shrijeet.pali...@gmail.com>
wrote:

> On Tue, Mar 29, 2016 at 4:19 PM, Ratha v <vijayara...@gmail.com> wrote:
>
> > 9092
>
>
> Are your brokers listening at this port?
>
> --
> Shrijeet
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: Consumer thread is waiting forever, not returning any objects

2016-03-29 Thread Ratha v
Hi Shrijeet;

I do not see any stacktarce. It stuck at poll.

Here is my consumer conf;

zookeeper.connect=xx.com\:2181

metadata.broker.list=yy.com\:9092

enable.auto.commit=true

auto.commit.interval.ms=101

max.partition.fetch.bytes=35

session.timeout.ms=3001

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

value.deserializer=com.xx.process.RawFileSerializer

heartbeat.interval.ms=1000

auto.offset.reset=earliest

serializer.class=com.xx.process.FileSerializer

bootstrap.servers=yy.com\:9092

group.id=test

consumer.timeout.ms=1
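
An aside on this config: it mixes old-consumer keys (zookeeper.connect,
metadata.broker.list, serializer.class, consumer.timeout.ms) with new-consumer
keys; the 0.9 KafkaConsumer should only read the latter and warn about the
rest. A couple of values also look suspect: max.partition.fetch.bytes=35 would
be far too small for any real message, and session.timeout.ms=3001 sits below
the broker's default minimum of 6000 ms, which is the "session timeout is not
within an acceptable range" error from the other thread. A trimmed sketch of
just what the new consumer needs (hostnames kept as in the post):

bootstrap.servers=yy.com:9092
group.id=test
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.xx.process.RawFileSerializer
enable.auto.commit=true
auto.offset.reset=earliest
session.timeout.ms=30000
heartbeat.interval.ms=3000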

Thanks.

On 30 March 2016 at 10:10, Shrijeet Paliwal <shrijeet.pali...@gmail.com>
wrote:

> Can you share complete Consumer configuration? Also what does stack-trace
> show,  is it stuck in poll?
>
> I think I have seen this before in one of my tests, in my case I had pinned
> it to side effect of https://issues.apache.org/jira/browse/KAFKA-1894.
> Essentially poll(timeout) doesn't always respect the timeout :-|
>
> --
> Shrijeet
>
> On Tue, Mar 29, 2016 at 3:42 PM, Ratha v <vijayara...@gmail.com> wrote:
>
> > Hi oleg;
> >
> >1. The consumer shell  (The default tool :./kafka-console-consumer.sh
> >--zookeeper xxx:2181 --topic yy
> >) consumes my messages. Only my programmed listener is not consuming
> ..I
> >provided the code in the previous thread.
> >
> >
> > Here is my producer (I believe this works, because, the* kafka
> consumer.sh
> > *
> >  consumes)
> >
> >
> >
> >
> > import java.io.FileNotFoundException;
> > import java.util.Properties;
> >
> > import com.xx.core.impl.KafkaConfigurationLoader;
> > import com.xx.core.model.base.RawFile;
> >
> > import org.apache.kafka.clients.producer.KafkaProducer;
> > import org.apache.kafka.clients.producer.ProducerRecord;
> >
> > /**
> >  * Class to produce some test messages to the Kafka server
> >  *
> >  * @author ratha
> >  *
> >  */
> > public class KafkaMessageProducer {
> > private Properties properties;
> > private String topic;
> > private File file;
> > private KafkaProducer<String, File> producer;
> >
> > public KafkaMessageProducer(String topic, File file) {
> > this.topic = topic;
> > this.file = file;
> >
> > KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
> > try {
> > properties = confLoader.loadProducerConfig();
> >
> > producer = new KafkaProducer<>(properties);
> > } catch (FileNotFoundException e) {
> > e.printStackTrace();
> > }
> > }
> >
> > public void generateMessgaes() {
> > try {
> > for(int i=0; i<10;i++){
> > producer.send(new ProducerRecord<String, File>(topic, file));
> > }
> > } catch (Exception e) {
> > e.printStackTrace();
> > System.out.println("Error in publishing messages to the topic : " +
> topic);
> >
> > } finally {
> > producer.close();
> > }
> > }
> >
> > }
> >
> > Thanks
> >
> >
> > On 30 March 2016 at 09:31, Oleg Zhurakousky <
> ozhurakou...@hortonworks.com>
> > wrote:
> >
> > > Ratha
> > >
> > > " I published messages and started the listener ---> No success”.
> > >
> > > I would stop right here as you already see an issue. So I am assuming
> the
> > > listener you started is console listener and with that I am going to
> > assume
> > > it works providing message is on the topic. And since nothing happened
> > then
> > > you probably have issue in producer.
> > >
> > > Could you post producer code.
> > >
> > > Oleg
> > >
> > > > On Mar 29, 2016, at 6:24 PM, Ratha v <vijayara...@gmail.com> wrote:
> > > >
> > > > Hi Oleg;
> > > >
> > > > Thanks for the guide..Here is my troubleshooting steps;
> > > >
> > > >
> > > >   1. I published messages and started the listener ---> No success
> > > >   2. I set the "auto.offset.reset" property to *earliest* --> No
> > > >   success
> > > >   3. *Set the "consumer.timeout" to 1 --> Nothing happens*
> > > >   4. Thought may be my messages are not published, so started the
> > > >   consumer shell script (default tool in the kafka distribution),
> > > >   that consumes my messages well.
> > > >   5. Changed hasNext() to while (true) --> no change in the
> > > >   behaviour
> > > >
> > > >

Re: Consumer thread is waiting forever, not returning any objects

2016-03-29 Thread Ratha v
Hi oleg;

   1. The consumer shell  (The default tool :./kafka-console-consumer.sh
   --zookeeper xxx:2181 --topic yy
   ) consumes my messages. Only my programmed listener is not consuming ..I
   provided the code in the previous thread.


Here is my producer (I believe this works, because, the* kafka consumer.sh *
 consumes)




import java.io.FileNotFoundException;
import java.util.Properties;

import com.xx.core.impl.KafkaConfigurationLoader;
import com.xx.core.model.base.RawFile;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Class to produce some test messages to the Kafka server
 *
 * @author ratha
 *
 */
public class KafkaMessageProducer {
private Properties properties;
private String topic;
private File file;
private KafkaProducer<String, File> producer;

public KafkaMessageProducer(String topic, File file) {
this.topic = topic;
this.file = file;

KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
try {
properties = confLoader.loadProducerConfig();

producer = new KafkaProducer<>(properties);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}

public void generateMessgaes() {
try {
for(int i=0; i<10;i++){
producer.send(new ProducerRecord<String, File>(topic, file));
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("Error in publishing messages to the topic : " + topic);

} finally {
producer.close();
}
}

}
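
One way to make a silent producer failure visible (a debugging sketch, not
part of the original class): send() returns a Future<RecordMetadata>, so
blocking on get() forces any broker-side error to surface at the call site.

// temporary check inside generateMessgaes();
// RecordMetadata is org.apache.kafka.clients.producer.RecordMetadata
try {
    RecordMetadata meta =
        producer.send(new ProducerRecord<String, File>(topic, file)).get();
    System.out.println("wrote to " + meta.topic() + "-" + meta.partition()
            + " at offset " + meta.offset());
} catch (Exception e) {
    e.printStackTrace(); // delivery failed; the consumer will see nothing
}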

Thanks


On 30 March 2016 at 09:31, Oleg Zhurakousky <ozhurakou...@hortonworks.com>
wrote:

> Ratha
>
> " I published messages and started the listener ---> No success”.
>
> I would stop right here as you already see an issue. So I am assuming the
> listener you started is console listener and with that I am going to assume
> it works providing message is on the topic. And since nothing happened then
> you probably have issue in producer.
>
> Could you post producer code.
>
> Oleg
>
> > On Mar 29, 2016, at 6:24 PM, Ratha v <vijayara...@gmail.com> wrote:
> >
> > Hi Oleg;
> >
> > Thanks for the guide..Here is my troubleshooting steps;
> >
> >
> >   1. I published messages and started the listener ---> No success
> >   2. I set the "auto.offset.reset" property to *earliest* --> No success
> >   3. *Set the "consumer.timeout" to 1 --> Nothing happens*
> >   4. Thought may be my messages are not published, so started the
> >   consumer shell script (default tool in the kafka distribution), that
> >   consumes my messages well.
> >   5. Changed hasNext() to while (true) --> no change in the behaviour
> >
> >
> > Im really confused..:(
> >
> > Here is the code;
> >
> > *executor*
> >
> > public void start() {
> >
> > List topics = Arrays.asList(topic);
> >
> > ExecutorService executor = Executors.newFixedThreadPool(CoreConstants.
> > THREAD_SIZE);
> >
> > ListenerThread lThread = new ListenerThread(topics, properties);
> >
> > executor.submit(lThread);
> >
> >
> > Runtime.getRuntime().addShutdownHook(new Thread() {
> >
> > @Override
> >
> > public void run() {
> >
> > lThread.shutdown();
> >
> > executor.shutdown();
> >
> > try {
> >
> > executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
> >
> > } catch (InterruptedException e) {
> >
> > e.printStackTrace();
> >
> > }
> >
> > }
> >
> > });
> >
> > }
> >
> >
> >
> > *Thread*
> >
> >
> >
> > public void run() {
> >
> > try {
> >
> > consumer.subscribe(topics);
> >
> > while (true) {
> >
> > ConsumerRecords<String, RawFile> records = consumer.poll(100);
> >
> > System.out.println("&&&&&& : "+records.count());
> >
> > for (ConsumerRecord<String, RawFile> record : records) {
> >
> > System.out.println("&&&&&&333");
> >
> > FileProcessor processor = new FileProcessor();
> >
> > processor.processFile(record.value());
> >
> >
> > System.out.println("&&&&&&"+record.value());
> >
> > }
> >
> > }
> >
> >
> > } catch (Throwable e) {
> >
> > e.printStackTrace();
> >
> > System.out.println("eror in polling");
> >
> > // ignore for shutdown
> >
> > } finally {
> >
> > consumer.close();
> >
> > }
> >
> >
> >
> > Do you think any other issues from my end?

Re: Consumer thread is waiting forever, not returning any objects

2016-03-29 Thread Ratha v
Hi Oleg;

Thanks for the guide..Here is my troubleshooting steps;


   1. I published messages and started the listener ---> No success
   2. I set the "auto.offset.reset" property to *earliest* --> No success
   3. *Set the "consumer.timeout" to 1 --> Nothing happens*
   4. Thought may be my messages are not published, so started the
   consumer shell script (default tool in the kafka distribution), that
   consumes my messages well.
   5. Changed hasNext() to while (true) --> no change in the behaviour


Im really confused..:(

Here is the code;

*executor*

public void start() {

List topics = Arrays.asList(topic);

ExecutorService executor = Executors.newFixedThreadPool(CoreConstants.
THREAD_SIZE);

ListenerThread lThread = new ListenerThread(topics, properties);

executor.submit(lThread);


Runtime.getRuntime().addShutdownHook(new Thread() {

@Override

public void run() {

lThread.shutdown();

executor.shutdown();

try {

executor.awaitTermination(5000, TimeUnit.MILLISECONDS);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});

}



*Thread*



public void run() {

try {

consumer.subscribe(topics);

while (true) {

ConsumerRecords<String, RawFile> records = consumer.poll(100);

System.out.println("&&&&&& : "+records.count());

for (ConsumerRecord<String, RawFile> record : records) {

System.out.println("&&&&&&333");

FileProcessor processor = new FileProcessor();

processor.processFile(record.value());


System.out.println("&&&&&&"+record.value());

}

}


} catch (Throwable e) {

e.printStackTrace();

System.out.println("eror in polling");

// ignore for shutdown

} finally {

consumer.close();

}



Do you think any other issues from my end?

Thanks


On 29 March 2016 at 23:13, Oleg Zhurakousky <ozhurakou...@hortonworks.com>
wrote:

> Ratha
>
> It appears you have couple of issues here, so I’ll start with the consumer
> first.
> If you do a search on this mailing list on “Consumer deadlock” in the
> subject you’ll find a thread where similar symptoms were discussed.
> Basically the hasNext() method you mentioned is implemented as a blocking
> call and while we may all have opinion about that decision and why Iterator
> was chosen in the first place, it is what it is. But from what I understand
> it simply means that there are no messages to poll from the topic (yes I
> know, hasNext()=false seems natural here but. . .). What you can do is set ‘
> consumer.timeout.ms’ property to value such as ‘1’. By doing so you are
> stating that you are willing to block for no longer than 1 millisecond.
> But you also mention that you are sending message to the topic and
> therefore have reasonable expectation to poll something from it but yet
> you’re blocking. That is strange indeed. What I would suggest is to try one
> thing at the time. Use your Java producer, in conjunction with console
> consumer. This will help to narrow down the problem (e.g., issue with
> producer that may not be actually sending). Then hopefully if you receive
> successfully then you know the problem is in the consumer and so on.
>
> Cheers
> Oleg
>
> On Mar 29, 2016, at 12:48 AM, Ratha v <vijayara...@gmail.com> wrote:
>
> Hi all;
> I publish a java object and try to consume it..
> I have a poll method to consume objects, but it never returns any
> objects..My program runs forever.(?)
>
> *ConsumerThread*
>
> public void run() {
>
> try {
>
> consumer.subscribe(topics);
>
>
> Iterator<ConsumerRecord<String, File>> it = consumer.poll(1000).iterator();
>
> while (it.hasNext()) {
>
> ConsumerRecord<String, File> record = it.next();
>
> FileProcessor processor = new FileProcessor();
>
> processor.processFile(record.value());
>
> System.out.println("-" + ": " + record);
>
>
>
> When i debug it never goes inside the while loop. Why is that?
>
> I publish object like this;
>
> producer.send(new ProducerRecord<String, File>(topic, file));
>
>
> Thanks
> --
> -Ratha
> http://vvratha.blogspot.com/
>
>


-- 
-Ratha
http://vvratha.blogspot.com/
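
One more check worth adding when poll() stays empty even though the console
consumer sees data (a sketch against the same consumer; none of this is in
the original code): confirm the client can see the topic at all and that the
group actually received a partition assignment.

// partitionsFor() answers "is the topic visible from this client?"
System.out.println("partitions: " + consumer.partitionsFor(topic));

// assignment() stays empty until a poll() has completed the group join
consumer.poll(1000);
System.out.println("assigned: " + consumer.assignment());

If partitionsFor() lists partitions but assignment() stays empty, the problem
is in the group join (coordinator, session timeout); if it lists nothing, the
client is pointed at the wrong cluster or topic name.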


Re: Why my consumer does not print any messages? help..

2016-03-29 Thread Ratha v
Thanks Cees for the help. Yes, I was trying to produce the messages before
the consumer starts. Now I changed the order:

   1. I published messages and started the listener ---> No success
   2. I set the "auto.offset.reset" property to *earliest* --> No success
   3. Thought may be my messages are not published, so started the
   consumer shell script (default tool in the kafka distribution), that
   consumes my messages well.


Do you think any other issues from my end?

Thanks

On 30 March 2016 at 05:28, Cees de Groot <c...@pagerduty.com> wrote:

> If you publish messages before you start the consumer, note that consumers
> by default start from the end of a queue. This is to prevent consumers from
> being overwhelmed with old messages when they start. The auto.offset.reset
> setting of the consumer can influence this behavior.
>
> On Tue, Mar 29, 2016 at 6:49 AM, Ratha v <vijayara...@gmail.com> wrote:
> >
> > Hi all;
> > I use kafka 0.9.0.1.
> > I publish 10 messages to my kafka server, but when I try to use my
> > consumer, it does not retrieve any messages.
> > In the following code snippet, it prints 111 and 222 and thread runs
> > forever without sending any records :(
> >
> > I set consumer.timeout property, but nothing helps me..
> >
> > Can anybody help me?
> >
> > public void run() {
> >
> > try {
> >
> > consumer.subscribe(topics);
> >
> > System.out.println("111");
> >
> > while (true) {
> >
> > System.out.println("");
> >
> > ConsumerRecords<String, RawFile> records = consumer.poll(100);
> >
> > System.out.println(" : "+records.count());
> >
> > for (ConsumerRecord<String, RawFile> record : records) {
> >
> > System.out.println("");
> >
> > FileProcessor processor = new FileProcessor();
> >
> > processor.processFile(record.value());
> >
> >
> > }
> >
> > }
> >
> >
> > } catch (Throwable e) {
> >
> > e.printStackTrace();
> >
> > System.out.println("eror in polling");
> >
> >
> > } finally {
> >
> > consumer.close();
> >
> > }
> >
> > }
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
>
>
>
>
> --
> Cees de Groot
> Principal Software Engineer
> PagerDuty, Inc.
>



-- 
-Ratha
http://vvratha.blogspot.com/
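
A detail that bites people in exactly this scenario: auto.offset.reset only
applies when the group has no committed offset for a partition, so re-running
with the same group.id can still skip messages published earlier. Two ways to
re-read an existing topic with the 0.9 consumer (a sketch; the group name is
made up, and TopicPartition is org.apache.kafka.common.TopicPartition):

// Option 1: a fresh group plus auto.offset.reset=earliest
props.put("group.id", "debug-" + System.currentTimeMillis());
props.put("auto.offset.reset", "earliest");

// Option 2: rewind explicitly once partitions are assigned
consumer.subscribe(topics);
consumer.poll(0); // completes the group join
consumer.seekToBeginning(
        consumer.assignment().toArray(new TopicPartition[0]));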


Consumer thread is waiting forever, not returning any objects

2016-03-28 Thread Ratha v
Hi all;
I publish a java object and try to consume it..
I have a poll method to consume objects, but it never returns any
objects..My program runs forever.(?)

*ConsumerThread*

public void run() {

try {

consumer.subscribe(topics);


Iterator<ConsumerRecord<String, File>> it = consumer.poll(1000).iterator();

while (it.hasNext()) {

ConsumerRecord<String, File> record = it.next();

FileProcessor processor = new FileProcessor();

processor.processFile(record.value());

System.out.println("-" + ": " + record);



When i debug it never goes inside the while loop. Why is that?

I publish object like this;

producer.send(new ProducerRecord<String, File>(topic, file));


Thanks
-- 
-Ratha
http://vvratha.blogspot.com/


Re: java.lang.IllegalArgumentException: Heartbeat must be set lower than the session timeout

2016-03-28 Thread Ratha v
I overcame this issue with the property "heartbeat.interval.ms"..
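
For the archive: the constructor check is simply heartbeat.interval.ms <
session.timeout.ms, and the usual guidance is to keep the heartbeat at no
more than about a third of the session timeout, e.g.:

session.timeout.ms=30000
heartbeat.interval.ms=3000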

Thanks.

On 29 March 2016 at 14:54, Ratha v <vijayara...@gmail.com> wrote:

> Hi all;
> When i try to create my consumer from the properties, i get above
> exception.
> Can anybody point me what property i set wrong?
>
>
> kafka 0.9.x version.
>
>
> *code*
>
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
>
> public ListenerThread(List topics, Properties properties) {
>
> this.consumer = new KafkaConsumer<>(properties);
>
> this.topics = topics;
>
> }
>
>
> *Error*
>
> Exception in thread "main" org.apache.kafka.common.KafkaException: Failed
> to construct kafka consumer
>
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:624)
>
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:518)
>
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:500)
>
> at com.xx.ListenerThread.(ListenerThread.java:34)
>
> at com.xx..MessageListener.start(MessageListener.java:108)
>
> at com.xxx.MessageListener.main(MessageListener.java:147)
>
> Caused by: java.lang.IllegalArgumentException: Heartbeat must be set lower
> than the session timeout
>
> at
> org.apache.kafka.clients.consumer.internals.Heartbeat.(Heartbeat.java:30)
>
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.(AbstractCoordinator.java:122)
>
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.(ConsumerCoordinator.java:92)
>
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:569)
>
>
> Thanks
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


java.lang.IllegalArgumentException: Heartbeat must be set lower than the session timeout

2016-03-28 Thread Ratha v
Hi all;
When i try to create my consumer from the properties, i get above exception.
Can anybody point me what property i set wrong?


kafka 0.9.x version.


*code*

import org.apache.kafka.clients.consumer.KafkaConsumer;


public ListenerThread(List topics, Properties properties) {

this.consumer = new KafkaConsumer<>(properties);

this.topics = topics;

}


*Error*

Exception in thread "main" org.apache.kafka.common.KafkaException: Failed
to construct kafka consumer

at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:624)

at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:518)

at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:500)

at com.xx.ListenerThread.(ListenerThread.java:34)

at com.xx..MessageListener.start(MessageListener.java:108)

at com.xxx.MessageListener.main(MessageListener.java:147)

Caused by: java.lang.IllegalArgumentException: Heartbeat must be set lower
than the session timeout

at
org.apache.kafka.clients.consumer.internals.Heartbeat.(Heartbeat.java:30)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.(AbstractCoordinator.java:122)

at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.(ConsumerCoordinator.java:92)

at
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:569)


Thanks
-- 
-Ratha
http://vvratha.blogspot.com/


Re: Custom serializer/deserializer for kafka 0.9.x version

2016-03-28 Thread Ratha v
Thanks a lot..

On 29 March 2016 at 14:34, Manikumar Reddy <ku...@nmsworks.co.in> wrote:

> Hi,
>
> You need to implement org.apache.kafka.common.serialization.Serializer,
> org.apache.kafka.common.serialization.Deserializer
> interfaces. Encoder, Decoder interfaces are for older clients.
>
> Example code:
>   https://github.com/omkreddy/kafka-example
> s/tree/master/consumer/src/main/java/kafka/examples/common/serialization
>
> On Tue, Mar 29, 2016 at 8:24 AM, Ratha v <vijayara...@gmail.com> wrote:
>
> > Hi all;
> >
> > I have written my custom serialiser/deserializer to publish/consume my
> java
> > bean objects.
> > However I get a class not found exception for the "kafka.serializer" package.
> > Can someone point me to which class I have to use to implement my
> > custom serializer in kafka 0.9.x?
> >
> > import kafka.serializer.Decoder;
> > import kafka.serializer.Encoder;
> > import kafka.utils.VerifiableProperties;
> > public class FileSerializer implements Encoder<File>, Decoder<File> {
> >
> > public FileSerializer() {
> >
> > }
> >
> > public FileSerializer(VerifiableProperties verifiableProperties) {
> > /* This constructor must be present for successful compile. */
> > }
> >
> > @Override
> > public byte[] toBytes(File file) {
> > 
> >
> > }
> >
> > @Override
> > public File fromBytes(byte[] fileContent) {
> > 
> > return (File) obj;
> > }
> >
> > }
> >
> >
> > I have added following pom dependency.
> >
> > <dependency>
> >   <groupId>org.apache.kafka</groupId>
> >   <artifactId>kafka-clients</artifactId>
> >   <version>0.9.0.0</version>
> > </dependency>
> > 
> >
> >
> > Thanks
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>



-- 
-Ratha
http://vvratha.blogspot.com/
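
To spell Manikumar's answer out in code, here is a minimal sketch of the
0.9-style pair (each class in its own file; FileObj stands in for the poster's
bean, which would need to implement java.io.Serializable for this approach):

import java.io.*;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

public class FileObjSerializer implements Serializer<FileObj> {
    @Override public void configure(Map<String, ?> configs, boolean isKey) { }
    @Override public byte[] serialize(String topic, FileObj data) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(data); // plain Java serialization, as in the posts
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("serialization failed", e);
        }
    }
    @Override public void close() { }
}

public class FileObjDeserializer implements Deserializer<FileObj> {
    @Override public void configure(Map<String, ?> configs, boolean isKey) { }
    @Override public FileObj deserialize(String topic, byte[] data) {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (FileObj) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("deserialization failed", e);
        }
    }
    @Override public void close() { }
}

These are then named in the client properties as value.serializer /
value.deserializer rather than instantiated by hand.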


Custom serializer/deserializer for kafka 0.9.x version

2016-03-28 Thread Ratha v
Hi all;

I have written my custom serialiser/deserializer to publish/consume my java
bean objects.
However I get a class not found exception for the "kafka.serializer" package.
Can someone point me to which class I have to use to implement my
custom serializer in kafka 0.9.x?

import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
public class FileSerializer implements Encoder<File>, Decoder<File> {

public FileSerializer() {

}

public FileSerializer(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}

@Override
public byte[] toBytes(File file) {


}

@Override
public File fromBytes(byte[] fileContent) {

return (File) obj;
}

}


I have added the following pom dependency.

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.0</version>
</dependency>


Thanks
-- 
-Ratha
http://vvratha.blogspot.com/
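
The class-not-found error above is actually consistent with that pom entry:
kafka.serializer.Encoder and kafka.serializer.Decoder live in the core broker
artifact, not in kafka-clients. If the old-style interfaces are really
required, the dependency would have to be the core jar instead (a sketch; the
_2.11 suffix has to match the Scala build of the broker):

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.9.0.0</version>
</dependency>

With kafka-clients alone, the path of least resistance is the new
org.apache.kafka.common.serialization interfaces discussed in the reply above.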


Re: kafka.consumer.ConsumerTimeoutException

2016-03-23 Thread Ratha v
Forgot to mention, I'm using kafka 2.11 version

On 24 March 2016 at 16:55, Ratha v <vijayara...@gmail.com> wrote:

> Hi all;
>
> I'm new to kafka and wrote a simple multithreaded kafka consumer. When I try
> to consume the messages, it continuously throws ConsumerTimeoutException. How
> can I get rid of this?
>
> I have multiple topics.
>
>
> *Executor*
>
>
> public class MessageListener {
>
> private Properties properties;
>
>
> private ConsumerConnector consumerConnector;
>
> private String topic;
>
> private ExecutorService executor;
>
>
> public MessageListener(String topic) {
>
> this.topic = topic;
>
>
> KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
>
> try {
>
> properties = confLoader.loadConsumerConfig();
>
> ConsumerConfig consumerConfig = new ConsumerConfig(properties);
>
> consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
>
> } catch (FileNotFoundException e) {
>
> e.printStackTrace();
>
> }
>
> }
>
>
> public void start(RawFile file) {
>
>
> Map<String, Integer> topicCountMap = new HashMap<>();
>
> topicCountMap.put(topic, new Integer(CoreConstants.THREAD_SIZE));
>
>
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumerConnector
>
> .createMessageStreams(topicCountMap);
>
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>
> executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
>
>
> for (KafkaStream<byte[], byte[]> stream : streams) {
>
> executor.submit(new ListenerThread(stream));
>
>
> }
>
> }
>
>
>
> }
>
>
>
> *Thread*
>
> public class ListenerThread implements Runnable {
>
> private KafkaStream<byte[], byte[]> stream;
>
>
> public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {
>
> this.stream = msgStream;
>
>
> }
>
>
> @Override
>
> public void run() {
>
> try {
>
>
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
> while (it.hasNext()) {
>
> MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
>
> String topic = messageAndMetadata.topic();
>
> byte[] message = messageAndMetadata.message();
>
> System.out.println("111");
>
> FileProcessor processor = new FileProcessor();
>
> processor.processFile(topic, message);
>
> }
>
> } catch (ConsumerTimeoutException cte) {
>
> System.out.println("Consumer timed out");
>
> }
>
>
>
> Thanks.
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/
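
Background for readers: ConsumerTimeoutException is how the old consumer
signals "no message arrived within consumer.timeout.ms"; it is flow control,
not a failure. If the worker threads should simply wait for work, leave
consumer.timeout.ms unset (the default of -1 blocks indefinitely). If a
timeout is wanted as a periodic wake-up, the catch block only needs to keep
the thread alive; a sketch against the ListenerThread above:

// hasNext() throws instead of blocking forever once consumer.timeout.ms
// is set; treat that as "nothing yet", not as a crash
try {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> m = it.next();
        new FileProcessor().processFile(m.topic(), m.message());
    }
} catch (ConsumerTimeoutException cte) {
    // no message within consumer.timeout.ms; decide here whether to
    // retry, back off, or shut the worker down
}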


kafka.consumer.ConsumerTimeoutException

2016-03-23 Thread Ratha v
Hi all;

I'm new to kafka and wrote a simple multithreaded kafka consumer. When I try
to consume the messages, it continuously throws ConsumerTimeoutException. How
can I get rid of this?

I have multiple topics.


*Executor*


public class MessageListener {

private Properties properties;


private ConsumerConnector consumerConnector;

private String topic;

private ExecutorService executor;


public MessageListener(String topic) {

this.topic = topic;


KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();

try {

properties = confLoader.loadConsumerConfig();

ConsumerConfig consumerConfig = new ConsumerConfig(properties);

consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

} catch (FileNotFoundException e) {

e.printStackTrace();

}

}


public void start(RawFile file) {


Map<String, Integer> topicCountMap = new HashMap<>();

topicCountMap.put(topic, new Integer(CoreConstants.THREAD_SIZE));


Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumerConnector

.createMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);


for (KafkaStream<byte[], byte[]> stream : streams) {

executor.submit(new ListenerThread(stream));


}

}



}



*Thread*

public class ListenerThread implements Runnable {

private KafkaStream<byte[], byte[]> stream;


public ListenerThread(KafkaStream<byte[], byte[]> msgStream) {

this.stream = msgStream;


}


@Override

public void run() {

try {


ConsumerIterator<byte[], byte[]> it = stream.iterator();

while (it.hasNext()) {

MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();

String topic = messageAndMetadata.topic();

byte[] message = messageAndMetadata.message();

System.out.println("111");

FileProcessor processor = new FileProcessor();

processor.processFile(topic, message);

}

} catch (ConsumerTimeoutException cte) {

System.out.println("Consumer timed out");

}



Thanks.

-- 
-Ratha
http://vvratha.blogspot.com/


Re: How to publish/consume java bean objects to Kafka 2.11 version?

2016-03-22 Thread Ratha v
Thanks Gerard for the helpful hint. I was looking for samples on how to
publish/consume java objects and could not find any concrete ones. Some
references point to old APIs, I guess.

I'm using the Kafka 2.11 build. I have written my own serialiser (I thought I
had to, based on guides) and don't know which methods I have to use to
publish.

Now I can somehow publish and consume objects..
I'll look into avro schema registry too.

Regards,
Ratha.


On 22 March 2016 at 18:25, Gerard Klijs <gerard.kl...@dizzit.com> wrote:

> If I'm reading right, your question is more about how to successfully
> de(serialise) java object? You might want to take a look at the confluent
> avro schema registry. Using avro schema's you can easily store messages in
> a java object created by the schema. This way the messages will also be a
> lot smaller, witch helps performance. And you don't have to maintain you
> own de(serialiser).
>
> On Tue, Mar 22, 2016 at 3:38 AM Ratha v <vijayara...@gmail.com> wrote:
>
> > Hi all;
> > I'm a newbie to kafka. I'm trying to publish my java object to a kafka topic
> > and try to consume it.
> > I see there are some API changes in the latest version of the kafka. can
> > anybody point some samples for how to publish and consume java objects? I
> > have written my own data serializer, but could not publish that to a
> topic.
> > Any guide/samples would be appreciated.
> >
> >
> > *CustomSerializer*
> >
> >
> >
> > import java.io.ByteArrayInputStream;
> > import java.io.ByteArrayOutputStream;
> > import java.io.IOException;
> > import java.io.ObjectInput;
> > import java.io.ObjectInputStream;
> > import java.io.ObjectOutput;
> > import java.io.ObjectOutputStream;
> >
> >
> > import kafka.serializer.Decoder;
> > import kafka.serializer.Encoder;
> >
> > public class CustomSerializer implements Encoder<FileObj>,
> > Decoder<FileObj> {
> >
> > @Override
> > public byte[] toBytes(FileObj file) {
> > try {
> >
> > ByteArrayOutputStream bos = new ByteArrayOutputStream();
> > ObjectOutput out = null;
> > byte[] rawFileBytes;
> > try {
> > out = new ObjectOutputStream(bos);
> > out.writeObject(file);
> > rawFileBytes = bos.toByteArray();
> >
> > } finally {
> > try {
> > if (out != null) {
> > out.close();
> > bos.close();
> > }
> > } catch (Exception ex) {
> > ex.getLocalizedMessage();
> > }
> >
> > }
> > return rawFileBytes;
> > } catch (IOException e) {
> > e.printStackTrace();
> > return null;
> > }
> >
> > }
> >
> > @Override
> > public FileObj fromBytes(byte[] fileContent) {
> > ByteArrayInputStream bis = new ByteArrayInputStream(fileContent);
> > ObjectInput in = null;
> > Object obj = null;
> > try {
> > in = new ObjectInputStream(bis);
> > obj = in.readObject();
> >
> > } catch (IOException e) {
> >
> > e.printStackTrace();
> > } catch (ClassNotFoundException e) {
> >
> > e.printStackTrace();
> > } finally {
> > try {
> > bis.close();
> > if (in != null) {
> > in.close();
> > }
> > } catch (IOException ex) {
> > // ignore
> > }
> >
> > }
> > return (FileObj) obj;
> > }
> >
> > }
> >
> >
> >
> > -Ratha
> >
> > http://vvratha.blogspot.com/
> >
>



-- 
-Ratha
http://vvratha.blogspot.com/
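
For anyone following Gerard's Avro route: with Confluent's schema registry
running, the registry-aware serializers plug into the normal producer
properties, and no hand-written Encoder is needed. A sketch (the registry URL
is a placeholder for wherever the registry is deployed):

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081

The producer then sends Avro records (for example generated SpecificRecord
classes) instead of Java-serialized beans, which keeps messages compact and
schema-checked.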


How to publish/consume java bean objects to Kafka 2.11 version?

2016-03-21 Thread Ratha v
Hi all;
I'm a newbie to kafka. I'm trying to publish my java object to a kafka topic
and try to consume it.
I see there are some API changes in the latest version of kafka. Can
anybody point to some samples for how to publish and consume java objects? I
have written my own data serializer, but could not publish that to a topic.
Any guide/samples would be appreciated.


*CustomSerializer*



import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;


import kafka.serializer.Decoder;
import kafka.serializer.Encoder;

public class CustomSerializer implements Encoder<FileObj>,
Decoder<FileObj> {

@Override
public byte[] toBytes(FileObj file) {
try {

ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = null;
byte[] rawFileBytes;
try {
out = new ObjectOutputStream(bos);
out.writeObject(file);
rawFileBytes = bos.toByteArray();

} finally {
try {
if (out != null) {
out.close();
bos.close();
}
} catch (Exception ex) {
ex.getLocalizedMessage();
}

}
return rawFileBytes;
} catch (IOException e) {
e.printStackTrace();
return null;
}

}

@Override
public FileObj fromBytes(byte[] fileContent) {
ByteArrayInputStream bis = new ByteArrayInputStream(fileContent);
ObjectInput in = null;
Object obj = null;
try {
in = new ObjectInputStream(bis);
obj = in.readObject();

} catch (IOException e) {

e.printStackTrace();
} catch (ClassNotFoundException e) {

e.printStackTrace();
} finally {
try {
bis.close();
if (in != null) {
in.close();
}
} catch (IOException ex) {
// ignore
}

}
return (FileObj) obj;
}

}



-Ratha

http://vvratha.blogspot.com/
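
A closing note on how a class like this gets wired in with the old Scala
clients (CustomSerializer is the poster's class; the wiring itself is a
sketch and assumes it also gets the VerifiableProperties constructor
mentioned in the FileSerializer thread):

// producer side (old kafka.javaapi.producer.Producer):
// props.put("serializer.class", "com.example.CustomSerializer");
// (the com.example package is made up)

// consumer side: hand the Decoder to createMessageStreams so the
// streams yield FileObj instead of byte[]
Map<String, List<KafkaStream<String, FileObj>>> consumerMap =
    consumerConnector.createMessageStreams(
        topicCountMap,
        new StringDecoder(new VerifiableProperties()),
        new CustomSerializer(new VerifiableProperties()));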