Re: Kafka partition not migrating to another broker.

2017-03-02 Thread Stevo Slavić
Hello Shri,

That behavior is by current Apache Kafka design. At topic creation time,
for every topic partition, the replication factor is converted to a replica
set (the set of ids of brokers which should replicate the partition), and
those per-partition replica sets are the metadata that gets stored in
ZooKeeper; the replication factor itself does not get stored. The
replication factor of a topic partition can be calculated from the replica
set size. There is no active component in Apache Kafka that moves or
distributes partition replicas around the available brokers as brokers are
added/removed to/from the cluster or when they crash.
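For illustration, a sketch of the stored metadata (using the replica sets
from your first describe output): the assignment lives in a ZooKeeper znode
such as /brokers/topics/xxx, shaped roughly like

{"version":1,"partitions":{"0":[5,2,3],"1":[1,3,4],"2":[2,4,5],...}}

Note there is no replication factor field - only the replica sets
themselves.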

In the scenario that you simulated, when a broker crashes, one should be
monitoring availability of brokers in the cluster, and metrics like ISR
shrinks and under-replicated partitions, and then recover the crashed
broker by either fixing it or bringing a new one in its place (keeping the
same broker id as the old one). If the old one is fixed it will catch up
from the other replicas; if a new one is put in its place, since it has the
same id as the old one, it will replicate the partition(s) completely from
the brokers which are leaders for those partitions. In both cases there is
no need to change the replica set of any partition where that broker was a
member of the replica set. Very similar to the crash scenario is the
handling of another typical failure - a network partition, where a broker
is not accessible by the cluster controller broker or cannot reach the
ZooKeeper ensemble. As soon as the network issue is resolved, the broker
should start catching up.

If there were a component doing automatic reassignment, it would have to
make some tough decisions with hard-to-predict consequences. E.g. if a
broker became inaccessible to such a component and the component decided to
change the replica set of all affected partitions that the broker was a
replica for, the brokers that took over as new replicas would start
replicating those partitions from scratch, and that could put huge load on
the cluster members, disk and network. And that decision could be wrong,
and the extra load it caused unnecessary, if the broker that was thought to
have crashed for good became accessible or came back up (automatically, or
brought back manually) shortly after the reassignment happened.

If you do a planned shrinking of the Kafka cluster, or you're adding new
brokers to it, there's a command line tool you can use to change the
replica assignment for existing partitions to balance the partitions across
the cluster, while newly created topics will automatically take into
account which brokers are available in the cluster. Balancing the
partitions is typically not the same thing as balancing the load across the
cluster, since load is typically not even across different topics, and the
same can be true for the load on partitions of a given topic.
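The tool is kafka-reassign-partitions.sh; a sketch of its
generate/execute/verify cycle (file names and broker list illustrative):

./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --topics-to-move-json-file topics.json --broker-list "1,2,3,4,5" --generate
# review the proposed assignment, save it as reassignment.json, then:
./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file reassignment.json --execute
./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file reassignment.json --verify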

Replication and other Apache Kafka design and implementation topics are
well covered in the documentation (see
http://kafka.apache.org/documentation/#replication )
Some extra auto-balancing features are available in the Confluent
Enterprise platform (e.g. see
http://docs.confluent.io/3.2.0/cp-docker-images/docs/tutorials/automatic-data-balancing.html
)

Kind regards,
Stevo Slavic.


On Thu, Mar 2, 2017 at 10:20 PM, Shrikant Patel  wrote:

> I have a 5 broker kafka cluster. Replication factor = 3, Number of
> partitions = 12, Min Insync replicas (ISR) = 3
>
> First output is when all servers are up and running. Second output is when
> I bring down server id = 4. Another server from the ISR takes server 4's
> place as leader of the partition, so that's good. I was expecting another
> broker would join the ISR in place of server 4. Why doesn't that happen?
> Since min ISR is 3, I cannot publish to certain partitions of the topic.
>
> When server 4 comes back up, it starts to work fine.
>
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic xxx
> Topic:xxx   PartitionCount:12   ReplicationFactor:3 Configs:
> Topic: xxx  Partition: 0    Leader: 5   Replicas: 5,2,3 Isr: 3,2,5
> Topic: xxx  Partition: 1    Leader: 1   Replicas: 1,3,4 Isr: 3,4,1
> Topic: xxx  Partition: 2    Leader: 2   Replicas: 2,4,5 Isr: 2,5,4
> Topic: xxx  Partition: 3    Leader: 3   Replicas: 3,5,1 Isr: 5,3,1
> Topic: xxx  Partition: 4    Leader: 4   Replicas: 4,1,2 Isr: 2,4,1
> Topic: xxx  Partition: 5    Leader: 5   Replicas: 5,3,4 Isr: 3,5,4
> Topic: xxx  Partition: 6    Leader: 1   Replicas: 1,4,5 Isr: 5,4,1
> Topic: xxx  Partition: 7    Leader: 2   Replicas: 2,5,1 Isr: 5,2,1
> Topic: xxx  Partition: 8    Leader: 3   Replicas: 3,1,2 Isr: 3,2,1
> Topic: xxx  Partition: 9    Leader: 4   Replicas: 4,2,3 Isr: 3,2,4
> Topic: xxx  Partition: 10   Leader: 5   Replicas: 5,4,1 Isr: 5,4,1
> Topic: xxx  Partition: 11   Leader: 1   Replicas: 1,5,2 Isr: 5,2,1

'Batch Expired' exception when publishing to a kafka topic using one producer per thread.

2017-03-02 Thread Sreeram
Hi,

I get a 'Batch Expired' exception when I publish to a kafka topic using one
producer per thread. However, sharing the same producer across threads
works perfectly fine and I do not get the exception.

I do not understand what is causing this 'Batch Expired' exception with
one producer per thread.

Any help will be much appreciated.

Below are config params used for Producer :
acks=all
retries=10
batch.size=0 (got same error also with 16384)
compression.type=snappy
key serializer = String serializer
value serializer = String serializer

All other params are left at the defaults.
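For reference, this is roughly how the producer is built (a sketch;
bootstrap servers and topic name are placeholders):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // placeholder
props.put("acks", "all");
props.put("retries", 10);
props.put("batch.size", 0);
props.put("compression.type", "snappy");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// future.get() is where the TimeoutException below surfaces
producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();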

Below is the 'Batch Expired' exception that came with one producer per
thread:

Mar  1 13:04:32 d-devrta-rc-4c614a024c1c07c374e2fb40149eebb6-phs3u
drta_app: ERROR http-nio2-8180-exec-1 KafkaServiceImpl.publishData -
Exception occurred in publishData
{}#012java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.TimeoutException: Batch Expired#012#011at
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)#012#011at
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)#012#011at
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)#012#011at

Thanks,
Sreeram


understanding consumer rebalance trigger(s)

2017-03-02 Thread Jon Yeargers
I'm wondering what the parameters are that trigger a consumer rebalance. I
have a topic that turns over roughly 50K messages/minute across 6
partitions. Each is serviced by a separate dockerized consumer.

Roughly every 8-12 min this goes into a rebalance that may take up to a
minute. When it returns it often puts some or all partitions onto a single
consumer (leaving the others idle). This may persist for a minute while it
tries another arrangement. Eventually, after 2-3 tries, it will evenly
distribute the partitions... for a few minutes, until it makes another
misguided attempt. As a result we have lag increasing from 0 to ~450K and
back to 0 on a cycle.

The data rate is assumed to be roughly consistent through these cycles.

The resulting graph of lag is a sawtooth shape.

Using 0.10.0.1
3 brokers


Also - is there some way to set / control consumer 'assignment'? Or to
'suggest' a setting?
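On the last question - with the Java consumer you can bypass group
assignment entirely via assign() (a sketch; topic/partition names
illustrative):

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // props assumed
consumer.assign(Arrays.asList(new TopicPartition("my-topic", 0)));
// assign() opts out of group coordination: no rebalances ever happen,
// but each process must be told explicitly which partitions it owns.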


Kafka metrics to Prometheus

2017-03-02 Thread Yifan Ying
Hi Kafka users,

I am trying to expose Kafka client metrics to Prometheus via
*MetricsReporter*. It looks like the Kafka clients don't expose the
*Measurable* objects, so I can only call *KafkaMetric.value()* and use it
as a Gauge in Prometheus, even if the metric is a Percentile in the Kafka
client. This is a problem if these gauge metrics need to be aggregated in
Prometheus. So ideally, I hope we can provide customized *Measurable*
objects for the Kafka clients to use, and those Measurable objects would
maintain a set of corresponding Prometheus metric objects. For example, a
customized Counter object, whenever it updates its value, would also update
the value of a corresponding Prometheus object. In this way, we would have
a full copy of the sample data from Kafka metrics in the Prometheus metric
objects.
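Here is a minimal sketch of the reporter side I have in mind (the class
name is made up; metricRemoval only exists on newer client versions):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

public class PrometheusBridgeReporter implements MetricsReporter {
    private final Map<MetricName, KafkaMetric> metrics = new ConcurrentHashMap<>();

    @Override public void configure(Map<String, ?> configs) { }

    @Override public void init(List<KafkaMetric> initial) {
        for (KafkaMetric m : initial) metrics.put(m.metricName(), m);
    }

    // Called whenever a metric is added or updated.
    @Override public void metricChange(KafkaMetric metric) {
        metrics.put(metric.metricName(), metric);
    }

    @Override public void metricRemoval(KafkaMetric metric) {
        metrics.remove(metric.metricName());
    }

    @Override public void close() { metrics.clear(); }

    // A Prometheus collect() callback can only read the current value via
    // KafkaMetric.value() -- which is exactly the limitation described above.
    public double currentValue(MetricName name) {
        KafkaMetric m = metrics.get(name);
        return (m == null) ? Double.NaN : m.value();
    }
}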

I would like to know if anyone has this issue and how this should be
handled. Thanks!

-- 
Yifan


how to produce/consume kafka through ssh tunnel

2017-03-02 Thread wei
We have kafka set up in a staging environment; when debugging the consumer
we want to listen directly to the kafka in the staging environment. I set
up a tunnel but it seems I can't produce or consume from my local machine.
I can create topics though. I have no problem producing/consuming on other
machines in the staging environment.

kafka version: kafka_2.12-0.10.2.0
kafka setup: port 9090/9091 on host 192.168.10.110 on staging environment
zookeeper: port 2181 on 192.168.10.58
tunnelling: -L 9090:192.168.10.110:9090 -L 9091:192.168.10.110:9091 -L 2181:192.168.10.58:2181

$ bin/kafka-console-producer.sh --broker-list localhost:9091 --topic test
test msg
[2017-03-02 14:43:21,414] ERROR Error when sending message to topic test
with key: null, value: 8 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for
test-0: 1503 ms has passed since batch creation plus linger time


Kafka partition not migrating to another broker.

2017-03-02 Thread Shrikant Patel
I have a 5 broker kafka cluster. Replication factor = 3, Number of
partitions = 12, Min Insync replicas (ISR) = 3

First output is when all servers are up and running. Second output is when I
bring down server id = 4. Another server from the ISR takes server 4's place
as leader of the partition, so that's good. I was expecting another broker
would join the ISR in place of server 4. Why doesn't that happen? Since min
ISR is 3, I cannot publish to certain partitions of the topic.

When server 4 comes back up, it starts to work fine.

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic xxx
Topic:xxx   PartitionCount:12   ReplicationFactor:3 Configs:
Topic: xxx  Partition: 0    Leader: 5   Replicas: 5,2,3 Isr: 3,2,5
Topic: xxx  Partition: 1    Leader: 1   Replicas: 1,3,4 Isr: 3,4,1
Topic: xxx  Partition: 2    Leader: 2   Replicas: 2,4,5 Isr: 2,5,4
Topic: xxx  Partition: 3    Leader: 3   Replicas: 3,5,1 Isr: 5,3,1
Topic: xxx  Partition: 4    Leader: 4   Replicas: 4,1,2 Isr: 2,4,1
Topic: xxx  Partition: 5    Leader: 5   Replicas: 5,3,4 Isr: 3,5,4
Topic: xxx  Partition: 6    Leader: 1   Replicas: 1,4,5 Isr: 5,4,1
Topic: xxx  Partition: 7    Leader: 2   Replicas: 2,5,1 Isr: 5,2,1
Topic: xxx  Partition: 8    Leader: 3   Replicas: 3,1,2 Isr: 3,2,1
Topic: xxx  Partition: 9    Leader: 4   Replicas: 4,2,3 Isr: 3,2,4
Topic: xxx  Partition: 10   Leader: 5   Replicas: 5,4,1 Isr: 5,4,1
Topic: xxx  Partition: 11   Leader: 1   Replicas: 1,5,2 Isr: 5,2,1


./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic xxx
Topic:xxx   PartitionCount:12   ReplicationFactor:3 Configs:
Topic: xxx  Partition: 0    Leader: 5   Replicas: 5,2,3 Isr: 3,2,5
Topic: xxx  Partition: 1    Leader: 1   Replicas: 1,3,4 Isr: 3,1
Topic: xxx  Partition: 2    Leader: 2   Replicas: 2,4,5 Isr: 2,5
Topic: xxx  Partition: 3    Leader: 3   Replicas: 3,5,1 Isr: 5,3,1
Topic: xxx  Partition: 4    Leader: 1   Replicas: 4,1,2 Isr: 2,1
Topic: xxx  Partition: 5    Leader: 5   Replicas: 5,3,4 Isr: 3,5
Topic: xxx  Partition: 6    Leader: 1   Replicas: 1,4,5 Isr: 5,1
Topic: xxx  Partition: 7    Leader: 2   Replicas: 2,5,1 Isr: 5,2,1
Topic: xxx  Partition: 8    Leader: 3   Replicas: 3,1,2 Isr: 3,2,1
Topic: xxx  Partition: 9    Leader: 2   Replicas: 4,2,3 Isr: 2,3
Topic: xxx  Partition: 10   Leader: 5   Replicas: 5,4,1 Isr: 5,1
Topic: xxx  Partition: 11   Leader: 1   Replicas: 1,5,2 Isr: 5,2,1

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic xxx
Topic:xxx   PartitionCount:12   ReplicationFactor:3 Configs:
Topic: xxx  Partition: 0    Leader: 5   Replicas: 5,2,3 Isr: 3,2,5
Topic: xxx  Partition: 1    Leader: 1   Replicas: 1,3,4 Isr: 3,1,4
Topic: xxx  Partition: 2    Leader: 2   Replicas: 2,4,5 Isr: 2,5,4
Topic: xxx  Partition: 3    Leader: 3   Replicas: 3,5,1 Isr: 5,3,1
Topic: xxx  Partition: 4    Leader: 4   Replicas: 4,1,2 Isr: 2,1,4
Topic: xxx  Partition: 5    Leader: 5   Replicas: 5,3,4 Isr: 3,5,4
Topic: xxx  Partition: 6    Leader: 1   Replicas: 1,4,5 Isr: 5,1,4
Topic: xxx  Partition: 7    Leader: 2   Replicas: 2,5,1 Isr: 5,2,1
Topic: xxx  Partition: 8    Leader: 3   Replicas: 3,1,2 Isr: 3,2,1
Topic: xxx  Partition: 9    Leader: 4   Replicas: 4,2,3 Isr: 2,3,4
Topic: xxx  Partition: 10   Leader: 5   Replicas: 5,4,1 Isr: 5,1,4
Topic: xxx  Partition: 11   Leader: 1   Replicas: 1,5,2 Isr: 5,2,1



Thanks,
Shri



Kafka mirror maker issue

2017-03-02 Thread yang yong
Hi, I am doing some experiments with kafka mirror maker. I have two Kafka
clusters: one is kafka_2.11-0.9.0.1 with 3 nodes as the source cluster, the
other is kafka_2.10-0.9.0.1 with one node as the target cluster.


First I mirror the messages for topic TEST_TOPIC from the source cluster to
the target cluster with consumer group KAFKA_MIRROR_GROUP_1. After all the
messages are mirrored from source to target, I stop the mirror maker. Then I
change the consumer group to KAFKA_MIRROR_GROUP_2 in the consumer
configuration properties file and start the mirror maker again. I can see
all the messages in the source cluster have been consumed again by the new
consumer group, KAFKA_MIRROR_GROUP_2, but the number of messages in the
target cluster is NOT doubled. If I start a new client to produce new
messages to the TEST_TOPIC topic in the source cluster, these new messages
are mirrored to the target cluster by the new consumer group,
KAFKA_MIRROR_GROUP_2.


I cannot understand why the messages for TEST_TOPIC in the target cluster
are not doubled after the second run with a different consumer group. Could
anyone advise me of the reason? Many thanks in advance.


Regards,


Donald


What is request.timeout in the consumer used for?

2017-03-02 Thread Jeff Widman
In the consumer, what will trigger the request.timeout?

Is it just the broker not responding within that period of time?

I'm guessing that in a healthy cluster, the primary culprit for triggering
this is one of the steps within consumer group rebalancing taking a long
time (inter-broker communication before the broker responds to the
consumer)... is that true?

Any other common culprits?

Context:
We're seeing this triggered within a dev environment when set at 40
seconds. Our consumers are 3rd-party clients which don't support background
heartbeating. The cluster and consumers otherwise appear healthy, so I'm
guessing it's just slow VMs. But wanted to confirm no unexpected side
effects before I raise this value.
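For reference, the consumer settings in play - a minimal sketch with
illustrative values (request.timeout.ms bounds how long the client waits
for any single broker response, so it is normally kept above
session.timeout.ms):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // placeholder
props.put("group.id", "my-group");                 // placeholder
props.put("session.timeout.ms", "30000");
props.put("request.timeout.ms", "40000");          // the 40s value mentioned above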


Re: Need some help in identifying some important metrics to monitor for streams

2017-03-02 Thread Sachin Mittal
Hi,
I had checked the monitoring docs, but could not figure out which metrics
are the important ones.

Mainly I am looking at the average time spent between 2 successive poll
requests.
Can I say that the average time between 2 poll requests is the sum of

commit + poll + process + punctuate (latency-avg)?


Also, I checked the benchmark test results but could not find any
information on rocksdb metrics for fetch and put operations.
Is there any benchmark for these, or, based on the values in my previous
mail, can something be said about their performance?


Lastly, can we get some help on names like
new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1, and
have a more standard thread name like new-advice-1-StreamThread-1 (as in
version 10.1.1), so we can log these metrics as part of our cron jobs?

Thanks
Sachin



On Thu, Mar 2, 2017 at 9:31 PM, Eno Thereska  wrote:

> Hi Sachin,
>
> The new streams metrics are now documented at
> https://kafka.apache.org/documentation/#kafka_streams_monitoring. Note
> that not all of them are turned on by default.
>
> We have several benchmarks that run nightly to monitor streams
> performance. They all stem from the SimpleBenchmark.java benchmark. In
> addition, their results are published nightly at http://testing.confluent.io
> (e.g., under the trunk results). E.g., looking at today's results:
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-03-02--001.1488449554--apache--trunk--ef92bb4/report.html
> (if you search for "benchmarks.streams") you'll see results from a series
> of benchmarks, ranging from simply consuming, to simple topologies with a
> source and sink, to joins and count aggregate. These run on AWS nightly,
> but you can also run manually on your setup.
>
> In addition, programmatically the code can check the KafkaStreams.state()
> and register listeners for when the state changes. For example, the state
> can change from "running" to "rebalancing".
>
> It is likely we'll need more metrics moving forward and would be great to
> get feedback from the community.
>
>
> Thanks
> Eno
>
>
>
>
> > On 2 Mar 2017, at 11:54, Sachin Mittal  wrote:
> >
> > Hello All,
> > I had few questions regarding monitoring of kafka streams application and
> > what are some important metrics we should collect in our case.
> >
> > Just a brief overview, we have a single thread application (0.10.1.1)
> > reading from single partition topic and it is working all fine.
> > Then we have same application (using 0.10.2.0) multi threaded with 4
> > threads per machine and 3 machines cluster setup reading for same but
> > partitioned topic (12 partitions).
> > Thus we have each thread processing single partition same case as earlier
> > one.
> >
> > The new setup also works fine in steady state, but under load somehow it
> > triggers frequent re-balance and then we run into all sort of issues like
> > stream thread dying due to CommitFailedException or entering into
> deadlock
> > state.
> > After a while we restart all the instances then it works fine for a while
> > and again we get the same problem and it goes on.
> >
> > 1. So just to monitor, like when first thread fails what would be some
> > important metrics we should be collecting to get some sense of whats
> going
> > on?
> >
> > 2. Is there any metric that tells time elapsed between successive poll
> > requests, so we can monitor that?
> >
> > Also I did monitor rocksdb put and fetch times for these 2 instances and
> > here is the output I get:
> > 0.10.1.1
> > $>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1 key-table-put-avg-latency-ms
> > #mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1:
> > 206431.7497615029
> > $>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1 key-table-fetch-avg-latency-ms
> > #mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1:
> > 2595394.2746129474
> > $>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1 key-table-put-qps
> > #mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1:
> > 232.86299499317252
> > $>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1 key-table-fetch-qps
> > #mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1:
> > 373.61071016166284
> >
> > Same values for 0.10.2.0 I get
> > $>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-4

Re: Getting thread deadlock in streams 10.2.0 if a partition is re-assigned to a different thread in same instance

2017-03-02 Thread Sachin Mittal
Hi,
It makes sense - looks like task 0_4 took more than the max poll timeout.
However I have difficulty processing the following lines:

DEBUG 2017-03-01 18:17:42,465 [StreamThread-1]:
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[StreamThread-1] creating new task 0_4
...
DEBUG 2017-03-01 18:17:42,590 [StreamThread-1]:
org.apache.kafka.clients.consumer.internals.Fetcher - Handling
ListOffsetResponse response for new-part-advice-key-table-changelog-4.
Fetched offset 8591351, timestamp -1
DEBUG 2017-03-01 18:17:42,590 [StreamThread-1]:
org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to beginning of
partition new-part-advice-key-table-changelog-4
DEBUG 2017-03-01 18:17:42,590 [StreamThread-1]:
org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for
partition new-part-advice-key-table-changelog-4 to earliest offset.
DEBUG 2017-03-01 18:17:42,591 [StreamThread-1]:
org.apache.kafka.clients.consumer.internals.Fetcher - Handling
ListOffsetResponse response for new-part-advice-key-table-changelog-4.
Fetched offset 8408559, timestamp -1
DEBUG 2017-03-01 18:17:48,357 [StreamThread-1]:
org.apache.kafka.clients.consumer.internals.Fetcher - Ignoring fetched
records for new-part-advice-key-table-changelog-4 at offset 8408559 since
the current position is 8411374

...
DEBUG 2017-03-01 18:24:18,585 [StreamThread-1]:
org.apache.kafka.clients.consumer.internals.Fetcher - Ignoring fetched
records for new-part-advice-key-table-changelog-4 at offset 8590981 since
the current position is 8591351
DEBUG 2017-03-01 18:24:18,585 [StreamThread-1]:
org.apache.kafka.clients.consumer.internals.Fetcher - Sending fetch for
partitions [new-part-advice-key-table-changelog-4] to broker
192.168.73.199:9092 (id: 5 rack: null)


So when the new task is created it tries to replay the entire changelog
partition 4 from offset 8408559 to offset 8591351, and the whole process
takes about 7 minutes.

So why does it take so long to read some 183K records (8591351 - 8408559)?
Is this some rocksdb issue, because it tries to recreate the entire state
store?

Also, why did it replay from the beginning? Shouldn't it have started from
somewhere in between - the place where the previous thread, to which this
partition was assigned, committed its offsets?

Thanks
Sachin



On Thu, Mar 2, 2017 at 9:05 PM, Damian Guy  wrote:

> Hi Sachin,
>
> This is because another thread hasn't released the lock. It is generally
> caused because it is taking a long time to restore or process the incoming
> records. It might be helpful to take a Thread dump so we can see what each
> thread is doing.
>
> Thanks,
> Damian
>
> On Thu, 2 Mar 2017 at 12:14 Sachin Mittal  wrote:
>
> > Hi,
> > We are getting some deadlock state after re-balance and what we found
> that
> > it happens after re-balance the partition is assigned to a different
> thread
> > in same instance.
> >
> > Looks like previous thread is not releasing the rocks db lock.
> >
> > Here are the complete logs spanned across multiple files.
> >
> >
> > https://drive.google.com/open?id=0B-KsMy4P1bGxLTVVVWlZOGdQZTUzYm9a
> V3lfOFdubTJ2X2JR
> >
> > I am just posting the gist here as how we understood this issue.
> >
> >
> > *= TestKafkaAdvice.2017-03-01.89.log *
> >
> > 53352 INFO 2017-03-01 18:11:56,980 [StreamThread-3]:
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> > Successfully joined group new-part-advice with *generation 96*
> >
> > 53353 INFO 2017-03-01 18:11:56,980 [StreamThread-4]:
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> > Successfully joined group new-part-advice with *generation 96*
> >
> > 53354 INFO 2017-03-01 18:11:56,980 [StreamThread-1]:
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> > Successfully joined group new-part-advice with *generation 96*
> >
> >
> > 53361 INFO 2017-03-01 *18:11:57,084* [StreamThread-1]:
> > org.apache.kafka.streams.processor.internals.StreamThread -
> > stream-thread *[StreamThread-1]
> > New partitions [[advice-stream-9, advice-stream-6]]* assigned at the
> end
> > of consumer rebalance.
> >
> > 53362 INFO 2017-03-01 18:11:57,084 [StreamThread-3]:
> > org.apache.kafka.streams.processor.internals.StreamThread -
> > stream-thread *[StreamThread-3]
> > New partitions [[advice-stream-7]]* assigned at the end of consumer
> > rebalance.
> >
> > 53363 DEBUG 2017-03-01 18:11:57,084 [StreamThread-3]:
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [StreamThread-3] closing suspended non-assigned task
> >
> > 53364 INFO 2017-03-01 18:11:57,084 [StreamThread-4]:
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -
> Setting
> > newly assigned partitions [advice-stream-8, advice-stream-2] for
> > group new-part-advice
> >
> > 53365 INFO 2017-03-01 18:11:57,084 [StreamThread-4]:
> > org.apache.kafka.streams.processor.internals.StreamThread -
> > stream-thread *[StreamThread-4]
> > New partitions [[advice-stream-8

RE: [EXTERNAL] - Call to consumer.poll(1000) hangs

2017-03-02 Thread Isabelle Giguère
Hi Dhirendra;

You should probably subscribe to the topic only once, in the ConsumerLoop
constructor, not on every run.
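Something like this (a sketch; the class shape is assumed from the linked
question):

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerLoop implements Runnable {
    private final KafkaConsumer<String, String> consumer;

    public ConsumerLoop(KafkaConsumer<String, String> consumer, String topic) {
        this.consumer = consumer;
        consumer.subscribe(Collections.singletonList(topic)); // subscribe once, here
    }

    @Override public void run() {
        while (true) {
            // poll only; no subscribe() inside the loop
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    }
}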

Isabelle 

-Original Message-
From: Dhirendra Suman [mailto:dhirendra.su...@globallogic.com.INVALID] 
Sent: 2 mars 2017 05:13
To: users@kafka.apache.org
Subject: [EXTERNAL] - Call to consumer.poll(1000) hangs

Hi,

http://stackoverflow.com/questions/42551704/call-to-consumerrecordsstring-string-records-consumer-poll1000-hangs-and

Thanks ,
Dhirendra


Re: Recommended number of partitions on each broker

2017-03-02 Thread Jeff Widman
We normally run over 1,000 partitions per broker, and I know of a major
company with 30+ kafka clusters that averages 1,100 partitions per broker
across all clusters.

So 300 shouldn't be an issue as long as the throughput per partition isn't
too high. Given that disk and cpu are so low, I'd guess this isn't the
issue.

On Wed, Mar 1, 2017 at 10:54 PM, Vijay Patil 
wrote:

> 300 partitions is on the lower side; surely that won't be the root cause.
> How is the network bandwidth usage on your nodes?
> Are they able to reach zookeeper?
> Are you executing some partition-rebalancing jobs (or re-assigning
> partitions) in parallel?
>
> On 2 March 2017 at 10:57, Jun MA  wrote:
>
> > Hi,
> >
> > I’m curious what’s the recommended number of partitions running on each
> > individual broker? We have a 3 node cluster running 0.9.0.1; each one has
> > 24 cores, 1.1T ssd, 48G ram, 10G NIC. There are about 300 partitions
> > running on each broker and the resource usage is pretty low (5% cpu, 50%
> > ram, 2% disk). But we see a lot of random ISR expands/shrinks happening in
> > the cluster. Our networking is pretty stable, so I’m wondering if that’s
> > because there are more partitions than kafka can handle? Are there any
> > kafka parameters that we could tune?
> >
> > Thanks,
> > Jun
>


Re: Consumption on an explicitly (dynamically) created topic has a 5 minute delay

2017-03-02 Thread Jaikiran Pai
Thank you for pointing me to that JIRA. It indeed is the same issue we 
discussed in this thread. I'll keep a watch on that JIRA for the code to 
be merged.


-Jaikiran

On Thursday 02 March 2017 07:11 PM, Rajini Sivaram wrote:

This issue is being addressed in KAFKA-4631. See
https://issues.apache.org/jira/browse/KAFKA-4631 and the discussion in the
PR https://github.com/apache/kafka/pull/2622 for details.

Regards,

Rajini

On Thu, Mar 2, 2017 at 4:35 AM, Jaikiran Pai 
wrote:


For future reference - I asked this question on the dev mailing list and,
based on the discussion there, was able to come up with a workaround to get
this working. Details here:
https://www.mail-archive.com/dev@kafka.apache.org/msg67613.html

-Jaikiran


On Wednesday 22 February 2017 01:16 PM, Jaikiran Pai wrote:


We are on Kafka 0.10.0.1 (server and client) and use Java
consumer/producer APIs. We have an application where we create Kafka topics
dynamically (using the AdminUtils Java API) and then start
producing/consuming on those topics. The issue we frequently run into is
this:

1. Application process creates a topic "foo-bar" via
AdminUtils.createTopic. This is successfully completed.
2. Same application process then creates a consumer (using new Java
consumer API) on that foo-bar topic as a next step.
3. The consumer that gets created in step#2 however, doesn't seem to be
enrolled in consumer group for this topic because of this (notice the last
line in the log):

2017-02-21 00:58:43,359 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer
- Kafka consumer created
2017-02-21 00:58:43,360 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer
- Subscribed to topic(s): foo-bar
2017-02-21 00:58:43,543 [Thread-6] DEBUG org.apache.kafka.clients.consu
mer.internals.AbstractCoordinator - Received group coordinator response
ClientResponse(receivedTimeMs=1487667523542, disconnected=false,
request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clie
nts.consumer.internals.ConsumerNetworkClient$RequestFutureCo
mpletionHandler@50aad50f, request=RequestSend(header={ap
i_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
body={group_id=my-app-group}), createdTimeMs=1487667523378,
sendTimeMs=1487667523529), responseBody={error_code=0,coo
rdinator={node_id=0,host=localhost,port=9092}})
2017-02-21 00:58:43,543 [Thread-6] INFO org.apache.kafka.clients.consu
mer.internals.AbstractCoordinator - Discovered coordinator
localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
mer.internals.ConsumerCoordinator - Revoking previously assigned
partitions [] for group my-app-group
2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
mer.internals.AbstractCoordinator - (Re-)joining group my-app-group
2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.clients.consu
mer.internals.AbstractCoordinator - Sending JoinGroup
({group_id=my-app-group,session_timeout=3,member_id=,
protocol_type=consumer,group_protocols=[{protocol_name=
range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]})
to coordinator localhost:9092 (id: 2147483647 rack: null)
2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
- Added sensor with name node-2147483647.bytes-sent
2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
- Added sensor with name node-2147483647.bytes-received
2017-02-21 00:58:43,549 [Thread-6] DEBUG org.apache.kafka.common.metrics.Metrics
- Added sensor with name node-2147483647.latency
2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
mer.internals.AbstractCoordinator - Received successful join group
response for group my-app-group: {error_code=0,generation_id=1,
group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe
-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-
87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-
402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0
lim=59 cap=59]}]}
2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
mer.internals.ConsumerCoordinator - Performing assignment for group
my-app-group using strategy range with subscriptions
{consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscriptio
n(topics=[foo-bar])}
*2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
mer.internals.AbstractPartitionAssignor - Skipping assignment for topic
foo-bar since no metadata is available*


4. A few seconds later, a separate process, produces (via Java producer
API) on the foo-bar topic, some messages.
5. The consumer created in step#2 (although is waiting for messages) on
the foo-bar topic, _doesn't_ consume these messages.
6. *5 minutes later* the Kafka server triggers a consumer rebalance which
then successfully assigns partition(s) of this foo-bar topic to consumer
created in step#2 and the consumer start co

Re: Need some help in identifying some important metrics to monitor for streams

2017-03-02 Thread Eno Thereska
Hi Sachin,

The new streams metrics are now documented at
https://kafka.apache.org/documentation/#kafka_streams_monitoring. Note that
not all of them are turned on by default.

We have several benchmarks that run nightly to monitor streams performance. 
They all stem from the SimpleBenchmark.java benchmark. In addition, their 
results are published nightly at http://testing.confluent.io (e.g., under
the trunk results). E.g., looking at today's results:
http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2017-03-02--001.1488449554--apache--trunk--ef92bb4/report.html

(if you search for "benchmarks.streams") you'll see results from a series of 
benchmarks, ranging from simply consuming, to simple topologies with a source 
and sink, to joins and count aggregate. These run on AWS nightly, but you can 
also run manually on your setup.

In addition, programmatically the code can check the KafkaStreams.state() and 
register listeners for when the state changes. For example, the state can 
change from "running" to "rebalancing".
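A sketch of registering such a listener (the builder and config are assumed
to already exist):

KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
streams.setStateListener((newState, oldState) ->
    System.out.println("streams state: " + oldState + " -> " + newState));
streams.start();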

It is likely we'll need more metrics moving forward and would be great to get 
feedback from the community.


Thanks
Eno




> On 2 Mar 2017, at 11:54, Sachin Mittal  wrote:
> 
> Hello All,
> I had few questions regarding monitoring of kafka streams application and
> what are some important metrics we should collect in our case.
> 
> Just a brief overview, we have a single thread application (0.10.1.1)
> reading from single partition topic and it is working all fine.
> Then we have same application (using 0.10.2.0) multi threaded with 4
> threads per machine and 3 machines cluster setup reading for same but
> partitioned topic (12 partitions).
> Thus we have each thread processing single partition same case as earlier
> one.
> 
> The new setup also works fine in steady state, but under load somehow it
> triggers frequent re-balance and then we run into all sort of issues like
> stream thread dying due to CommitFailedException or entering into deadlock
> state.
> After a while we restart all the instances then it works fine for a while
> and again we get the same problem and it goes on.
> 
> 1. So just to monitor, like when first thread fails what would be some
> important metrics we should be collecting to get some sense of whats going
> on?
> 
> 2. Is there any metric that tells time elapsed between successive poll
> requests, so we can monitor that?
> 
> Also I did monitor rocksdb put and fetch times for these 2 instances and
> here is the output I get:
> 0.10.1.1
> $>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1 key-table-put-avg-latency-ms
> #mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1:
> 206431.7497615029
> $>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1 key-table-fetch-avg-latency-ms
> #mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1:
> 2595394.2746129474
> $>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1 key-table-put-qps
> #mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1:
> 232.86299499317252
> $>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1 key-table-fetch-qps
> #mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1:
> 373.61071016166284
> 
> Same values for 0.10.2.0 I get
> $>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1 key-table-put-latency-avg
> #mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1:
> 1199859.5535022356
> $>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1 key-table-fetch-latency-avg
> #mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1:
> 3679340.80748852
> $>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1 key-table-put-rate
> #mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1:
> 56.134778706069184
> $>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1 key-table-fe

SASL/PLAIN text

2017-03-02 Thread Rumney, Owen (HARVEY NASH)
Hi

I've got a 3 broker kerberised Kafka 0.10 install running in Cloudera and I'm 
trying to authenticate with SASL/PLAIN

I'm passing kafka_server_jaas.conf into the JVM on each of the brokers.


KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username=admin
  password=password1
  user_admin=password1
  user_remote=password1;
};

My server.properties (or kafka.properties as Cloudera renames it) is set as 
below;


listeners=SASL_SSL://10.10.3.47:9093 # ip set for each broker
advertised.listeners=SASL_SSL://10.10.3.47:9093 # ip set for each broker
sasl.enabled.mechanisms=GSSAPI,PLAIN
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=GSSAPI

When Kafka starts up, the inter-broker communication is all fine, but when I
try to connect using the console producer I get a "failed to update
metadata" timeout.


bin/kafka-console-producer.sh --broker-list 10.10.3.161:9093 --topic test1
--producer.config client.properties.plain

client.properties.plain is set to


security.protocol=SASL_SSL
sasl.mechanism=PLAIN

finally, the client side jaas.conf


KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="remote"
password="password1";
};

As far as I can tell I've followed all instructions correctly, can anyone see 
anything wrong?
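One detail worth double-checking - the client JAAS file has to be passed to
the console producer's JVM, e.g. (path illustrative):

export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --broker-list 10.10.3.161:9093 --topic test1 \
  --producer.config client.properties.plain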

Thanks,
Owen


Call to consumer.poll(1000) hangs

2017-03-02 Thread Dhirendra Suman
Hi,

http://stackoverflow.com/questions/42551704/call-to-consumerrecordsstring-string-records-consumer-poll1000-hangs-and

Thanks ,
Dhirendra


Using Kafka-connect transformer

2017-03-02 Thread Mina Aslani
Hi,

I am new to Kafka/Kafka-connect. I would like to use a Kafka Connect
transformer to get specific fields from my data in a kafka topic.

I was not able to find information/examples/documents about how to use
Kafka Connect transformers.

I would really appreciate it if I could get some info on that!
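The kind of thing I'm trying to do looks roughly like the single message
transforms added in Connect 0.10.2, configured per connector (a sketch,
with a made-up field name):

transforms=extract
transforms.extract.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extract.field=myfield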

Best regards,
Mina


Re: Getting thread deadlock in streams 10.2.0 if a partition is re-assigned to a different thread in same instance

2017-03-02 Thread Damian Guy
Hi Sachin,

This is because another thread hasn't released the lock. It generally
happens because restoring or processing the incoming records is taking a
long time. It might be helpful to take a thread dump so we can see what
each thread is doing.
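For reference, a thread dump can be captured with the JDK's jstack tool
(pid being the streams JVM's process id):

jstack -l <pid> > stream-threads.txt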

Thanks,
Damian

On Thu, 2 Mar 2017 at 12:14 Sachin Mittal  wrote:

> Hi,
> We are getting some deadlock state after re-balance and what we found that
> it happens after re-balance the partition is assigned to a different thread
> in same instance.
>
> Looks like previous thread is not releasing the rocks db lock.
>
> Here are the complete logs spanned across multiple files.
>
>
> https://drive.google.com/open?id=0B-KsMy4P1bGxLTVVVWlZOGdQZTUzYm9aV3lfOFdubTJ2X2JR
>
> I am just posting the gist here as how we understood this issue.
>
>
> *= TestKafkaAdvice.2017-03-01.89.log *
>
> 53352 INFO 2017-03-01 18:11:56,980 [StreamThread-3]:
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> Successfully joined group new-part-advice with *generation 96*
>
> 53353 INFO 2017-03-01 18:11:56,980 [StreamThread-4]:
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> Successfully joined group new-part-advice with *generation 96*
>
> 53354 INFO 2017-03-01 18:11:56,980 [StreamThread-1]:
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> Successfully joined group new-part-advice with *generation 96*
>
>
> 53361 INFO 2017-03-01 *18:11:57,084* [StreamThread-1]:
> org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread *[StreamThread-1]
> New partitions [[advice-stream-9, advice-stream-6]]* assigned at the end
> of consumer rebalance.
>
> 53362 INFO 2017-03-01 18:11:57,084 [StreamThread-3]:
> org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread *[StreamThread-3]
> New partitions [[advice-stream-7]]* assigned at the end of consumer
> rebalance.
>
> 53363 DEBUG 2017-03-01 18:11:57,084 [StreamThread-3]:
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [StreamThread-3] closing suspended non-assigned task
>
> 53364 INFO 2017-03-01 18:11:57,084 [StreamThread-4]:
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting
> newly assigned partitions [advice-stream-8, advice-stream-2] for
> group new-part-advice
>
> 53365 INFO 2017-03-01 18:11:57,084 [StreamThread-4]:
> org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread *[StreamThread-4]
> New partitions [[advice-stream-8, advice-stream-2]]* assigned at the end
> of consumer rebalance.
>
>
>
> 54253 INFO 2017-03-01 18:13:19,146 *[StreamThread-4]*:
> org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread *[StreamThread-4]
> Creating active task 0_2 with assigned partitions [[advice-stream-2]]*
>
>
> 56789 INFO 2017-03-01 18:17:42,311 [StreamThread-1]:
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> Successfully joined group new-part-advice with *generation 97*
>
> 56790 INFO 2017-03-01 18:17:42,311 [StreamThread-3]:
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> Successfully joined group new-part-advice with *generation 97*
>
>
> 56796 INFO 2017-03-01 *18:17:42,312* [StreamThread-3]:
> org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread *[StreamThread-3]
> New partitions [[advice-stream-9, advice-stream-6]]* assigned at the end
> of consumer rebalance.
>
> 56805 INFO 2017-03-01 18:17:42,312 [StreamThread-1]:
> org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread *[StreamThread-1]
> New partitions [[advice-stream-8, advice-stream-4]]* assigned at the end
> of consumer rebalance.
>
>
>
> 56823 INFO 2017-03-01 18:17:42,465 *[StreamThread-1]*:
> org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread *[StreamThread-1]
> Creating active task 0_4 with assigned partitions [[advice-stream-4]]*
>
> 56831 DEBUG 2017-03-01 18:17:42,531 *[StreamThread-1]*:
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group
> new-part-advice *fetching committed offsets for partitions:
> [advice-stream-8,
> advice-stream-4]*
>
>
>
> 57769 INFO 2017-03-01 *18:20:16,291* [StreamThread-4]:
> org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread *[StreamThread-4]
> Creating active task 0_8 with assigned partitions [[advice-stream-8]]*
>
>
>
>
> 57935 INFO 2017-03-01 18:20:19,181 [StreamThread-4]:
> org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread *[StreamThread-4]
> Removing all active tasks [[0_2, 0_8]]*
>
>
>
> * TestKafkaAdvice.2017-03-01.90.log  *
>
>
>  742 INFO 2017-03-01 *18:22:42,697* [StreamThread-3]:
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> Successfully joined group new-part-advice with *generation 98*
>
>  743 INFO 2017-03-01 18:22:42,697 [StreamThread-4]:
> org.apache.kafka.clients.consumer.internals.AbstractCoordina

Re: Consumption on an explicitly (dynamically) created topic has a 5 minute delay

2017-03-02 Thread Rajini Sivaram
This issue is being addressed in KAFKA-4631. See
https://issues.apache.org/jira/browse/KAFKA-4631 and the discussion in the
PR https://github.com/apache/kafka/pull/2622 for details.

Regards,

Rajini

On Thu, Mar 2, 2017 at 4:35 AM, Jaikiran Pai 
wrote:

> For future reference - I asked this question on dev mailing list and based
> on the discussion there was able to come up with a workaround to get this
> working. Details here: https://www.mail-archive.com/dev@kafka.apache.org/msg67613.html
>
> -Jaikiran
>
>
> On Wednesday 22 February 2017 01:16 PM, Jaikiran Pai wrote:
>
>> We are on Kafka 0.10.0.1 (server and client) and use Java
>> consumer/producer APIs. We have an application where we create Kafka topics
>> dynamically (using the AdminUtils Java API) and then start
>> producing/consuming on those topics. The issue we frequently run into is
>> this:
>>
>> 1. Application process creates a topic "foo-bar" via
>> AdminUtils.createTopic. This is successfully completed.
>> 2. Same application process then creates a consumer (using new Java
>> consumer API) on that foo-bar topic as a next step.
>> 3. The consumer that gets created in step#2 however, doesn't seem to be
>> enrolled in consumer group for this topic because of this (notice the last
>> line in the log):
>>
>> 2017-02-21 00:58:43,359 [Thread-6] DEBUG 
>> org.apache.kafka.clients.consumer.KafkaConsumer
>> - Kafka consumer created
>> 2017-02-21 00:58:43,360 [Thread-6] DEBUG 
>> org.apache.kafka.clients.consumer.KafkaConsumer
>> - Subscribed to topic(s): foo-bar
>> 2017-02-21 00:58:43,543 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - Received group coordinator response
>> ClientResponse(receivedTimeMs=1487667523542, disconnected=false,
>> request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clie
>> nts.consumer.internals.ConsumerNetworkClient$RequestFutureCo
>> mpletionHandler@50aad50f, request=RequestSend(header={ap
>> i_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
>> body={group_id=my-app-group}), createdTimeMs=1487667523378,
>> sendTimeMs=1487667523529), responseBody={error_code=0,coo
>> rdinator={node_id=0,host=localhost,port=9092}})
>> 2017-02-21 00:58:43,543 [Thread-6] INFO org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - Discovered coordinator
>> localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
>> mer.internals.ConsumerCoordinator - Revoking previously assigned
>> partitions [] for group my-app-group
>> 2017-02-21 00:58:43,545 [Thread-6] INFO org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - (Re-)joining group my-app-group
>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - Sending JoinGroup
>> ({group_id=my-app-group,session_timeout=3,member_id=,
>> protocol_type=consumer,group_protocols=[{protocol_name=
>> range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=59 cap=59]}]})
>> to coordinator localhost:9092 (id: 2147483647 rack: null)
>> 2017-02-21 00:58:43,548 [Thread-6] DEBUG 
>> org.apache.kafka.common.metrics.Metrics
>> - Added sensor with name node-2147483647.bytes-sent
>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG 
>> org.apache.kafka.common.metrics.Metrics
>> - Added sensor with name node-2147483647.bytes-received
>> 2017-02-21 00:58:43,549 [Thread-6] DEBUG 
>> org.apache.kafka.common.metrics.Metrics
>> - Added sensor with name node-2147483647.latency
>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.AbstractCoordinator - Received successful join group
>> response for group my-app-group: {error_code=0,generation_id=1,
>> group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe
>> -87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-
>> 87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-
>> 402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0
>> lim=59 cap=59]}]}
>> 2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.ConsumerCoordinator - Performing assignment for group
>> my-app-group using strategy range with subscriptions
>> {consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscriptio
>> n(topics=[foo-bar])}
>> *2017-02-21 00:58:43,552 [Thread-6] DEBUG org.apache.kafka.clients.consu
>> mer.internals.AbstractPartitionAssignor - Skipping assignment for topic
>> foo-bar since no metadata is available*
>>
>>
>> 4. A few seconds later, a separate process, produces (via Java producer
>> API) on the foo-bar topic, some messages.
>> 5. The consumer created in step#2 (although is waiting for messages) on
>> the foo-bar topic, _doesn't_ consume these messages.
>> 6. *5 minutes later* the Kafka server triggers a consumer rebalance which
>> then successfully assigns partition(s) of this foo-bar topic to consumer
>> created in step#2 and 

'Batch Expired' exception when I publish to a kafka topic using one producer per thread.

2017-03-02 Thread Sreeram
Hi,

I get a 'Batch Expired' exception when I publish to a kafka topic using one
producer per thread. However, sharing the same producer across threads
works perfectly fine and I do not get the exception.

I do not understand what is causing this 'Batch Expired' exception (with
one producer/thread). I am concerned there may be a bug lurking in my code
which manifests only when running with one producer per thread.

Any help will be much appreciated.

Below are config params used for Producer :
acks=all
retries=10
batch.size=0 (got same error also with 16384)
compression.type=snappy
key serializer = String serializer
value serializer = String serializer

All other params are left at the defaults.

Below is the 'Batch Expired' exception that came with one producer per
thread:

Mar  1 13:04:32 d-devrta-rc-4c614a024c1c07c374e2fb40149eebb6-phs3u
drta_app: ERROR http-nio2-8180-exec-1 KafkaServiceImpl.publishData -
Exception occurred in publishData
{}#012java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.TimeoutException: Batch Expired#012#011at
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)#012#011at
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)#012#011at
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)#012#011at

Thanks,
Sreeram


Getting thread deadlock in streams 10.2.0 if a partition is re-assigned to a different thread in same instance

2017-03-02 Thread Sachin Mittal
Hi,
We are getting into a deadlock state after re-balance, and what we found is
that it happens when, after re-balance, a partition is assigned to a
different thread in the same instance.

Looks like the previous thread is not releasing the RocksDB lock.

Here are the complete logs spanned across multiple files.

https://drive.google.com/open?id=0B-KsMy4P1bGxLTVVVWlZOGdQZTUzYm9aV3lfOFdubTJ2X2JR

I am just posting the gist here as how we understood this issue.


*= TestKafkaAdvice.2017-03-01.89.log *

53352 INFO 2017-03-01 18:11:56,980 [StreamThread-3]:
org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
Successfully joined group new-part-advice with *generation 96*

53353 INFO 2017-03-01 18:11:56,980 [StreamThread-4]:
org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
Successfully joined group new-part-advice with *generation 96*

53354 INFO 2017-03-01 18:11:56,980 [StreamThread-1]:
org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
Successfully joined group new-part-advice with *generation 96*


53361 INFO 2017-03-01 *18:11:57,084* [StreamThread-1]:
org.apache.kafka.streams.processor.internals.StreamThread -
stream-thread *[StreamThread-1]
New partitions [[advice-stream-9, advice-stream-6]]* assigned at the end
of consumer rebalance.

53362 INFO 2017-03-01 18:11:57,084 [StreamThread-3]:
org.apache.kafka.streams.processor.internals.StreamThread -
stream-thread *[StreamThread-3]
New partitions [[advice-stream-7]]* assigned at the end of consumer
rebalance.

53363 DEBUG 2017-03-01 18:11:57,084 [StreamThread-3]:
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[StreamThread-3] closing suspended non-assigned task

53364 INFO 2017-03-01 18:11:57,084 [StreamThread-4]:
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting
newly assigned partitions [advice-stream-8, advice-stream-2] for
group new-part-advice

53365 INFO 2017-03-01 18:11:57,084 [StreamThread-4]:
org.apache.kafka.streams.processor.internals.StreamThread -
stream-thread *[StreamThread-4]
New partitions [[advice-stream-8, advice-stream-2]]* assigned at the end
of consumer rebalance.



54253 INFO 2017-03-01 18:13:19,146 *[StreamThread-4]*:
org.apache.kafka.streams.processor.internals.StreamThread -
stream-thread *[StreamThread-4]
Creating active task 0_2 with assigned partitions [[advice-stream-2]]*


56789 INFO 2017-03-01 18:17:42,311 [StreamThread-1]:
org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
Successfully joined group new-part-advice with *generation 97*

56790 INFO 2017-03-01 18:17:42,311 [StreamThread-3]:
org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
Successfully joined group new-part-advice with *generation 97*


56796 INFO 2017-03-01 *18:17:42,312* [StreamThread-3]:
org.apache.kafka.streams.processor.internals.StreamThread -
stream-thread *[StreamThread-3]
New partitions [[advice-stream-9, advice-stream-6]]* assigned at the end
of consumer rebalance.

56805 INFO 2017-03-01 18:17:42,312 [StreamThread-1]:
org.apache.kafka.streams.processor.internals.StreamThread -
stream-thread *[StreamThread-1]
New partitions [[advice-stream-8, advice-stream-4]]* assigned at the end
of consumer rebalance.



56823 INFO 2017-03-01 18:17:42,465 *[StreamThread-1]*:
org.apache.kafka.streams.processor.internals.StreamThread -
stream-thread *[StreamThread-1]
Creating active task 0_4 with assigned partitions [[advice-stream-4]]*

56831 DEBUG 2017-03-01 18:17:42,531 *[StreamThread-1]*:
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group
new-part-advice *fetching committed offsets for partitions: [advice-stream-8,
advice-stream-4]*



57769 INFO 2017-03-01 *18:20:16,291* [StreamThread-4]:
org.apache.kafka.streams.processor.internals.StreamThread -
stream-thread *[StreamThread-4]
Creating active task 0_8 with assigned partitions [[advice-stream-8]]*




57935 INFO 2017-03-01 18:20:19,181 [StreamThread-4]:
org.apache.kafka.streams.processor.internals.StreamThread -
stream-thread *[StreamThread-4]
Removing all active tasks [[0_2, 0_8]]*



* TestKafkaAdvice.2017-03-01.90.log  *


 742 INFO 2017-03-01 *18:22:42,697* [StreamThread-3]:
org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
Successfully joined group new-part-advice with *generation 98*

 743 INFO 2017-03-01 18:22:42,697 [StreamThread-4]:
org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
Successfully joined group new-part-advice with *generation 98*


746 INFO 2017-03-01 18:22:42,697 [StreamThread-3]: org.apache.kafka.streams.
processor.internals.StreamThread - stream-thread *[StreamThread-3] New
partitions [[advice-stream-9, advice-stream-4]]* assigned at the end of
consumer rebalance.

748 INFO 2017-03-01 18:22:42,712 [StreamThread-4]: org.apache.kafka.streams.
processor.internals.StreamThread - stream-thread *[StreamThread-4] New
partitions [[advice-stream-8, advice-stream-2]]* assigned at the 

Need some help in identifying some important metrics to monitor for streams

2017-03-02 Thread Sachin Mittal
Hello All,
I have a few questions regarding monitoring of a kafka streams application
and what are some important metrics we should collect in our case.

Just a brief overview: we have a single-threaded application (0.10.1.1)
reading from a single-partition topic, and it is working all fine.
Then we have the same application (using 0.10.2.0) multi-threaded, with 4
threads per machine and a 3 machine cluster setup, reading from the same
but partitioned topic (12 partitions).
Thus we have each thread processing a single partition, the same as in the
earlier case.

The new setup also works fine in steady state, but under load it somehow
triggers frequent re-balances, and then we run into all sorts of issues,
like stream threads dying due to CommitFailedException or entering a
deadlock state.
After a while we restart all the instances; then it works fine for a while,
and again we get the same problem, and it goes on.

1. So just to monitor: when the first thread fails, what would be some
important metrics we should be collecting to get some sense of what's going
on?

2. Is there any metric that tells the time elapsed between successive poll
requests, so we can monitor that?

Also I did monitor rocksdb put and fetch times for these 2 instances and
here is the output I get:
0.10.1.1
$>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1 key-table-put-avg-latency-ms
#mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1:
206431.7497615029
$>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1 key-table-fetch-avg-latency-ms
#mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1:
2595394.2746129474
$>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1 key-table-put-qps
#mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1:
232.86299499317252
$>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1 key-table-fetch-qps
#mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-advice-1-StreamThread-1:
373.61071016166284

Same values for 0.10.2.0 I get
$>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1 key-table-put-latency-avg
#mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1:
1199859.5535022356
$>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1 key-table-fetch-latency-avg
#mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1:
3679340.80748852
$>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1 key-table-put-rate
#mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1:
56.134778706069184
$>get -s -b kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1 key-table-fetch-rate
#mbean = kafka.streams:type=stream-rocksdb-window-metrics,client-id=new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1:
136.10721427931827

I notice that the results for 10.2.0 are much worse than the same for
10.1.1.

I would like to know:
1. Is there any benchmark on rocksdb as to what rate/latency it should be
doing put/fetch operations at?

2. What could be the cause of the inferior numbers in 10.2.0 - is it
because this application is also running three other threads doing the
same thing?

3. Also, what's with the name
new-part-advice-d1094e71-0f59-45e8-98f4-477f9444aa91-StreamThread-1?
I wanted to put this in my cronjob, so why can't we have a simpler name
like we have in 10.1.1, so it is easy to write the script?

Thanks
Sachin