Re: Kafka over Satellite links

2016-03-02 Thread Christian Csar
I would not do that. I admit I may be a bit biased due to working for
Buddy Platform (IoT backend stuff including telemetry collection), but
you want to send the data via some protocol (HTTP? MQTT? CoAP?) to the
central hub and then have those servers put the data into Kafka. Now,
if you want to use Kafka, there are various HTTP front ends that will
basically put the data into Kafka for you without the client needing
to deal with the partition management part. But putting data into
Kafka directly from the remote sites really seems like a bad idea,
even if it's a large number of messages per second per node and even
if the security parts work out for you.
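
To make the "HTTP front end" idea concrete, here is a minimal hub-side sketch
that accepts posted telemetry and produces it into Kafka; the broker address,
port, path and topic name below are made up for illustration, and in practice
you would more likely deploy one of the existing REST proxies instead.

import com.sun.net.httpserver.HttpServer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.Properties;

public class TelemetryHttpFrontEnd {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hub-kafka:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);

        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/telemetry", exchange -> {
            // Read the posted body and hand it to Kafka; partition management stays on the hub.
            byte[] body = readAll(exchange.getRequestBody());
            producer.send(new ProducerRecord<byte[], byte[]>("device-telemetry", body));
            exchange.sendResponseHeaders(204, -1);
            exchange.close();
        });
        server.start();
    }

    private static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        for (int n; (n = in.read(buf)) != -1; ) out.write(buf, 0, n);
        return out.toByteArray();
    }
}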

Christian

On Wed, Mar 2, 2016 at 9:52 PM, Jan  wrote:
> Hi folks;
> does anyone know of Kafka's ability to work over satellite links? We have an 
> IoT telemetry application that uses satellite communication to send data from 
> remote sites to a central hub.
> Any help/input/links/gotchas would be much appreciated.
> Regards, Jan


Kafka over Satellite links

2016-03-02 Thread Jan
Hi folks; 
does anyone know of Kafka's ability to work over satellite links? We have an IoT 
telemetry application that uses satellite communication to send data from 
remote sites to a central hub. 
Any help/input/links/gotchas would be much appreciated. 
Regards, Jan

Re: migrating the main-page docs to gitbook format

2016-03-02 Thread Christian Posta
For sure! Will take a look!

On Wednesday, March 2, 2016, Gwen Shapira  wrote:

> Hey!
>
> Yes! We'd love that too! Maybe you want to help us out with
> https://issues.apache.org/jira/browse/KAFKA-2967 ?
>
> Gwen
>
> On Wed, Mar 2, 2016 at 2:39 PM, Christian Posta
> > wrote:
> > Would love to have the docs in gitbook/markdown format so they can easily
> > be viewed from the source repo (or mirror, technically) on github.com. They
> > can also be easily converted to HTML, have a side-navigation ToC, and still
> > be versioned along with the src code.
> >
> > Thoughts?
> >
> > --
> > *Christian Posta*
> > twitter: @christianposta
> > http://www.christianposta.com/blog
> > http://fabric8.io
>


-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


Re: About the number of partitions

2016-03-02 Thread BYEONG-GI KIM
Dear James,

Thank you for the information indeed!

That's very helpful for me to understand much more deeply about kafka.

Best regards

Kim

2016-03-03 3:29 GMT+09:00 James Cheng :

> Kim,
>
> Here's a good blog post from Confluent with advice on how to choose the
> number of partitions.
>
>
> http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
>
> -James
>
>
> > On Mar 1, 2016, at 4:11 PM, BYEONG-GI KIM  wrote:
> >
> > Hello.
> >
> > I have questions about how many partitions are optimal while using kafka.
> > As far as I know, even if there are multiple consumers that belong to a
> > consumer group, say *group_A*, only one consumer can receive a kafka
> > message produced by a producer if there is only one partition. So, as a result,
> > multiple partitions are required in order to distribute the message to all
> > the consumers in group_A if I want the consumers to get the message.
> >
> > Is it right?
> >
> > I'm considering developing several kafka consumer applications, e.g.,
> > message saver, message analyzer, etc., so a message from a producer must be
> > consumed by those kinds of consumers.
> >
> > Any advice and help would be really appreciated.
> >
> > Thanks in advance!
> >
> > Best regards
> >
> > Kim
>
>
> 
>


Re: About the number of partitions

2016-03-02 Thread BYEONG-GI KIM
Dear Jens

Thank you for the reply!

It's really hard to decide how many brokers/partitions are optimal for a
system. Are there any good reports or documents about that? I'd like to see
some examples related to this kind of optimization, especially from
production-level environments.

Thank you in advance.

Best regards

Kim

2016-03-02 21:19 GMT+09:00 Jens Rantil :

> Hi Kim,
>
> You are correct in that the number of partitions sets the upper limit on
> consumer parallelization. That is, a single consumer in a group can consume
> multiple partitions, however multiple consumers in a group can't consume a
> single partition.
>
> Also, since partitions are spread across your brokers, really it's the
> ratio nPartitions/nBrokers that you want to optimize for.
>
> Given the above parallelization limit, it would make sense to have a very
> large ratio. This would have other implications:
>
>- Your brokers will have a lot of smaller files so they will have to
>flush periodically. This can incur a lot of overhead and introduce
>latencies. Especially on a spinning disk where seeks are expensive.
>- Brokers are generally set to rotate their logs at a certain size. It
>could be hard to tune rotation with many small files.
>
> Given this, really you need to benchmark for your use case with your
> message sizes etc.
>
> Side-note: Note that for autoscaling you will have to overprovision your
> partitions somewhat to not hit the parallelization limit.
>
> Cheers,
> Jens
>
> On Wed, Mar 2, 2016 at 1:11 AM, BYEONG-GI KIM  wrote:
>
> > Hello.
> >
> > I have questions about how many partitions are optimal while using kafka.
> > As far as I know, even if there are multiple consumers that belong to a
> > consumer group, say *group_A*, only one consumer can receive a kafka
> > message produced by a producer if there is only one partition. So, as a result,
> > multiple partitions are required in order to distribute the message to all
> > the consumers in group_A if I want the consumers to get the message.
> >
> > Is it right?
> >
> > I'm considering developing several kafka consumer applications, e.g.,
> > message saver, message analyzer, etc., so a message from a producer must be
> > consumed by those kinds of consumers.
> >
> > Any advice and help would be really appreciated.
> >
> > Thanks in advance!
> >
> > Best regards
> >
> > Kim
> >
>
>
>
> --
> Jens Rantil
> Backend engineer
> Tink AB
>
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
>
> Facebook  Linkedin
> <
> http://www.linkedin.com/company/2735919?trk=vsrp_companies_res_photo=VSRPsearchId%3A1057023381369207406670%2CVSRPtargetId%3A2735919%2CVSRPcmpt%3Aprimary
> >
>  Twitter 
>


Re: migrating the main-page docs to gitbook format

2016-03-02 Thread Gwen Shapira
Hey!

Yes! We'd love that too! Maybe you want to help us out with
https://issues.apache.org/jira/browse/KAFKA-2967 ?

Gwen

On Wed, Mar 2, 2016 at 2:39 PM, Christian Posta
 wrote:
> Would love to have the docs in gitbook/markdown format so they can easily
> be viewed from the source repo (or mirror, technically) on github.com. They
> can also be easily converted to HTML, have a side-navigation ToC, and still
> be versioned along with the src code.
>
> Thoughts?
>
> --
> *Christian Posta*
> twitter: @christianposta
> http://www.christianposta.com/blog
> http://fabric8.io


migrating the main-page docs to gitbook format

2016-03-02 Thread Christian Posta
Would love to have the docs in gitbook/markdown format so they can easily
be viewed from the source repo (or mirror, technically) on github.com. They
can also be easily converted to HTML, have a side-navigation ToC, and still
be versioned along with the src code.

Thoughts?

-- 
*Christian Posta*
twitter: @christianposta
http://www.christianposta.com/blog
http://fabric8.io


RE: producer SSL issue - fails with ssl.trustore location/password is not valid

2016-03-02 Thread Martin Gainty



> From: aravindan.ramachand...@gmail.com
> Date: Wed, 2 Mar 2016 12:28:41 -0800
> Subject: producer SSL issue - fails with ssl.trustore location/password is 
> not valid
> To: users@kafka.apache.org
> 
> INFO Registered broker 10 at path /brokers/ids/10 with addresses: PLAINTEXT
> -> EndPoint(kafka-ap-server101.com,9094,PLAINTEXT),SSL -> EndPoint(
> kafka-ap-server101.com,9095,SSL) (kafka.utils.ZkUtils)
> 
> -bash-4.1$ bin/kafka-console-producer.sh --broker-list
> kafka-ap-server101.com:9095 --topic topic123 --producer.config
> config/producer.properties --security-protocol SSL
> [2016-03-02 20:08:11,205] WARN Property ssl.keystore.location is not valid
> (kafka.utils.VerifiableProperties)
> [2016-03-02 20:08:11,205] WARN Property ssl.enabled.protocols is not valid
> (kafka.utils.VerifiableProperties)
> [2016-03-02 20:08:11,205] WARN Property ssl.key.password is not valid
> (kafka.utils.VerifiableProperties)
> [2016-03-02 20:08:11,206] WARN Property ssl.keystore.password is not valid
> (kafka.utils.VerifiableProperties)
> [2016-03-02 20:08:11,206] WARN Property ssl.keystore.type is not valid
> (kafka.utils.VerifiableProperties)
> [2016-03-02 20:08:11,206] WARN Property ssl.truststore.location is not
> valid (kafka.utils.VerifiableProperties)
> [2016-03-02 20:08:11,206] WARN Property ssl.truststore.password is not
> valid (kafka.utils.VerifiableProperties)
> [2016-03-02 20:08:11,206] WARN Property ssl.truststore.type is not valid
> (kafka.utils.VerifiableProperties)

MG>apparently you have this property set: ssl.want.client.auth=required
MG>nobody from this group has those details as each security configuration is 
different
MG>when and if your security admin can provide those details here we could help 
you reconfigure security details for kafka producer
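
For reference, a rough sketch of the SSL-related settings the new Java producer
understands (the host, paths and passwords below are placeholders; the keystore
entries only matter if the broker requires client authentication):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SslProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-ap-server101.com:9095"); // the SSL listener
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/path/to/client.truststore.jks");
        props.put("ssl.truststore.password", "truststore-password");
        // Needed only when the broker sets ssl.client.auth=required:
        props.put("ssl.keystore.location", "/path/to/client.keystore.jks");
        props.put("ssl.keystore.password", "keystore-password");
        props.put("ssl.key.password", "key-password");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props)) {
            producer.send(new ProducerRecord<String, String>("topic123", "hello over SSL"));
        }
    }
}
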
> [2016-03-02 20:08:28,490] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(topic123)] from broker [BrokerEndPoint(0,
> kafka-ap-server101.com,9095)] failed (kafka.client.ClientUtils$)
> java.io.EOFException
> at
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
> at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:140)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:121)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2016-03-02 20:08:28,493] ERROR fetching topic metadata for topics
> [Set(topic123)] from broker [ArrayBuffer(BrokerEndPoint(0,
> kafka-ap-server101.com,9095))] failed (kafka.utils.CoreUtils$)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(topic123)] from broker [ArrayBuffer(BrokerEndPoint(0,
> kafka-ap-server101.com,9095))] failed
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> Caused by: 

producer SSL issue - fails with ssl.trustore location/password is not valid

2016-03-02 Thread aravindan ramachandran
INFO Registered broker 10 at path /brokers/ids/10 with addresses: PLAINTEXT
-> EndPoint(kafka-ap-server101.com,9094,PLAINTEXT),SSL -> EndPoint(
kafka-ap-server101.com,9095,SSL) (kafka.utils.ZkUtils)

-bash-4.1$ bin/kafka-console-producer.sh --broker-list
kafka-ap-server101.com:9095 --topic topic123 --producer.config
config/producer.properties --security-protocol SSL
[2016-03-02 20:08:11,205] WARN Property ssl.keystore.location is not valid
(kafka.utils.VerifiableProperties)
[2016-03-02 20:08:11,205] WARN Property ssl.enabled.protocols is not valid
(kafka.utils.VerifiableProperties)
[2016-03-02 20:08:11,205] WARN Property ssl.key.password is not valid
(kafka.utils.VerifiableProperties)
[2016-03-02 20:08:11,206] WARN Property ssl.keystore.password is not valid
(kafka.utils.VerifiableProperties)
[2016-03-02 20:08:11,206] WARN Property ssl.keystore.type is not valid
(kafka.utils.VerifiableProperties)
[2016-03-02 20:08:11,206] WARN Property ssl.truststore.location is not
valid (kafka.utils.VerifiableProperties)
[2016-03-02 20:08:11,206] WARN Property ssl.truststore.password is not
valid (kafka.utils.VerifiableProperties)
[2016-03-02 20:08:11,206] WARN Property ssl.truststore.type is not valid
(kafka.utils.VerifiableProperties)
[2016-03-02 20:08:28,490] WARN Fetching topic metadata with correlation id
0 for topics [Set(topic123)] from broker [BrokerEndPoint(0,
kafka-ap-server101.com,9095)] failed (kafka.client.ClientUtils$)
java.io.EOFException
at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:140)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
at kafka.producer.SyncProducer.send(SyncProducer.scala:121)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2016-03-02 20:08:28,493] ERROR fetching topic metadata for topics
[Set(topic123)] from broker [ArrayBuffer(BrokerEndPoint(0,
kafka-ap-server101.com,9095))] failed (kafka.utils.CoreUtils$)
kafka.common.KafkaException: fetching topic metadata for topics
[Set(topic123)] from broker [ArrayBuffer(BrokerEndPoint(0,
kafka-ap-server101.com,9095))] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:79)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.io.EOFException
at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:140)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
at kafka.producer.SyncProducer.send(SyncProducer.scala:121)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
... 12 more
[2016-03-02 20:08:28,656] WARN Fetching topic metadata with correlation id
1 for topics 

Announcing rdkafka-dotnet - C# Apache Kafka client

2016-03-02 Thread Andreas Heider
Hi,

I was really missing a high-quality Kafka client for C#/F#, so I built one: 
https://github.com/ah-/rdkafka-dotnet
It’s based on the fantastic librdkafka, so it supports pretty much everything 
you might want:

- High performance (I'm getting ~1 million msgs/second producing/consuming on 
my laptop with a single process)
- An API close to the new Java Consumer / Producer
- Kafka 0.9 consumer groups and broker offset storage
- Working failover
- Auto-committing of offsets
- Compression with snappy and gzip
- Metadata API to query for topics and offsets
- Custom message partitioners
- No zookeeper dependency

It runs on .NET Core on Linux, OS X and Windows, and classic .NET 4.5 on 
Windows. Mono should work, but I haven’t tested it outside dnx.

Cheers,
Andreas

Re: Large Size Error even when the message is small

2016-03-02 Thread Fang Wong
Also the key serializer is
 org.apache.kafka.common.serialization.StringSerializer and value
serializer = org.apache.kafka.common.serialization.ByteArraySerializer.

On Wed, Mar 2, 2016 at 10:24 AM, Fang Wong  wrote:

> try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
>
> try (DataOutputStream dos = new DataOutputStream(bos)) {
>
> // First element is the timestamp
>
>  dos.writeLong(System.currentTimeMillis());
>
> // Second element is the class name, this element is used for
> deserializing the event at the consumer side
>
> dos.writeUTF(this.appLevelTopicId);
>
> //Third Element is subTopic
>
> dos.writeUTF(getSubTopic() ==null? "null" : getSubTopic());
>
> // Fourth element is the class name, this element is used for
> deserializing the event at the consumer side
>
> dos.writeUTF(event.getClass().getName());
>
> }
>
> try (OutputStreamWriter byteWriter = new OutputStreamWriter(bos,
> com.force.commons.text.EncodingUtil.UTF_ENCODING)) {
>
> Optional scrtContext = scrtContextProvider.getOptional();
>
>   if (scrtContext.isPresent()) {
>
>   serializationService.toJson(scrtContext.get(), byteWriter);
>
>   } else {
>
>   byteWriter.write("null");
>
>   }
>
>   // Sixth element is the actual event
>
>   serializationService.toJson(event, byteWriter);
>
>   // Seventh element is the request context
>
>   if (RequestContext.get().isEstablished()) {
>
>   serializationService.toJson(RequestContext.serialize(),
> byteWriter);
>
>   }
>
> }
>
> byte[] payload = bos.toByteArray();
>
> ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(kafkaTopicAttributes.getTopicName(), partitionNumber, null, payload);
>
>  //Send is async by default and when the messages are published to
> the Kafka broker, Callback is executed with the status of the delivery
>
>
> kafkaProducer.send(data);
>
>
>
> On Wed, Mar 2, 2016 at 4:59 AM, Asaf Mesika  wrote:
>
>> Can you show your code for sending?
>>
>> On Tue, 1 Mar 2016 at 21:59 Fang Wong  wrote:
>>
>> > [2016-02-26 20:33:42,997] INFO Closing socket connection to /x due to
>> > invalid request: Request of length 1937006964 is not valid, [2016-02-26
>> > 20:33:42,997] INFO Closing socket connection to /10.224.146.58 due to
>> > invalid request: Request of length 1937006964 is not valid, it is larger
>> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,025] INFO Closing socket connection to /
>> 10.224.146.62
>> > due to invalid request: Request of length 1937006964 is not valid, it is
>> > larger than the maximum size of 104857600 bytes.
>> (kafka.network.Processor)
>> > [2016-02-26 20:33:43,047] INFO Closing socket connection to /
>> 10.224.146.63
>> > due to invalid request: Request of length 1937006964 is not valid, it is
>> > larger than the maximum size of 104857600 bytes.
>> (kafka.network.Processor)
>> > [2016-02-26 20:33:43,049] INFO Closing socket connection to /
>> 10.224.146.61
>> > due to invalid request: Request of length 1937006964 is not valid, it is
>> > larger than the maximum size of 104857600 bytes.
>> (kafka.network.Processor)
>> > [2016-02-26 20:33:43,055] INFO Closing socket connection to /
>> 10.224.146.60
>> > due to invalid request: Request of length 1937006964 is not valid, it is
>> > larger than the maximum size of 104857600 bytes.
>> (kafka.network.Processor)
>> > [2016-02-26 20:33:43,112] INFO Closing socket connection to /
>> 10.224.146.59
>> > due to invalid request: Request of length 1937006964 is not valid, it is
>> > larger than the maximum size of 104857600 bytes.
>> > (kafka.network.Processor)of
>> > 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to
>> > invalid request: Request of length 1937006964 is not valid, it is larger
>> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,047] INFO Closing socket connection to /x due to
>> > invalid request: Request of length 1937006964 is not valid, it is larger
>> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,049] INFO Closing socket connection to /x due to
>> > invalid request: Request of length 1937006964 is not valid, it is larger
>> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,055] INFO Closing socket connection to /x due to
>> > invalid request: Request of length 1937006964 is not valid, it is larger
>> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> > [2016-02-26 20:33:43,112] INFO Closing socket connection to /x due to
>> > invalid request: Request of length 1937006964 is not valid, it is larger
>> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
>> >
>>
>
>


RE: 0.9.0.1 Kafka Java client Producer hangs while requesting metadata

2016-03-02 Thread Muthukumaran K
With metadata.fetch.timeout.ms=1, Producer sending goes through. Not sure if 
this is going to affect anything else. 
Now trying to figure out why KafkaConsumer.poll(1) never returns.

Regards
Muthu
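
A middle ground between blocking forever and a 1 ms timeout is to give the
metadata fetch a bounded but realistic timeout and surface failures through a
callback, so a send fails fast with a visible exception instead of hanging.
This is only a sketch (broker address, topic and timeout are arbitrary); the
underlying connectivity problem still has to be fixed, and with Kafka in a VM
that is often a broker advertised hostname that the host cannot reach.

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BoundedSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "vm-broker:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Give up on a send after 10 seconds instead of waiting on metadata forever.
        props.put("metadata.fetch.timeout.ms", "10000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props)) {
            producer.send(new ProducerRecord<String, String>("consumer-tutorial", "0", "0"),
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            if (e != null) {
                                e.printStackTrace(); // e.g. metadata could not be fetched in time
                            } else {
                                System.out.println("Sent to partition " + metadata.partition()
                                        + " at offset " + metadata.offset());
                            }
                        }
                    });
        }
    }
}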


-Original Message-
From: Muthukumaran K [mailto:muthukumara...@ericsson.com] 
Sent: Wednesday, March 02, 2016 11:28 PM
To: users@kafka.apache.org
Subject: 0.9.0.1 Kafka Java client Producer hangs while requesting metadata

Hi,

Trying a very simple Producer with following code. "producer.send" hangs 
indefinitely. Attaching thread-dump snippet so that I can get some advice if 
something is wrong with my code or configuration.

Kafka runs in a VM and the producer runs in the host - it's a single-broker setup for 
basic testing. Kafka version used is 0.9.0.1


Should I set - metadata.fetch.timeout.ms=1 to get over this ? Any side-effects ?



public static void main(String args[]) throws InterruptedException, 
ExecutionException {


Properties props = new Properties();
props.put("bootstrap.servers", 
":9092");
//props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

System.out.println("Connected ...");

for(int i = 0; i < 100; i++){
producer.send(new ProducerRecord<String, String>("consumer-tutorial", Integer.toString(i), Integer.toString(i)));
System.out.println("Sent " + i);
}

producer.close();
}


Thread Dump snippet :

"main" prio=6 tid=0x02238000 nid=0x1390 in Object.wait() 
[0x025bf000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0007aecacea0> (a 
org.apache.kafka.clients.Metadata)
at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:121)
- locked <0x0007aecacea0> (a org.apache.kafka.clients.Metadata)
at 
org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:483)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:412)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)
at kafkaexperiments.ProducerDriver.main(ProducerDriver.java:41)

Additional checks and info on my setup

1)  bin/kafka-topics.sh --list --zookeeper :2181  - this clearly shows 
that the topic "consumer-tutorial" is present
2)  to check if there are issues with kafka itself, I locally (within VM where 
Kafka and ZK runs) checked with kafka-console-producer.sh and 
kafka-console-consumer.sh with a different topic and could verify that there is 
no issue with Kafka setup. It worked fine
3) I am using following imports

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;



Regards
Muthu



Re: About the number of partitions

2016-03-02 Thread James Cheng
Kim,

Here's a good blog post from Confluent with advice on how to choose the number 
of partitions.

http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/

-James
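
The rough rule of thumb from that post is throughput based: measure the
single-partition throughput you can achieve when producing (p) and when
consuming (c), pick the target throughput t for the topic, and provision at
least max(t/p, t/c) partitions. A tiny sketch with made-up numbers:

public class PartitionCountSketch {
    public static void main(String[] args) {
        double t = 200.0; // target MB/s for the topic (made up)
        double p = 40.0;  // measured MB/s for one partition when producing (made up)
        double c = 25.0;  // measured MB/s for one partition when consuming (made up)
        int partitions = (int) Math.ceil(Math.max(t / p, t / c));
        System.out.println("At least " + partitions + " partitions"); // 8 in this example
    }
}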


> On Mar 1, 2016, at 4:11 PM, BYEONG-GI KIM  wrote:
>
> Hello.
>
> I have questions about how many partitions are optimal while using kafka.
> As far as I know, even if there are multiple consumers that belong to a
> consumer group, say *group_A*, only one consumer can receive a kafka
> message produced by a producer if there is only one partition. So, as a result,
> multiple partitions are required in order to distribute the message to all
> the consumers in group_A if I want the consumers to get the message.
>
> Is it right?
>
> I'm considering developing several kafka consumer applications, e.g.,
> message saver, message analyzer, etc., so a message from a producer must be
> consumed by those kinds of consumers.
>
> Any advice and help would be really appreciated.
>
> Thanks in advance!
>
> Best regards
>
> Kim






Re: due to invalid request: Request of length

2016-03-02 Thread Fang Wong
Thanks Anirudh!
I didn't send a large message, so it is most likely an encoding issue. How do I
fix the encoding issue?

I googled and found one link: https://github.com/st23am/ExKafka/issues/4
But we are using Java, and I couldn't see how to do the equivalent of this line:
IO.iodata_to_binary(iodata)

Thanks,
Fang
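
One detail worth noting: the bogus length itself is a clue, because its four
bytes are all printable ASCII. A quick check shows that 1937006964 is exactly
the bytes "stat", which often points at something other than a Kafka producer
(a port scanner, a health check, or a ZooKeeper-style four-letter command)
connecting to the broker port, rather than at the producer's own encoding. A
minimal way to verify the arithmetic:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DecodeBogusLength {
    public static void main(String[] args) {
        // The "request length" the broker complained about, reinterpreted as 4 ASCII bytes.
        byte[] bytes = ByteBuffer.allocate(4).putInt(1937006964).array();
        System.out.println(new String(bytes, StandardCharsets.US_ASCII)); // prints "stat"
    }
}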

On Tue, Mar 1, 2016 at 5:40 PM, Anirudh P  wrote:

> Hello,
>
> What is the size of the message you are sending. If it is 1937006964 then
> you should set message.max.bytes and replica.fetch.max.bytes to values
> larger than the size of the message you are trying to send.
>
> If it isn't, then it's most likely an encoding issue on your side. The Kafka
> message is size delimited.
>
> Make sure you are not sending a partial request (or a request with an
> invalid size). The server might wrongly calculate the size of the next
> request and you could see such an error.
>
> Thank you,
> Anirudh
>
> On Wed, Mar 2, 2016 at 6:18 AM, Fang Wong  wrote:
>
> > [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,047] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,049] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,055] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,112] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> >
>


Re: Large Size Error even when the message is small

2016-03-02 Thread Fang Wong
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {

try (DataOutputStream dos = new DataOutputStream(bos)) {

// First element is the timestamp

 dos.writeLong(System.currentTimeMillis());

// Second element is the class name, this element is used for
deserializing the event at the consumer side

dos.writeUTF(this.appLevelTopicId);

//Third Element is subTopic

dos.writeUTF(getSubTopic() ==null? "null" : getSubTopic());

// Fourth element is the class name, this element is used for
deserializing the event at the consumer side

dos.writeUTF(event.getClass().getName());

}

try (OutputStreamWriter byteWriter = new OutputStreamWriter(bos,
com.force.commons.text.EncodingUtil.UTF_ENCODING)) {

Optional scrtContext = scrtContextProvider.getOptional();

  if (scrtContext.isPresent()) {

  serializationService.toJson(scrtContext.get(), byteWriter);

  } else {

  byteWriter.write("null");

  }

  // Sixth element is the actual event

  serializationService.toJson(event, byteWriter);

  // Seventh element is the request context

  if (RequestContext.get().isEstablished()) {

  serializationService.toJson(RequestContext.serialize(), byteWriter
);

  }

}

byte[] payload = bos.toByteArray();

ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(kafkaTopicAttributes.getTopicName(), partitionNumber, null, payload);

 //Send is async by default and when the messages are published to the
Kafka broker, Callback is executed with the status of the delivery

kafkaProducer.send(data);



On Wed, Mar 2, 2016 at 4:59 AM, Asaf Mesika  wrote:

> Can you show your code for sending?
>
> On Tue, 1 Mar 2016 at 21:59 Fang Wong  wrote:
>
> > [2016-02-26 20:33:42,997] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, [2016-02-26
> > 20:33:42,997] INFO Closing socket connection to /10.224.146.58 due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,025] INFO Closing socket connection to /
> 10.224.146.62
> > due to invalid request: Request of length 1937006964 is not valid, it is
> > larger than the maximum size of 104857600 bytes.
> (kafka.network.Processor)
> > [2016-02-26 20:33:43,047] INFO Closing socket connection to /
> 10.224.146.63
> > due to invalid request: Request of length 1937006964 is not valid, it is
> > larger than the maximum size of 104857600 bytes.
> (kafka.network.Processor)
> > [2016-02-26 20:33:43,049] INFO Closing socket connection to /
> 10.224.146.61
> > due to invalid request: Request of length 1937006964 is not valid, it is
> > larger than the maximum size of 104857600 bytes.
> (kafka.network.Processor)
> > [2016-02-26 20:33:43,055] INFO Closing socket connection to /
> 10.224.146.60
> > due to invalid request: Request of length 1937006964 is not valid, it is
> > larger than the maximum size of 104857600 bytes.
> (kafka.network.Processor)
> > [2016-02-26 20:33:43,112] INFO Closing socket connection to /
> 10.224.146.59
> > due to invalid request: Request of length 1937006964 is not valid, it is
> > larger than the maximum size of 104857600 bytes.
> > (kafka.network.Processor)of
> > 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,047] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,049] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,055] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> > [2016-02-26 20:33:43,112] INFO Closing socket connection to /x due to
> > invalid request: Request of length 1937006964 is not valid, it is larger
> > than the maximum size of 104857600 bytes. (kafka.network.Processor)
> >
>


0.9.0.1 Kafka Java client Producer hangs while requesting metadata

2016-03-02 Thread Muthukumaran K
Hi,

Trying a very simple Producer with following code. "producer.send" hangs 
indefinitely. Attaching thread-dump snippet so that I can get some advice if 
something is wrong with my code or configuration.

Kafka runs in a VM and the producer runs in the host - it's a single-broker setup for 
basic testing. Kafka version used is 0.9.0.1


Should I set - metadata.fetch.timeout.ms=1 to get over this ? Any side-effects ?



public static void main(String args[]) throws InterruptedException, 
ExecutionException {


Properties props = new Properties();
props.put("bootstrap.servers", 
":9092");
//props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

System.out.println("Connected ...");

for(int i = 0; i < 100; i++){
producer.send(new ProducerRecord<String, String>("consumer-tutorial", Integer.toString(i), Integer.toString(i)));
System.out.println("Sent " + i);
}

producer.close();
}


Thread Dump snippet :

"main" prio=6 tid=0x02238000 nid=0x1390 in Object.wait() 
[0x025bf000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x0007aecacea0> (a 
org.apache.kafka.clients.Metadata)
at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:121)
- locked <0x0007aecacea0> (a org.apache.kafka.clients.Metadata)
at 
org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:483)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:412)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)
at kafkaexperiments.ProducerDriver.main(ProducerDriver.java:41)

Additional checks and info on my setup

1)  bin/kafka-topics.sh --list --zookeeper :2181  - this clearly shows 
that the topic "consumer-tutorial" is present
2)  to check if there are issues with kafka itself, I locally (within VM where 
Kafka and ZK runs) checked with kafka-console-producer.sh and 
kafka-console-consumer.sh with a different topic and could verify that there is 
no issue with Kafka setup. It worked fine
3) I am using following imports

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;



Regards
Muthu



Re: Unit tests of Kafka application

2016-03-02 Thread craig w
Kafka includes

http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/MockConsumer.html
http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/MockProducer.html
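
A minimal sketch of the MockProducer side, assuming the 0.9 constructor that
takes an autoComplete flag plus key/value serializers (the topic name and value
are made up; in a real test the mock is injected wherever the production code
would use a KafkaProducer, and the test then asserts on history()):

import java.util.List;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MockProducerSketch {
    public static void main(String[] args) {
        // autoComplete=true completes every send() immediately and successfully.
        MockProducer<String, String> producer =
                new MockProducer<String, String>(true, new StringSerializer(), new StringSerializer());

        // The code under test would normally do this send.
        producer.send(new ProducerRecord<String, String>("events", "key-1", "value-1"));

        // The mock records everything that was "sent", so the test can assert on it.
        List<ProducerRecord<String, String>> sent = producer.history();
        System.out.println(sent.size() + " record(s), first value: " + sent.get(0).value());
    }
}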

On Wed, Mar 2, 2016 at 12:50 PM, Madhire, Naveen <
naveen.madh...@capitalone.com> wrote:

> Hi,
>
> I want to write unit test cases for testing kafka application. Is there
> any specific kafka-junit or something which creates dummy producers and
> consumers to use in unit tests?
>
> Thanks.
> 
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>



-- 

https://github.com/mindscratch
https://www.google.com/+CraigWickesser
https://twitter.com/mind_scratch
https://twitter.com/craig_links


Re: session.timeout.ms limit

2016-03-02 Thread Jay Kreps
Hey Gligor,

Sorry for the rough edges. I think there are a variety of rough edges in
error messages here we can improve:

   1. "Error ILLEGAL_GENERATION occurred while committing offsets for group
   MetadataConsumerSpout" is obviously NOT the most intuitive error message,
   it doesn't really explain what happened or what to do to fix it. We should
   improve that.
   2. The error when you exceed your max session timeout doesn't tell you
   that there is a config on the broker that controls this. There isn't an
   easy way to figure this out on your own.
   3. The default max on the broker should probably be higher?
   4. The docs on group.max.session.timeout.ms should explain why this
   exists, why you'd want to change it, etc.
   5. We are working on a way to control the maximum number of messages per
   poll request which will help reduce the session timeout you need, but I
   think that is orthogonal to the confusion.

Thanks for walking through the steps you had to go through in figuring it
out, that will help us to round off some of the corners.

-Jay

On Wed, Mar 2, 2016 at 6:09 AM, Gligor Vanessa 
wrote:

> Hello,
>
> I am using the Kafka 0.9.0 high-level consumer. I am not using auto commit for
> the offsets, so after I consume the messages (poll from Kafka) I will have
> to commit the offsets manually.
>
> The issue that I have is actually that the processing of the messages takes
> longer than 30s (and I cannot call poll again before these messages are
> processed), and when I try to commit the offset an exception is thrown:
> ERROR o.a.k.c.c.i.ConsumerCoordinator - Error ILLEGAL_GENERATION occurred
> while committing offsets for group MetadataConsumerSpout.
> (I have found this explanation on stackoverflow: if you wait for longer
> than the timeout then the coordinator for the topic will kick out
> the consumer because it will think it is dead and it will rebalance the group)
>
> In order to get rid of this I have thought about a couple of solutions:
>
> 1. The configuration session.timeout.ms has a maximum value, so if I try
> to
> set it to 60 seconds, also I get an exception, because this value is not in
> the valid interval.
>
> 2. I have tried to find a solution to get a paginated request when the
> polling method is called - no success.
>
> 3. I have tried to send a heart beat from the outside of the poll (because
> this method sends the heartbeats) - no success.
>
>
> Thank you.
>
> Vanessa.
>


Unit tests of Kafka application

2016-03-02 Thread Madhire, Naveen
Hi,

I want to write unit test cases for testing kafka application. Is there any 
specific kafka-junit or something which creates dummy producers and consumers 
to use in unit tests?

Thanks.


The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Re: session.timeout.ms limit

2016-03-02 Thread Martin Skøtt
Hi,
Regarding item 1, the maximum can be configured in the broker by changing
group.max.session.timeout.ms.

Regards,
Martin

On 2 March 2016 at 15:09, Gligor Vanessa  wrote:

> Hello,
>
> I am using the Kafka 0.9.0 high-level consumer. I am not using auto commit for
> the offsets, so after I consume the messages (poll from Kafka) I will have
> to commit the offsets manually.
>
> The issue that I have is actually that the processing of the messages takes
> longer than 30s (and I cannot call poll again before these messages are
> processed), and when I try to commit the offset an exception is thrown:
> ERROR o.a.k.c.c.i.ConsumerCoordinator - Error ILLEGAL_GENERATION occurred
> while committing offsets for group MetadataConsumerSpout.
> (I have found this explanation on stackoverflow: if you wait for longer
> than the timeout then the coordinator for the topic will kick out
> the consumer because it will think it is dead and it will rebalance the group)
>
> In order to get rid of this I have thought about a couple of solutions:
>
> 1. The configuration session.timeout.ms has a maximum value, so if I try
> to
> set it to 60 seconds, also I get an exception, because this value is not in
> the valid interval.
>
> 2. I have tried to find a solution to get a paginated request when the
> polling method is called - no success.
>
> 3. I have tried to send a heart beat from the outside of the poll (because
> this method sends the heartbeats) - no success.
>
>
> Thank you.
>
> Vanessa.
>


Seek to invalid offset, new consumer

2016-03-02 Thread Giidox
Hi all!

I am using the new consumer API by manually assigning partitions. I’m having 
some trouble with seek.

When I seek with a valid offset, poll works ok. However, if I call seek with an 
offset that is so small that the broker no longer has that offset, poll returns 
no records. Is there a way to get a callback or exception for seeking to an 
invalid offset? Is there a way to discover what the valid offset range is, with 
the new consumer?

Is the old consumer API deprecated (or is it planned to be deprecated)?

Any help is much appreciated.

- Giidox
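
The 0.9 consumer has no direct "give me the valid offset range" call
(beginningOffsets()/endOffsets() only appeared in later releases), but a common
workaround is to seek and then ask for the resulting position. A minimal sketch
with a made-up topic; auto.offset.reset also controls what happens when a poll
hits an out-of-range offset:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetRangeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "offset-range-probe");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // made-up topic
            consumer.assign(Collections.singletonList(tp));

            consumer.seekToBeginning(tp);          // varargs in the 0.9 API
            long earliest = consumer.position(tp); // first offset the broker still has
            consumer.seekToEnd(tp);
            long latest = consumer.position(tp);   // offset of the next record to be written

            System.out.println("valid range: [" + earliest + ", " + latest + ")");

            long wanted = 42L; // arbitrary example offset, possibly stale
            consumer.seek(tp, Math.min(Math.max(wanted, earliest), latest));
        }
    }
}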



session.timeout.ms limit

2016-03-02 Thread Gligor Vanessa
Hello,

I am using the Kafka 0.9.0 high-level consumer. I am not using auto commit for
the offsets, so after I consume the messages (poll from Kafka) I will have
to commit the offsets manually.

The issue that I have is actually that the processing of the messages takes
longer than 30s (and I cannot call poll again before these messages are
processed), and when I try to commit the offset an exception is thrown:
ERROR o.a.k.c.c.i.ConsumerCoordinator - Error ILLEGAL_GENERATION occurred
while committing offsets for group MetadataConsumerSpout.
(I have found this explanation on stackoverflow: if you wait for longer
than the timeout then the coordinator for the topic will kick out
the consumer because it will think it is dead and it will rebalance the group)

In order to get rid of this I have thought about a couple of solutions:

1. The configuration session.timeout.ms has a maximum value, so if I try to
set it to 60 seconds, also I get an exception, because this value is not in
the valid interval.

2. I have tried to find a solution to get a paginated request when the
polling method is called - no success.

3. I have tried to send a heart beat from the outside of the poll (because
this method sends the heartbeats) - no success.


Thank you.

Vanessa.
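
Until there is a way to cap how much a single poll returns, one workaround that
stays within the 0.9 API is to pause the assigned partitions, keep calling
poll() while the batch is processed (a paused consumer still heartbeats on
poll() but returns no records), then resume and commit. A rough sketch; broker
address, group, topic, timings and the processing hand-off are all made up:

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SlowProcessingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "MetadataConsumerSpout");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // made-up topic
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(1000);
                if (records.count() == 0) continue;

                TopicPartition[] assigned = consumer.assignment().toArray(new TopicPartition[0]);
                consumer.pause(assigned); // stop fetching but keep the group membership

                Future<?> work = executor.submit(new Runnable() {
                    public void run() {
                        for (ConsumerRecord<String, String> r : records) {
                            process(r); // the slow part, possibly longer than session.timeout.ms
                        }
                    }
                });
                while (!work.isDone()) {
                    consumer.poll(100); // heartbeats only; returns nothing while paused
                }
                consumer.resume(assigned);
                consumer.commitSync(); // commit only once the whole batch is really done
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder for the per-record work
    }
}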


Re: Consumers doesn't always poll first messages

2016-03-02 Thread Jan Omar
Hi Robin,

Why would you expect it to start from the first message?

You're committing the read offsets automatically every second. The offset is 
persisted, next time you consume again, it will start at the persisted offset 
again.

 consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
 consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
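
If the intent really is to re-read the topic from the start on every run, a
minimal sketch (broker address, topic and partition are made up) is to turn
auto commit off, use a fresh group with auto.offset.reset=earliest, and/or seek
explicitly after assigning the partition:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReadFromBeginningSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "replay-" + System.currentTimeMillis());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // nothing gets persisted
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // fresh groups start at the beginning
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // made-up topic
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(tp); // jump to the earliest offset regardless of any committed one
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> r : records) {
                System.out.println(r.offset() + ": " + r.value());
            }
        }
    }
}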

Regards

Jan

> On 2 Mar 2016, at 15:14, Péricé Robin  wrote:
> 
> Hello everybody,
> 
> I'm testing the new 0.9.0.1 API and I try to make a basic example working.
> 
> *Java code* :
> 
> *Consumer *: http://pastebin.com/YtvW0sz5
> *Producer *: http://pastebin.com/anQay9YE
> *Test* : http://pastebin.com/nniYLsHL
> 
> 
> *Kafka configuration* :
> 
> *Zookeeper propertie*s : http://pastebin.com/KC5yZdNx
> *Kafka properties* : http://pastebin.com/Psy4uAYL
> 
> But when I try to run my test and restart Kafka to see what happens, the
> Consumer doesn't always consume the first messages. Sometimes it consumes
> messages at offset 0 or 574 or 1292 ... The behavior of the test seems to
> be very random.
> 
> Anybody have an idea on that issue ?
> 
> Best Regards,
> 
> Robin



Re: session.timeout.ms limit - Kafka Consumer

2016-03-02 Thread Olson,Andrew
This topic is currently being discussed at 
https://issues.apache.org/jira/browse/KAFKA-2986 and 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records


On 3/2/16, 8:11 AM, "Vanessa Gligor"  wrote:

>Hello,
>
>I am using the Kafka 0.9.0 high-level consumer. I am not using auto commit for
>the offsets, so after I consume the messages (poll from Kafka) I will have
>to commit the offsets manually.
>
>The issue that I have is actually that the processing of the messages takes
>longer than 30s (and I cannot call poll again before these messages are
>processed), and when I try to commit the offset an exception is thrown:
>ERROR o.a.k.c.c.i.ConsumerCoordinator - Error ILLEGAL_GENERATION occurred
>while committing offsets for group MetadataConsumerSpout.
>(I have found this explanation on stackoverflow: if you wait for longer
>than the timeout then the coordinator for the topic will kick out
>the consumer because it will think it is dead and it will rebalance the group)
>
>In order to get rid of this I have thought about a couple of solutions:
>
>1. The configuration session.timeout.ms has a maximum value, so if I try to
>set it to 60 seconds, also I get an exception, because this value is not in
>the valid interval.
>
>2. I have tried to find a solution to get a paginated request when the
>polling method is called - no success.
>
>3. I have tried to send a heart beat from the outside of the poll (because
>this method sends the heartbeats) - no success.
>
>
>Thank you.



Consumers doesn't always poll first messages

2016-03-02 Thread Péricé Robin
Hello everybody,

I'm testing the new 0.9.0.1 API and I try to make a basic example working.

*Java code* :

*Consumer *: http://pastebin.com/YtvW0sz5
*Producer *: http://pastebin.com/anQay9YE
*Test* : http://pastebin.com/nniYLsHL


*Kafka configuration* :

*Zookeeper propertie*s : http://pastebin.com/KC5yZdNx
*Kafka properties* : http://pastebin.com/Psy4uAYL

But when I try to run my test and restart Kafka to see what happens, the
Consumer doesn't always consume the first messages. Sometimes it consumes
messages at offset 0 or 574 or 1292 ... The behavior of the test seems to
be very random.

Anybody have an idea on that issue ?

Best Regards,

Robin


session.timeout.ms limit - Kafka Consumer

2016-03-02 Thread Vanessa Gligor
Hello,

I am using the Kafka 0.9.0 high-level consumer. I am not using auto commit for
the offsets, so after I consume the messages (poll from Kafka) I will have
to commit the offsets manually.

The issue that I have is actually that the processing of the messages takes
longer than 30s (and I cannot call poll again before these messages are
processed), and when I try to commit the offset an exception is thrown:
ERROR o.a.k.c.c.i.ConsumerCoordinator - Error ILLEGAL_GENERATION occurred
while committing offsets for group MetadataConsumerSpout.
(I have found this explanation on stackoverflow: if you wait for longer
than the timeout then the coordinator for the topic will kick out
the consumer because it will think it is dead and it will rebalance the group)

In order to get rid of this I have thought about a couple of solutions:

1. The configuration session.timeout.ms has a maximum value, so if I try to
set it to 60 seconds, also I get an exception, because this value is not in
the valid interval.

2. I have tried to find a solution to get a paginated request when the
polling method is called - no success.

3. I have tried to send a heart beat from the outside of the poll (because
this method sends the heartbeats) - no success.


Thank you.


Kafka 0.9.0.1 new consumer - when no longer considered beta?

2016-03-02 Thread Sean Morris (semorris)
What are the plans for removing the "beta" label from the new consumer APIs? 
Will that be in another 0.9.0.x release? I assume since it has the "beta" label 
it should not be used in a production environment.

Thanks.


Re: Large Size Error even when the message is small

2016-03-02 Thread Asaf Mesika
Can you show your code for sending?

On Tue, 1 Mar 2016 at 21:59 Fang Wong  wrote:

> [2016-02-26 20:33:42,997] INFO Closing socket connection to /x due to
> invalid request: Request of length 1937006964 is not valid, [2016-02-26
> 20:33:42,997] INFO Closing socket connection to /10.224.146.58 due to
> invalid request: Request of length 1937006964 is not valid, it is larger
> than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,025] INFO Closing socket connection to /10.224.146.62
> due to invalid request: Request of length 1937006964 is not valid, it is
> larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,047] INFO Closing socket connection to /10.224.146.63
> due to invalid request: Request of length 1937006964 is not valid, it is
> larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,049] INFO Closing socket connection to /10.224.146.61
> due to invalid request: Request of length 1937006964 is not valid, it is
> larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,055] INFO Closing socket connection to /10.224.146.60
> due to invalid request: Request of length 1937006964 is not valid, it is
> larger than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,112] INFO Closing socket connection to /10.224.146.59
> due to invalid request: Request of length 1937006964 is not valid, it is
> larger than the maximum size of 104857600 bytes.
> (kafka.network.Processor)of
> 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,025] INFO Closing socket connection to /x due to
> invalid request: Request of length 1937006964 is not valid, it is larger
> than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,047] INFO Closing socket connection to /x due to
> invalid request: Request of length 1937006964 is not valid, it is larger
> than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,049] INFO Closing socket connection to /x due to
> invalid request: Request of length 1937006964 is not valid, it is larger
> than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,055] INFO Closing socket connection to /x due to
> invalid request: Request of length 1937006964 is not valid, it is larger
> than the maximum size of 104857600 bytes. (kafka.network.Processor)
> [2016-02-26 20:33:43,112] INFO Closing socket connection to /x due to
> invalid request: Request of length 1937006964 is not valid, it is larger
> than the maximum size of 104857600 bytes. (kafka.network.Processor)
>


Re: About the number of partitions

2016-03-02 Thread Jens Rantil
Hi Kim,

You are correct in that the number of partitions sets the upper limit on
consumer parallelization. That is, a single consumer in a group can consume
multiple partitions, however multiple consumers in a group can't consume a
single partition.

Also, since partitions are spread across your brokers, really it's the
ratio nPartitions/nBrokers that you want to optimize for.

Given the above parallelization limit, it would make sense to have a very
large ratio. This would have other implications:

   - Your brokers will have a lot of smaller files so they will have to
   flush periodically. This can incur a lot of overhead and introduce
   latencies. Especially on a spinning disk where seeks are expensive.
   - Brokers are generally set to rotate their logs at a certain size. It
   could be hard to tune rotation with many small files.

Given this, really you need to benchmark for your use case with your
message sizes etc.

Side-note: Note that for autoscaling you will have to overprovision your
partitions somewhat to not hit the parallelization limit.

Cheers,
Jens

On Wed, Mar 2, 2016 at 1:11 AM, BYEONG-GI KIM  wrote:

> Hello.
>
> I have questions about how many partitions are optimal while using kafka.
> As far as I know, even if there are multiple consumers that belong to a
> consumer group, say *group_A*, only one consumer can receive a kafka
> message produced by a producer if there is only one partition. So, as a result,
> multiple partitions are required in order to distribute the message to all
> the consumers in group_A if I want the consumers to get the message.
>
> Is it right?
>
> I'm considering developing several kafka consumer applications, e.g.,
> message saver, message analyzer, etc., so a message from a producer must be
> consumed by those kinds of consumers.
>
> Any advice and help would be really appreciated.
>
> Thanks in advance!
>
> Best regards
>
> Kim
>



-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook  Linkedin

 Twitter 


Greets To team with a issue .

2016-03-02 Thread Arvind Sharma
Hi Team ,

First of all my warm regards to each and everyone in the community .

We are using kafka for centralizing the logging from our php application.

Earlier our application wrote all the logs to a file in json-lines format,
which we read using filebeat (an ELK family member) and sent to a logstash
instance, which in turn sent them to kafka topics using the kafka output
plugin.

Recently we enhanced our logger and made a producer which logs directly to
the kafka server using a kafka handler (this handler uses phpkafka, built on
top of librdkafka).

We ran into a situation where, if a broker was down, the php application was
no longer able to produce any logs and threw an error.

The error which We recieved at consumer end was

[2016-03-02 15:25:18,233] ERROR
[ConsumerFetcherThread-console-consumer-14821_zk1-1456912451600-6bccbb09-0-1],
Error for partition [cron,1] to broker 1:class
kafka.common.NotLeaderForPartitionException
(kafka.consumer.ConsumerFetcherThread)


We have 3 zookeeper servers and 4 kafka brokers.

I have a topic with 5 partitions and 3 replicas each.

Now if one of the brokers goes down, the application should automatically
detect the new leader and start writing to the available leader.

But the application ends with a segmentation fault.

Please help .

Regards
Arvind Sharma
Software Engg.
Shopclues


Consumer Offsets Topic cleanup.policy

2016-03-02 Thread Achanta Vamsi Subhash
Hi all,

We have a __consumer_offsets topic that has cleanup.policy=compact and
log.cleaner.enable=false. What would happen if we change the cleanup.policy
to delete? Will that treat the offsets topic the same as any other topic?

We currently have a setup with log.cleaner.enable=false, and the brokers
hosting the offsets topic are using a lot of disk as the topic is never
cleaned/compacted. We tried enabling log.cleaner.enable=true for the
brokers with the offsets topic and that is leading to a lot of replicated data
and is taking hours to finish.

What is a better way to clean up the old segments of __consumer_offsets
topic?

-- 
Regards
Vamsi Subhash


does leader partition block ? 0.8.2

2016-03-02 Thread Anishek Agarwal
Hello,

We have 4 topics deployed on a 4-node kafka cluster. For one of the topics we
are trying to read data from the beginning, using the kafka high level
consumer.

the topic has 32 partitions and we create 32 streams using high level
consumer so that one partition per stream is used, we then have 32 threads
each working with one stream that we create.

we have set "consumer.timeout.ms" to 10000 (10 sec). We are seeing
issues where most of the threads end prematurely, timing out without reading
the entire partition.

The way I have interpreted "consumer.timeout.ms" -- the consumer will wait
the configured milliseconds for a new message once it reaches the end of a
partition before it throws ConsumerTimeoutException. Is this correct?

If yes, then the only reason I can see for the exceptions before we reach the
end of the partition is that the leader for that partition is blocked doing
something else and hence is not serving the read request. Is this even
possible, given kafka is such a high-throughput system?

Thanks in advance for helping !
anishek
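
For what it's worth, the documented meaning of consumer.timeout.ms is slightly
broader than "end of partition": the iterator throws ConsumerTimeoutException
whenever no message has been available for that long, so a slow or stalled
fetch mid-partition (for example during a rebalance or a broker hiccup) can
trigger it too, not only reaching the tail. A rough sketch of the usual 0.8
high-level consumer loop that treats the timeout as a signal rather than an
error; the ZooKeeper address, group and topic are made up:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TimeoutAwareConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "replay-group");
        props.put("auto.offset.reset", "smallest");  // read from the beginning
        props.put("consumer.timeout.ms", "10000");   // 10 s without a message -> exception

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("my-topic", 1); // one stream shown; the real setup uses 32

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(topicCountMap);
        ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();
        try {
            while (it.hasNext()) {
                byte[] message = it.next().message();
                // process(message) would go here
            }
        } catch (ConsumerTimeoutException e) {
            // No message for consumer.timeout.ms; this may simply mean we are caught up,
            // but it can also fire mid-partition if fetches stall.
        } finally {
            connector.shutdown();
        }
    }
}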