Kafka cluster test cases

2016-06-30 Thread Ghosh, Prabal Kumar
Hi Everyone,

Is there any test suite available for testing cluster health and running some
functional tests?

Regards,
Prabal K Ghosh


Heartbeating during long processing times

2016-06-30 Thread Elias Levy
What is the officially recommended method to heartbeat using the new Java
consumer during long message processing times?

I thought I could accomplish this by setting max.poll.records to 1 in the
client, calling consumer.pause(consumer.assignment()) when starting to
process a record, calling consumer.resume(consumer.paused()) when done
processing a record and committing its offset, and calling consumer.poll(0)
intermittently while processing the record.

Testing shows that consumer.poll(0) will return records, rather than
returning null or an empty ConsumerRecords.
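
For reference, here is a minimal sketch of the pause/resume/poll pattern described above, assuming the 0.10.0 Java consumer (where pause() and resume() accept a collection). The broker address, group id, topic name, and timings are placeholders, and this mirrors the poster's proposed approach rather than an officially documented recipe:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SlowProcessingConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "slow-processing-group");     // placeholder group
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", "1");                 // one record per poll, as above
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("slow-topic")); // placeholder topic

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // Stop fetching from our partitions while this record is processed.
                    consumer.pause(consumer.assignment());

                    long deadline = System.currentTimeMillis() + 60_000; // simulated long job
                    while (System.currentTimeMillis() < deadline) {
                        doSomeWork(record);
                        // Heartbeat: with the partitions paused this poll is intended to
                        // return nothing, although the poster reports seeing records here.
                        consumer.poll(0);
                    }

                    consumer.resume(consumer.paused());
                    consumer.commitSync();
                }
            }
        }
    }

    private static void doSomeWork(ConsumerRecord<String, String> record) {
        // placeholder for one slice of the long-running processing
    }
}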


Re: broker randomly shuts down

2016-06-30 Thread allen chan
Hi Shikhar,
I do not see a stderr log file anywhere. Can you point me to where Kafka
would write such a file?

On Thu, Jun 30, 2016 at 5:10 PM, Shikhar Bhushan 
wrote:

> Perhaps it's a JVM crash? You might not see anything in the standard
> application-level logs; you'd need to look at the stderr output.
>

Re: broker randomly shuts down

2016-06-30 Thread Shikhar Bhushan
Perhaps it's a JVM crash? You might not see anything in the standard
application-level logs; you'd need to look at the stderr output.


Re: broker randomly shuts down

2016-06-30 Thread allen chan
Anyone else have ideas?

This is still happening. I moved ZooKeeper off the server onto its own
dedicated VMs.
Kafka starts with 4G of heap and gets nowhere near that much consumed when
it crashes.
I bumped up the ZooKeeper timeout settings, but that has not solved it.

I also disconnected all the producers and consumers. This points to something
between Kafka and ZooKeeper, right?

Again, the logs give no indication of why Kafka decided to shut itself down:
https://gist.github.com/allenmchan/f9331e54bb4fd77cc5bc0b031a7a6206




On Thu, Jun 2, 2016 at 4:22 PM, Russ Lavoie  wrote:

> What about in dmesg?  I have run into this issue and it was the OOM
> killer.  I also ran into a heap issue using too much of the direct memory
> (JVM).  Reducing the fetcher threads helped with that problem.
> On Jun 2, 2016 12:19 PM, "allen chan" 
> wrote:
>
> > Hi Tom,
> >
> > That is one of the first things that I checked. Active memory never goes
> > above 50% of what is available; the file cache uses the rest of the memory,
> > but I do not think that triggers the OOM killer.
> > Either way, there are no entries in /var/log/messages (CentOS) to show that
> > an OOM kill is happening.
> >
> > Thanks
> >
> > On Thu, Jun 2, 2016 at 5:36 AM, Tom Crayford 
> wrote:
> >
> > > That looks like somebody is killing the process. I'd suspect either the
> > > Linux OOM killer or something else automatically killing the JVM for
> > > some reason.
> > >
> > > For the OOM killer, assuming you're on Ubuntu, it's pretty easy to find
> > > in /var/log/syslog (depending on your setup). I don't know about other
> > > operating systems.
> > >
> > > On Thu, Jun 2, 2016 at 5:54 AM, allen chan <
> allen.michael.c...@gmail.com
> > >
> > > wrote:
> > >
> > > > I have an issue where my brokers randomly shut themselves down.
> > > > I turned on DEBUG in log4j.properties but still do not see a reason
> > > > why the shutdown is happening.
> > > >
> > > > Anyone seen this behavior before?
> > > >
> > > > version 0.10.0
> > > > log4j.properties
> > > > log4j.rootLogger=DEBUG, kafkaAppender
> > > > * I tried TRACE level but I do not see any additional log messages
> > > >
> > > > snippet of log around shutdown
> > > > [2016-06-01 15:11:51,374] DEBUG Got ping response for sessionid:
> > > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > > [2016-06-01 15:11:53,376] DEBUG Got ping response for sessionid:
> > > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > > [2016-06-01 15:11:55,377] DEBUG Got ping response for sessionid:
> > > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > > [2016-06-01 15:11:57,380] DEBUG Got ping response for sessionid:
> > > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > > [2016-06-01 15:11:59,383] DEBUG Got ping response for sessionid:
> > > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > > [2016-06-01 15:12:01,386] DEBUG Got ping response for sessionid:
> > > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > > [2016-06-01 15:12:03,389] DEBUG Got ping response for sessionid:
> > > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > > [2016-06-01 15:12:04,121] INFO [Group Metadata Manager on Broker 2]:
> > > > Removed 0 expired offsets in 0 milliseconds.
> > > > (kafka.coordinator.GroupMetadataManager)
> > > > [2016-06-01 15:12:04,121] INFO [Group Metadata Manager on Broker 2]:
> > > > Removed 0 expired offsets in 0 milliseconds.
> > > > (kafka.coordinator.GroupMetadataManager)
> > > > [2016-06-01 15:12:05,390] DEBUG Got ping response for sessionid:
> > > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > > [2016-06-01 15:12:07,393] DEBUG Got ping response for sessionid:
> > > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > > [2016-06-01 15:12:09,396] DEBUG Got ping response for sessionid:
> > > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > > [2016-06-01 15:12:11,399] DEBUG Got ping response for sessionid:
> > > > 0x2550a693b470001 after 1ms (org.apache.zookeeper.ClientCnxn)
> > > > [2016-06-01 15:12:13,334] INFO [Kafka Server 2], shutting down
> > > > (kafka.server.KafkaServer)
> > > > [2016-06-01 15:12:13,334] INFO [Kafka Server 2], shutting down
> > > > (kafka.server.KafkaServer)
> > > > [2016-06-01 15:12:13,336] INFO [Kafka Server 2], Starting controlled
> > > > shutdown (kafka.server.KafkaServer)
> > > > [2016-06-01 15:12:13,336] INFO [Kafka Server 2], Starting controlled
> > > > shutdown (kafka.server.KafkaServer)
> > > > [2016-06-01 15:12:13,338] DEBUG Added sensor with name connections-closed:
> > > > (org.apache.kafka.common.metrics.Metrics)
> > > > [2016-06-01 15:12:13,338] DEBUG Added sensor with name connections-created:
> > > > (org.apache.kafka.common.metrics.Metrics)
> > > > [2016-06-01 15:12:13,338] DEBUG Added sensor with name bytes-sent-received:
> > > > (org.apache.kafka.common.metrics.Metrics)
> > > > [

Non blocking Kafka producer

2016-06-30 Thread Dan Bahir (BLOOMBERG/ 120 PARK)
Hi,

I have an application that needs low-latency writes to Kafka.

With the 0.8.1 producer, I set queue.buffering.max.messages to the number of
messages I would like the producer to store in memory, and
queue.enqueue.timeout.ms to 0 to have the producer throw an exception if the
server was not keeping up.

On 0.9.0.1 I set buffer.memory, which gives a result similar to setting
queue.buffering.max.messages.

But if I set max.block.ms to 0, I get an exception when metadata is retrieved.

Is there a way to replicate the same behavior with the 0.9.0.1 producer?

Thanks!
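
A rough sketch of the 0.9.0.1 settings discussed above follows. Whether this fully reproduces the old queue.enqueue.timeout.ms=0 behaviour is exactly the open question in this thread, so the metadata pre-fetch shown here is only one possible work-around, not an official recommendation; the broker address and topic name are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class LowLatencyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("buffer.memory", "33554432");  // bound on memory for unsent records
        props.put("max.block.ms", "0");          // never block in send(); fail fast instead

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // With max.block.ms=0, send() also refuses to wait for the first metadata
        // fetch, which matches the exception described above. Warming the metadata
        // cache once at startup is one work-around; this call itself may throw
        // until the cluster metadata is available, so real code would retry it.
        try {
            producer.partitionsFor("my-topic");              // placeholder topic
        } catch (Exception e) {
            // metadata not available yet
        }

        try {
            producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"),
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            if (exception != null) {
                                // broker not keeping up, buffer full, or metadata missing
                            }
                        }
                    });
        } catch (Exception e) {
            // with max.block.ms=0, send() throws instead of blocking
        } finally {
            producer.close();
        }
    }
}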




console-consumer show offset

2016-06-30 Thread Fumo, Vincent
Is there a way to have the console consumer show the offset value with the messages, like
we do for the key? I tried --property print.offset=true, but it didn't work.

Re: Streams RocksDB State Store Disk Usage

2016-06-30 Thread Avi Flax
On Jun 29, 2016, at 22:44, Guozhang Wang  wrote:
> 
> One way to mentally quantify your state store usage is to consider the
> total key space in your reduceByKey() operator, and multiply by the average
> key-value pair size. Then you need to consider the RocksDB write / space
> amplification factor as well.

That makes sense, thank you!
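
As a back-of-the-envelope illustration of that sizing rule (every number below is a made-up assumption rather than a measurement from this application):

public class StateStoreEstimate {
    public static void main(String[] args) {
        long distinctKeys = 10_000_000L;      // assumed key space of the reduceByKey()
        long avgKeyValueBytes = 200L;         // assumed average serialized key + value size
        double rocksDbAmplification = 3.0;    // assumed write/space amplification factor

        double estimatedBytes = distinctKeys * avgKeyValueBytes * rocksDbAmplification;
        System.out.printf("Estimated on-disk state: ~%.1f GB%n",
                estimatedBytes / (1024.0 * 1024.0 * 1024.0));
        // With these assumed numbers: roughly 5.6 GB of RocksDB state.
    }
}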

> Currently Kafka Streams hard-codes some RocksDB config values, such as block
> size, to achieve good write performance at the cost of write
> amplification, but we are now working on exposing those configs to the
> users so that they can override them:
> 
> https://issues.apache.org/jira/browse/KAFKA-3740

That looks excellent for the next release ;)

In the meantime, do you know anything specific about the RocksDB behavior with
the LOG and LOG.old.{timestamp} files? (They don't seem to me to be directly
related to the storage space required by the actual state itself, unless I'm
misunderstanding the word "log", which is a bit overloaded in this community.) Is
there something I can do in code to affect this? Or some way to
understand/predict the growth patterns of these files? Does RocksDB have some
kind of built-in cleanup for them, or do I need to set up a cron job of my own?

Thanks!
Avi

Re: How many connections per consumer/producer

2016-06-30 Thread Ben Stopford
Hi Dhiraj,

That shouldn't be the case. As I understand it, both the producer and the consumer
hold a single connection to each broker they need to communicate with. Multiple
produce requests can be sent through a single connection in the producer (the
number in flight is configurable with max.in.flight.requests.per.connection, and
non-blocking I/O is used). The new consumer sends a single request at a time to
each broker.

Hope that helps

B
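
For reference, a minimal sketch of where that producer setting lives (the broker addresses and the value shown are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class SingleConnectionProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Up to 5 unacknowledged requests may be pipelined over the single
        // connection the producer holds to each broker it talks to.
        props.put("max.in.flight.requests.per.connection", "5");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}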
> On 30 Jun 2016, at 11:03, dhiraj prajapati  wrote:
> 
> Hi,
> I am using the new Kafka consumer and producer APIs (version 0.9.0.1).
> I see that both my consumer and my producer have multiple connections
> established with the Kafka brokers. Why is this so?
> Do the consumer and producer APIs use connection pooling? If yes, where
> do I configure the pool size?
> 
> Regards,
> Dhiraj



Re: streaming-enabled SQL in Kafka Streams?

2016-06-30 Thread Matthias J. Sax
Hi Alex,

We do have a SQL layer on the long-term roadmap (also considering Calcite).

Thanks!

-Matthias

On 06/30/2016 09:41 AM, Alex Glikson wrote:
> Did folks consider adding support in Kafka Streams for Apache Calcite [1], 
> for streaming-enabled SQL (potentially on top of the existing DSL)? It sounds
> like such support could be useful for opening up Kafka Streams' capabilities to
> an even broader audience.
> 
> Thanks,
> Alex
> 
> 
> [1] https://calcite.apache.org/docs/stream.html
> 





Re: what is use of __consumer_offsets

2016-06-30 Thread Tom Crayford
No. This is used for tracking consumer offsets. Kafka manages cleaning it
up itself.

On Thu, Jun 30, 2016 at 1:52 PM, Snehalata Nagaje <
snehalata.nag...@harbingergroup.com> wrote:

>
> But does it create a folder for every message we put into Kafka, for every
> offset?
>
> And do we need to clean those folders? Is there any configuration for that?
>


Re: what is use of __consumer_offsets

2016-06-30 Thread Snehalata Nagaje

But does it create a folder for every message we put into Kafka, for every offset?

And do we need to clean those folders? Is there any configuration for that?



Re: what is use of __consumer_offsets

2016-06-30 Thread Tom Crayford
Hi there,

Kafka uses this topic internally for consumer offset commits.

Thanks

Tom Crayford
Heroku Kafka

On Thu, Jun 30, 2016 at 1:36 PM, Snehalata Nagaje <
snehalata.nag...@harbingergroup.com> wrote:

>
> Hi All,
>
>
> I am using Kafka 0.9 with the publish-subscribe pattern; one consumer is
> listening to a particular topic.
>
> What is the use of the __consumer_offsets folders created in the log directories?
>
> Do they have any impact on offset committing?
>
>
>
> Thanks,
> Snehalata
>


what is use of __consumer_offsets

2016-06-30 Thread Snehalata Nagaje

Hi All, 


I am using Kafka 0.9 with the publish-subscribe pattern; one consumer is
listening to a particular topic.

What is the use of the __consumer_offsets folders created in the log directories?

Do they have any impact on offset committing?



Thanks, 
Snehalata 


Re: Log retention just for offset topic

2016-06-30 Thread Sathyakumar Seshachalam
Thanks Tom. I think that's good enough for my needs.

On Thu, Jun 30, 2016 at 4:20 PM, Tom Crayford  wrote:

> The default cleanup policy is delete, which is the regular time based
> retention.
>


Re: Log retention just for offset topic

2016-06-30 Thread Tom Crayford
The default cleanup policy is delete, which is the regular time based
retention.

On Thursday, 30 June 2016, Sathyakumar Seshachalam <
sathyakumar_seshacha...@trimble.com> wrote:

> Or maybe I am wrong, and the log cleaner only picks up topics with a
> cleanup.policy.
> From the documentation it is not very obvious what the behaviour is.
>
> On Thu, Jun 30, 2016 at 10:33 AM, Sathyakumar Seshachalam <
> sathyakumar_seshacha...@trimble.com > wrote:
>
> > Hi,
> >
> > Thanks for the response.
> >
> > I would still like to know what happens for topics which have not defined a
> > cleanup.policy.
> > I assume the default value is compact, and hence all topics' logs will be
> > compacted, which I want to avoid.
> >
> > I am running 0.9.0, so I will have to manually set log.cleaner.enable=true.
> >
> > Regards,
> > Sathya
> >
> > On Thu, Jun 30, 2016 at 10:20 AM, Manikumar Reddy <
> > manikumar.re...@gmail.com > wrote:
> >
> >> Hi,
> >>
> >> Kafka internally creates the offsets topic (__consumer_offsets) with
> >> compact mode on.
> >> From 0.9.0.1 onwards, log.cleaner.enable=true by default. This means
> >> topics with a cleanup.policy=compact will now be compacted by default.
> >>
> >> You can tweak the offsets topic configuration using the props below:
> >> offsets.topic.compression.codec
> >> offsets.topic.num.partitions
> >> offsets.topic.replication.factor
> >> offsets.topic.segment.bytes
> >> offsets.retention.minutes
> >> offsets.retention.check.interval.ms
> >>
> >>
> >> Thanks
> >> Manikumar
> >>
> >> On Thu, Jun 30, 2016 at 9:49 AM, Sathyakumar Seshachalam <
> >> sathyakumar_seshacha...@trimble.com > wrote:
> >>
> >> > I am a little confused about how the log cleaner works. My use case is
> >> > that I want to compact just selected topics (in my case just the internal
> >> > topic __consumer_offsets) and leave the other topics as they are.
> >> >
> >> > What are the right settings/configuration to make this happen?
> >> >
> >> > As I understand it, enabling/disabling the log cleaner is a global setting,
> >> > and it will clean all logs (compact them based on the cleanup policy), so
> >> > every topic's cleanup policy will be considered and hence compacted, with
> >> > compact being the default policy. Is this correct?
> >> >
> >> > I have set all topics' retention duration to a really exorbitantly high
> >> > value. Does that mean __consumer_offsets won't be compacted at all? If so,
> >> > how do I set the retention time just for the offsets topic, it being an
> >> > internal topic?
> >> >
> >> > Regards,
> >> > Sathya
> >> >
> >>
> >
> >
>


How many connections per consumer/producer

2016-06-30 Thread dhiraj prajapati
Hi,
I am using the new Kafka consumer and producer APIs (version 0.9.0.1).
I see that both my consumer and my producer have multiple connections
established with the Kafka brokers. Why is this so?
Do the consumer and producer APIs use connection pooling? If yes, where
do I configure the pool size?

Regards,
Dhiraj


Re: streaming-enabled SQL in Kafka Streams?

2016-06-30 Thread Damian Guy
Hi Alex,

Yes SQL support is something we'd like to add in the future. I'm not sure
when at this stage.

Thanks,
Damian

On Thu, 30 Jun 2016 at 08:41 Alex Glikson  wrote:

> Did folks consider adding support in Kafka Streams for Apache Calcite [1],
> for streaming-enabled SQL (potentially on top of the existing DSL)? It sounds
> like such support could be useful for opening up Kafka Streams' capabilities to
> an even broader audience.
>
> Thanks,
> Alex
>
>
> [1] https://calcite.apache.org/docs/stream.html
>
>


Re: Kafka Streams reducebykey and tumbling window - need final windowed KTable values

2016-06-30 Thread Clive Cox
Hi Eno,
I've looked at KIP-67. It looks good, but it's not clear what calls I would make
to do what I presently need: get access to each windowed store at some time
soon after the window end time. I can then use the methods specified to iterate
over keys and values. Can you point me to the relevant method/technique for
this?

Thanks,
Clive
 

On Tuesday, 28 June 2016, 12:47, Eno Thereska  
wrote:
 

 Hi Clive,

As promised, here is the link to the KIP that just went out today. Feedback 
welcome:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
 


Thanks
Eno

> On 27 Jun 2016, at 20:56, Eno Thereska  wrote:
> 
> Hi Clive,
> 
> We are working on exposing the state store behind a KTable as part of 
> allowing for queries to the structures currently hidden behind the language 
> (DSL). The KIP should be out today or tomorrow for you to have a look. You 
> can probably do what you need using the low-level processor API but then 
> you'd lose the benefits of the DSL and would have to maintain your own 
> structures.
> 
> Thanks,
> Eno
> 
>> On 26 Jun 2016, at 18:42, Clive Cox  wrote:
>> 
>> Following on from this thread, if I want to iterate over a KTable at the end
>> of its hopping/tumbling time window, how can I do this at present myself? Is
>> there a way to access these structures?
>> If this is not possible, it would seem I need to duplicate and manage
>> something similar to a list of windowed KTables myself, which is not really
>> ideal.
>> Thanks for any help,
>> Clive
>> 
>> 
>>  On Monday, 13 June 2016, 16:03, Eno Thereska  wrote:
>> 
>> 
>> Hi Clive,
>> 
>> For now this optimisation is not present. We're working on it as part of 
>> KIP-63. One manual work-around might be to use a simple Key-value store to 
>> deduplicate the final output before sending to the backend. It could have a 
>> simple policy like "output all values at 1 second intervals" or "output 
>> after 10 records have been received".
>> 
>> Eno
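
For what it's worth, here is a plain-Java sketch of the kind of deduplication policy described above ("output at most once per interval per key"); the class name and threshold are hypothetical and this is not part of the Kafka Streams API:

import java.util.HashMap;
import java.util.Map;

// Decides whether an updated aggregate for a key should be forwarded to the
// external backend, emitting at most once per interval per key.
public class EmissionThrottle<K> {

    private final long intervalMs;
    private final Map<K, Long> lastEmitted = new HashMap<>();

    public EmissionThrottle(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Returns true if the caller should send this key's latest value downstream now.
    public synchronized boolean shouldEmit(K key, long nowMs) {
        Long last = lastEmitted.get(key);
        if (last == null || nowMs - last >= intervalMs) {
            lastEmitted.put(key, nowMs);
            return true;
        }
        return false; // suppress this intermediate update
    }
}

The code in the foreach that currently sees every KTable update could consult such a throttle before writing to the backend; the trade-off is that the final update of a window may still need a periodic flush.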
>> 
>> 
>>> On 13 Jun 2016, at 13:36, Clive Cox  wrote:
>>> 
>>> 
>>> Thanks Eno for your comments and references.
>>> Perhaps, I can explain what I want to achieve and maybe you can suggest the 
>>> correct topology?
>>> I want to process a stream of events, do aggregation, and send the results to an
>>> analytics backend (InfluxDB), so that rather than sending 1000 points/sec
>>> to the analytics backend, I send at a much lower rate. I'm only interested in
>>> the processing time of the event, so in that respect there are no
>>> "late arriving" events. I was hoping I could use a tumbling window and, once
>>> its end time has passed, send the consolidated aggregation
>>> for that window and then throw the window away.
>>> 
>>> It sounds like from the references you give that this is not possible at 
>>> present in Kafka Streams?
>>> 
>>> Thanks,
>>> Clive 
>>> 
>>>    On Monday, 13 June 2016, 11:32, Eno Thereska  
>>>wrote:
>>> 
>>> 
>>> Hi Clive,
>>> 
>>> The behaviour you are seeing is indeed correct (though not necessarily
>>> optimal in terms of performance, as described in this JIRA:
>>> https://issues.apache.org/jira/browse/KAFKA-3101).
>>>
>>> The key observation is that windows never close/complete. There could
>>> always be late-arriving events that appear long after a window's end
>>> interval, and those need to be accounted for properly. In Kafka Streams that
>>> means that such late-arriving events continue to update the value of the
>>> window. As described in the above JIRA, some optimisations could still be
>>> possible (e.g., batching requests as described in KIP-63); however, they
>>> are not implemented yet.
>>> 
>>> So your code needs to handle each update.
>>> 
>>> Thanks
>>> Eno
>>> 
>>> 
>>> 
 On 13 Jun 2016, at 11:13, Clive Cox  wrote:
 
 Hi,
 I would like to process a stream with a tumbling window of 5 secs, create
 aggregated stats for keys, and push the final aggregates at the end of each
 window period to an analytics backend. I have tried doing something like:

   stream
     .map(...)
     .reduceByKey(..., TimeWindows.of("mywindow", 5000L), ...)
     .foreach(... /* send stats */ ...)

 But I get every update to the KTable in the foreach.
 How do I just get the final values once the tumbling window is complete, so
 I can iterate over them and send them to some external system?
 Thanks,
  Clive
 PS Using kafka_2.10-0.10.0.0
 
>>> 
>>> 
>> 
>> 
> 


  

What happens after connections.max.idle.ms | Kafka Producer

2016-06-30 Thread dhiraj prajapati
Hi,
From the documentation for producer configs:
connections.max.idle.ms is the time after which idle connections will be
closed.

I wish to know what will happen if my connections are idle for a long time, and
the producer then produces a message.
I don't see any exception. How does the producer client handle it?

kafka version: 0.9.0.1

Regards,
Dhiraj


streaming-enabled SQL in Kafka Streams?

2016-06-30 Thread Alex Glikson
Did folks consider adding support in Kafka Streams for Apache Calcite [1], 
for streaming-enabled SQL (potentially on top of the existing DSL)? It sounds
like such support could be useful for opening up Kafka Streams' capabilities to
an even broader audience.

Thanks,
Alex


[1] https://calcite.apache.org/docs/stream.html



Re: Consumer Group, relabancing and partition uniqueness

2016-06-30 Thread Spico Florin
Hi!
  Partitions are (load-)balanced among the consumers in a group like this:
1. If the number of consumers equals the number of partitions, each consumer
gets exactly one partition.
2. If the number of consumers is less than the number of partitions, the
partitions are not allocated randomly to the consumers but follow an algorithm (
https://cwiki.apache.org/confluence/display/KAFKA/FAQ):
Can I predict the results of the consumer rebalance?

During the rebalance process, each consumer will execute the same
deterministic algorithm to range partition a sorted list of
topic-partitions over a sorted list of consumer instances. This makes the
whole rebalancing process deterministic. For example, if you only have one
partition for a specific topic and going to have two consumers consuming
this topic, only one consumer will get the data from the partition of the
topic; and even if the consumer named "Consumer1" is registered after the
other consumer named "Consumer2", it will replace "Consumer2" gaining the
ownership of the partition in the rebalance.

Range partitioning works on a per-topic basis. For each topic, we lay out
the available partitions in numeric order and the consumer threads in
lexicographic order. We then divide the number of partitions by the total
number of consumer streams (threads) to determine the number of partitions
to allocate to each consumer. If it does not evenly divide, then the first
few consumers will have one extra partition. For example, suppose there are
two consumers C1 and C2 with two streams each, and there are five available
partitions (p0, p1, p2, p3, p4). So each consumer thread will get at least
one partition and the first consumer thread will get one extra partition.
So the assignment will be: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0,
p4 -> C2-1


3. If the number of consumers is greater than the number of partitions, you
will have one consumer per partition and the remaining consumers will sit back
and relax :) (meaning they do nothing).
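
To make the range-assignment arithmetic quoted above concrete, here is a small stand-alone sketch of it (this mirrors the FAQ description, not Kafka's actual assignor class):

import java.util.Arrays;
import java.util.List;

public class RangeAssignmentSketch {
    public static void main(String[] args) {
        // Partitions in numeric order, consumer threads in lexicographic order.
        List<Integer> partitions = Arrays.asList(0, 1, 2, 3, 4);
        List<String> threads = Arrays.asList("C1-0", "C1-1", "C2-0", "C2-1");

        int perThread = partitions.size() / threads.size();  // 5 / 4 = 1
        int extra = partitions.size() % threads.size();      // first `extra` threads get one more

        int next = 0;
        for (int i = 0; i < threads.size(); i++) {
            int count = perThread + (i < extra ? 1 : 0);
            for (int j = 0; j < count; j++) {
                System.out.println("p" + partitions.get(next++) + " -> " + threads.get(i));
            }
        }
        // Prints: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1
    }
}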

In your logs, it appears that you trigger multiple rebalances, which happens when either:
 - your consumers die along the way, or
 - you add new consumers (by starting new consumer instances).

Be aware that the rebalance takes place at the group level. That means if you have
a different topic T2 whose consumers use the same
group name as the consumers for topic T1, then a rebalance will be
triggered each time a consumer for topic T1 or T2 is added or removed (dies).

4. Regarding having a consumer consume from a specific partition, this
functionality is provided by the Kafka API:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "my_topic";

// consumer.subscribe(Arrays.asList(topic));      // group-managed (dynamic) assignment
TopicPartition topicPartition = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(topicPartition));   // manual assignment of partition 0

Be aware that when assigning the partition directly, the consumer will not be
affected by rebalancing. If one of the consumers dies, the partition
associated with it will no longer be consumed (it will not be assigned to a
different consumer).

I hope that it helps.
 Florin






On Thu, Jun 30, 2016 at 1:25 AM, Milind Vaidya  wrote:

> Florin,
>
> Thanks, I got your point.
>
> The documentation, as well as the diagram showing the mechanism of consumer
> groups, indicates that the partitions are shared disjointly by the consumers in a
> group.
> You also stated above, "Each of your consumers will receive messages for the
> partitions allocated to it for the topics it subscribed to."
>
> e.g. P1..P5 are partitions and we have consumers C1..C5 belonging to the
> same group. So is it correct to assume that C2 will consume from P4 (say) and
> not from any other partition? Similarly, Ck will consume from Pm, where 1 <=
> k, m <= 5. If no rebalancing happens, as in none of the consumers dies, how
> long will this combination last? Or may a random rebalance happen after a
> while, leading to C2 consuming from P3 instead of P4, from which it was
> originally consuming?
>
> I have the consumer's logs, which indicate that the partitions associated
> with a consumer change periodically.
> Is there any mechanism by which I can make sure a consumer consumes from a
> particular partition for a sufficient (configurable) amount of time,
> provided none of the consumers goes down and triggers a rebalance?
>
>
>
>
> On Wed, Jun 29, 2016 at 3:02 PM, Spico Florin 
> wrote:
>
> > Hi!
> >   By default, Kafka internally uses a round-robin partitioner that will
> > send the messages to the right partition based on the message key. Each of
> > your consumers will receive messages for the partitions allocated to it
> > for the topics it subscribed to.
> >   In case of a rebalance, if you add more consumers than there are partitions,
> > then some of the consumers will not get any data. If one of the consumers
> > dies, then the remaining consumers will get messages from the partitions
> > depending on their client id. Kafka internally uses the client id
> > (lexicographic order) to allocate the partitions.
> >
> > I hope that this give you an overview of what happens and someho

RE: Running kafka connector application

2016-06-30 Thread Andrew Stevenson
The twitter connector pom builds a fat jar with all dependencies. You need to 
add this to the classpath before you start Connect. This is what the Confluent 
scripts are doing.

Regards

Andrew

From: Ewen Cheslack-Postava
Sent: 14/06/2016 07:35
To: users@kafka.apache.org
Subject: Re: Running kafka connector application

Kanagha,

I'm not sure about that particular connector, but normally the build script
would provide support for collecting the necessary dependencies. Then all
you need to do is add something like /path/to/connector-and-deps/* to your
classpath and it shouldn't be affected by versions in the pom.xml, you'd
just rebuild to pull in the new dependencies. For example, Confluent's
connectors are all set up so that if you build with mvn package, the output
includes directories with all the dependency jars.

-Ewen

On Sun, Jun 12, 2016 at 2:36 PM, Kanagha  wrote:

> Hi,
>
>
> I'm running the TwitterProducer task as per
> https://github.com/Eneco/kafka-connect-twitter
>
> connect-standalone /connect-source-standalone.properties
> /twitter-source.properties
>
> I see that I have to set the CLASSPATH to include all the jars that the
> target connector jar depends on. This method wouldn't be robust if the
> versions change in pom.xml.
>
> Is there an easier approach to running the standalone Connect application?
>
>
> Thanks
> Kanagha
>



--
Thanks,
Ewen