Re: Regarding Kafka connect task to partition relationship for both source and sink connectors

2024-05-30 Thread Alex Craig
For sink connectors, I believe you can scale the tasks up to match the
partitions on the topic. But I don't believe this is the case for source
connectors; the number of partitions on the topic you're producing to has
nothing to do with the number of connector tasks. It really depends on the
individual source connector and whether the source data type can benefit from
multiple tasks. For example, the JDBC source connector (a very popular
connector) only supports 1 task - even if you're querying multiple tables.

Bottom line: you'll need to check the documentation for the connector in
question to see if it supports multiple tasks.
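
For reference, the number of tasks a source connector actually runs is decided by the
connector's own taskConfigs() implementation, which the Connect framework calls with the
configured tasks.max as an upper bound. A minimal sketch of what that looks like (the
connector, the task, and the "tables" config key below are made up for illustration):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical source connector that splits a comma-separated "tables" config
// across at most maxTasks tasks. It illustrates that the connector itself, not
// the target topic's partition count, decides how many tasks actually run.
public class ExampleSourceConnector extends SourceConnector {
    private List<String> tables;

    @Override
    public void start(Map<String, String> props) {
        tables = Arrays.asList(props.getOrDefault("tables", "t1").split(","));
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        int numTasks = Math.min(maxTasks, tables.size());
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            // Round-robin the tables over the tasks we decided to create.
            List<String> slice = new ArrayList<>();
            for (int t = i; t < tables.size(); t += numTasks) {
                slice.add(tables.get(t));
            }
            Map<String, String> cfg = new HashMap<>();
            cfg.put("tables", String.join(",", slice));
            configs.add(cfg);
        }
        return configs;
    }

    @Override
    public Class<? extends Task> taskClass() { return ExampleSourceTask.class; }

    @Override
    public void stop() { }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public String version() { return "0.0.1"; }

    // Minimal no-op task so the sketch compiles; a real task would poll the source system.
    public static class ExampleSourceTask extends SourceTask {
        @Override
        public void start(Map<String, String> props) { }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            Thread.sleep(1000L);
            return Collections.emptyList();
        }

        @Override
        public void stop() { }

        @Override
        public String version() { return "0.0.1"; }
    }
}

The worker then starts one task per returned config, so tasks.max is only a ceiling.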

Alex

On Thu, May 30, 2024 at 7:51 AM Sébastien Rebecchi 
wrote:

> Hello
>
> Confirmed. Partition is the minimal granularity level, so having more
> consumers than the number of partitions of a topic for a same consumer
> group is useless, having P partitions means maximum parallelism is reached
> using P consumers.
>
> Regards,
>
> Sébastien.
>
> Le jeu. 30 mai 2024 à 14:43, Yeikel Santana  a écrit :
>
> > Hi everyone,
> >
> >
> > From my understanding, if a topic has  n partitions, we can create up to
> n
> > tasks for both the source and sink connectors to achieve the maximum
> > parallelism. Adding more tasks would not be beneficial, as they would
> > remain idle and be limited to the number of partitions of the topic
> >
> >
> > Could you please confirm if this understanding is correct?
> >
> >
> > If this understanding is incorrect could you please explain the
> > relationship if any?
> >
> >
> > Thank you!
> >
> >
> >
>


Re: Regarding Kafka connect task to partition relationship for both source and sink connectors

2024-05-30 Thread Sébastien Rebecchi
Hello

Confirmed. The partition is the minimum unit of parallelism, so having more
consumers than partitions of a topic within the same consumer group is useless;
with P partitions, maximum parallelism is reached using P consumers.

Regards,

Sébastien.

Le jeu. 30 mai 2024 à 14:43, Yeikel Santana  a écrit :

> Hi everyone,
>
>
> From my understanding, if a topic has  n partitions, we can create up to n
> tasks for both the source and sink connectors to achieve the maximum
> parallelism. Adding more tasks would not be beneficial, as they would
> remain idle and be limited to the number of partitions of the topic
>
>
> Could you please confirm if this understanding is correct?
>
>
> If this understanding is incorrect could you please explain the
> relationship if any?
>
>
> Thank you!
>
>
>


Re: [EXTERNAL] Regarding Kafka connect task to partition relationship for both source and sink connectors

2024-05-30 Thread Tauzell, Dave
The docs say: “Each task is assigned to a thread. Each task is capable of 
handling multiple Kafka partitions, but a single partition must be handled by 
only one task.” From what I understand, additional tasks would sit idle.



From: Yeikel Santana 
Date: Thursday, May 30, 2024 at 7:43 AM
To: users@kafka.apache.org 
Subject: [EXTERNAL] Regarding Kafka connect task to partition relationship for 
both source and sink connectors
Hi everyone,


From my understanding, if a topic has  n partitions, we can create up to n 
tasks for both the source and sink connectors to achieve the maximum 
parallelism. Adding more tasks would not be beneficial, as they would remain 
idle and be limited to the number of partitions of the topic


Could you please confirm if this understanding is correct?


If this understanding is incorrect could you please explain the relationship if 
any?


Thank you!

This e-mail and any files transmitted with it are confidential, may contain 
sensitive information, and are intended solely for the use of the individual or 
entity to whom they are addressed. If you have received this e-mail in error, 
please notify the sender by reply e-mail immediately and destroy all copies of 
the e-mail and any attachments.


Regarding Kafka connect task to partition relationship for both source and sink connectors

2024-05-30 Thread Yeikel Santana
Hi everyone,


From my understanding, if a topic has n partitions, we can create up to n 
tasks for both the source and sink connectors to achieve the maximum 
parallelism. Adding more tasks would not be beneficial, as they would remain 
idle and be limited to the number of partitions of the topic.


Could you please confirm if this understanding is correct?


If this understanding is incorrect could you please explain the relationship if 
any?


Thank you!




Re: Question regarding Kafka compaction and tombstone

2023-02-16 Thread Nanda Naga
Never mind, it seems the way I send tombstones has issues.

I followed this link for sending tombstones, but it seems it has some issues:

kcat/README.md: https://github.com/edenhill/kcat/blob/master/README.md


$ echo "abc:" | kcat -b mybroker -t mytopic -Z -K: (this one sends " as the message value)

$ echo abc: | kcat -b mybroker -t mytopic -Z -K: (this one sends a single space 
as the value)

The right way to send a tombstone is:

$ echo abc:| kcat -b mybroker -t mytopic -Z -K:
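
For completeness, a tombstone can also be produced from the Java client by sending a
record with a non-null key and a null value. A minimal sketch (broker address, topic,
and key are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TombstoneProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "mybroker:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A null value for key "abc" is the tombstone for that key.
            producer.send(new ProducerRecord<>("mytopic", "abc", null));
            producer.flush();
        }
    }
}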

Regards,
Nanda


From: Nanda Naga
Sent: Wednesday, February 15, 2023 9:31 PM
To: users@kafka.apache.org 
Subject: Question regarding Kafka compaction and tombstone

Hi,

I am using Kafka 3.2 (in windows) and for a topic i to send tombstone records. 
Everything was ok but i always see last value for the key (even i see null 
records present after delete.retention.ms period)

Example
Key1  value1
Key2  value2
Key1 - null record - tombstone record
and so on

I am expecting to see only Key2 in the above the case but is see
Key2 value2
Key1 - null

It is not in active segments as i keep sending lot of other key value pairs and 
kept my segment.bytes to 1000 only

following is the configuration

Topic: NandaFinalTest   PartitionCount: 1   ReplicationFactor: 1
Configs: 
cleanup.policy=compact,segment.bytes=1000,retention.ms=100,flush.messages=2,message.format.version=2.3-IV1,max.compaction.lag.ms=2,max.message.bytes=112,min.compaction.lag.ms=1,message.timestamp.type=LogAppendTime,min.cleanable.dirty.ratio=0.01,index.interval.bytes=4096,delete.retention.ms=0,message.timestamp.difference.max.ms=60480,segment.index.bytes=10485760

Attached my data log folder for my topic for reference.

Also, i see in all documentations/forums/discussions that the tombstone record 
should get deleted after delete.retention.ms time but I always end up seeing 
the last record with null

Am i missing something in the config and please help me in this regard. I need 
tombstone records to get deleted as well after delete.retention.ms time

Let me know if you need any more details to help here. Thanks


Regards,

Nanda




Question regarding Kafka compaction and tombstone

2023-02-15 Thread Nanda Naga
Hi,

I am using Kafka 3.2 (on Windows), and for a topic I send tombstone records. 
Everything was OK, but I always see the last value for the key (I even see null 
records still present after the delete.retention.ms period).

Example
Key1  value1
Key2  value2
Key1 - null record - tombstone record
and so on

I am expecting to see only Key2 in the above case, but I see:
Key2 value2
Key1 - null

It is not in the active segment, as I keep sending lots of other key-value pairs 
and have kept my segment.bytes at 1000 only.

The following is the configuration:

Topic: NandaFinalTest   PartitionCount: 1   ReplicationFactor: 1
Configs: 
cleanup.policy=compact,segment.bytes=1000,retention.ms=100,flush.messages=2,message.format.version=2.3-IV1,max.compaction.lag.ms=2,max.message.bytes=112,min.compaction.lag.ms=1,message.timestamp.type=LogAppendTime,min.cleanable.dirty.ratio=0.01,index.interval.bytes=4096,delete.retention.ms=0,message.timestamp.difference.max.ms=60480,segment.index.bytes=10485760

Attached my data log folder for my topic for reference.

Also, I see in all documentation/forums/discussions that the tombstone record 
should get deleted after the delete.retention.ms time, but I always end up seeing 
the last record with null.

Am I missing something in the config? Please help me in this regard. I need the 
tombstone records to get deleted as well after the delete.retention.ms time.

Let me know if you need any more details to help here. Thanks

Regards,
Nanda



Re: Regarding kafka 2.3.0

2022-08-25 Thread Luke Chen
1. Is kafka 2.3.0 going end of life? If yes, then what is the expected date?
-> Kafka supports the last 3 releases.

REF:
https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan#TimeBasedReleasePlan-WhatIsOurEOLPolicy?

2. Is kafka 3.1.0 backward compatible with 2.3.0?
-> Since going from 2.3 to 3.1 crosses one major release (3.0), some deprecated
features have been removed. You can refer to this doc for the upgrade guide:
https://kafka.apache.org/documentation/#upgrade_3_1_0 , and check the
release notes for each release.

Thanks
Luke

On Thu, Aug 25, 2022 at 3:42 PM Fred Bai  wrote:

> +1
> Me too, We consider upgrading Kafka to 3.X from Kafka 2.X, but don't know
> the compatibility.
>
> thx
>
> Ankit Saran  于2022年8月23日周二 22:21写道:
>
> > Hi Team,
> > We are planning to upgrade kafka version from 2.3.0 to 3.1.0 , We have
> > below queries regarding the same
> >
> > 1. Is kafka 2.3.0 going end of life ,If yes then what is the expected
> date?
> > 2. Is kafka 3.1.0 backward compatible to 2.3.0?
> >
> > Please help us with the above queries, Thanks in advance.
> >
> > Regards,
> > Ankit Saran
> >
>


Re: Regarding kafka 2.3.0

2022-08-25 Thread Fred Bai
+1
Me too. We are considering upgrading Kafka from 2.X to 3.X, but don't know
about the compatibility.

thx

Ankit Saran  于2022年8月23日周二 22:21写道:

> Hi Team,
> We are planning to upgrade kafka version from 2.3.0 to 3.1.0 , We have
> below queries regarding the same
>
> 1. Is kafka 2.3.0 going end of life ,If yes then what is the expected date?
> 2. Is kafka 3.1.0 backward compatible to 2.3.0?
>
> Please help us with the above queries, Thanks in advance.
>
> Regards,
> Ankit Saran
>


Regarding kafka 2.3.0

2022-08-23 Thread Ankit Saran
Hi Team,
We are planning to upgrade our Kafka version from 2.3.0 to 3.1.0. We have
the below queries regarding the same:

1. Is Kafka 2.3.0 going end of life? If yes, what is the expected date?
2. Is Kafka 3.1.0 backward compatible with 2.3.0?

Please help us with the above queries. Thanks in advance.

Regards,
Ankit Saran


Re: Query regarding kafka controller shutdown

2022-06-25 Thread dhiraj prajapati
Thanks for your reply.
Replication factor is 2 and min.insync.replicas is default (1)
I will get back to you on the producer's ack settings.


Regards,
Dhiraj

On Sun, Jun 5, 2022 at 1:20 PM Liam Clarke-Hutchinson 
wrote:

> Off the top of my head, it looks like it lost network connectivity to some
> extent.
>
> Question - what settings were used for topics like efGamePlay? What is min
> insync replicas, replication factor, and what acks settings is the producer
> using?
>
> Cheers,
>
> Liam
>
> On Fri, 3 Jun 2022 at 22:55, dhiraj prajapati 
> wrote:
>
> > Hi all,
> > Recently we faced an issue with one of our production kafka clusters:
> >  - It is a 3 node cluster
> >  - kafka server version is 1.0
> >
> > *Issue*:
> > One of the brokers had some problem resulting in the following:
> > 1. The broker lost leadership of all of the topic-parttions
> > 2. However the kafka server process did *NOT* get stopped/killed.
> >
> > I have attached the exception traces from controller and server logs at
> > that time at the end of this email.
> >
> > *Questions:*
> > 1. What could be the reason behind this happening?
> > 2. How can I reproduce this exact scenario in our test environment?
> >
> > P.S. We did not see any GC logs, or general network blips at around that
> > time.
> >
> > *Exception trace in controller log:*
> >
> > [2022-05-21 09:07:45,914] INFO [RequestSendThread controllerId=2]
> Starting
> > (kafka.controller.RequestSendThread)
> > [2022-05-21 09:07:45,914] INFO [RequestSendThread controllerId=2]
> Starting
> > (kafka.controller.RequestSendThread)
> > [2022-05-21 09:07:48,354] ERROR [Controller id=2] Error while electing or
> > becoming controller on broker 2 (kafka.controller.KafkaController)
> > kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either
> > before or while waiting for connection
> > at
> >
> >
> kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:233)
> > at
> >
> >
> kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
> > at
> >
> >
> kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
> > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> > at
> >
> >
> kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:221)
> > at
> >
> >
> kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:215)
> > at
> >
> >
> kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:215)
> > at
> >
> >
> kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:215)
> > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> > at
> >
> >
> kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:214)
> > at
> >
> >
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1461)
> > at
> >
> >
> kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1437)
> > at
> > kafka.zk.KafkaZkClient.getPartitionReassignment(KafkaZkClient.scala:671)
> > at
> >
> >
> kafka.controller.KafkaController.initializePartitionReassignment(KafkaController.scala:698)
> > at
> >
> >
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:655)
> > at
> >
> >
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:232)
> > at
> >
> >
> kafka.controller.KafkaController.kafka$controller$KafkaController$$elect(KafkaController.scala:1203)
> > at
> >
> >
> kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1479)
> > at
> >
> >
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
> > at
> >
> >
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> > at
> >
> >
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
> > at
> >
> >
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:68)
> > at
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> > [2022-05-21 09:07:48,358] DEBUG [Controller id=2] Resigning
> > (kafka.controller.KafkaController)
> > [2022-05-21 09:07:48,358] DEBUG [Controller id=2] Unregister
> > BrokerModifications handler for Set(2, 1)
> > (kafka.controller.KafkaController)
> > [2022-05-21 09:07:48,360] INFO [PartitionStateMachine controllerId=2]
> > Stopped partition 

Re: Query regarding kafka controller shutdown

2022-06-05 Thread Liam Clarke-Hutchinson
Off the top of my head, it looks like it lost network connectivity to some
extent.

Question - what settings were used for topics like efGamePlay? What are
min.insync.replicas and the replication factor, and what acks setting is the
producer using?

Cheers,

Liam

On Fri, 3 Jun 2022 at 22:55, dhiraj prajapati  wrote:

> Hi all,
> Recently we faced an issue with one of our production kafka clusters:
>  - It is a 3 node cluster
>  - kafka server version is 1.0
>
> *Issue*:
> One of the brokers had some problem resulting in the following:
> 1. The broker lost leadership of all of the topic-parttions
> 2. However the kafka server process did *NOT* get stopped/killed.
>
> I have attached the exception traces from controller and server logs at
> that time at the end of this email.
>
> *Questions:*
> 1. What could be the reason behind this happening?
> 2. How can I reproduce this exact scenario in our test environment?
>
> P.S. We did not see any GC logs, or general network blips at around that
> time.
>
> *Exception trace in controller log:*
>
> [2022-05-21 09:07:45,914] INFO [RequestSendThread controllerId=2] Starting
> (kafka.controller.RequestSendThread)
> [2022-05-21 09:07:45,914] INFO [RequestSendThread controllerId=2] Starting
> (kafka.controller.RequestSendThread)
> [2022-05-21 09:07:48,354] ERROR [Controller id=2] Error while electing or
> becoming controller on broker 2 (kafka.controller.KafkaController)
> kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either
> before or while waiting for connection
> at
>
> kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:233)
> at
>
> kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
> at
>
> kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at
>
> kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:221)
> at
>
> kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:215)
> at
>
> kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:215)
> at
>
> kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:215)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at
>
> kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:214)
> at
>
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1461)
> at
>
> kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1437)
> at
> kafka.zk.KafkaZkClient.getPartitionReassignment(KafkaZkClient.scala:671)
> at
>
> kafka.controller.KafkaController.initializePartitionReassignment(KafkaController.scala:698)
> at
>
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:655)
> at
>
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:232)
> at
>
> kafka.controller.KafkaController.kafka$controller$KafkaController$$elect(KafkaController.scala:1203)
> at
>
> kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1479)
> at
>
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
> at
>
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> at
>
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
> at
>
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:68)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2022-05-21 09:07:48,358] DEBUG [Controller id=2] Resigning
> (kafka.controller.KafkaController)
> [2022-05-21 09:07:48,358] DEBUG [Controller id=2] Unregister
> BrokerModifications handler for Set(2, 1)
> (kafka.controller.KafkaController)
> [2022-05-21 09:07:48,360] INFO [PartitionStateMachine controllerId=2]
> Stopped partition state machine (kafka.controller.PartitionStateMachine)
> [2022-05-21 09:07:48,361] INFO [ReplicaStateMachine controllerId=2] Stopped
> replica state machine (kafka.controller.ReplicaStateMachine)
> [2022-05-21 09:07:48,361] INFO [RequestSendThread controllerId=2] Shutting
> down (kafka.controller.RequestSendThread)
> [2022-05-21 09:07:48,364] INFO [RequestSendThread controllerId=2] Stopped
> (kafka.controller.RequestSendThread)
> [2022-05-21 09:07:48,364] INFO [RequestSendThread 

Query regarding kafka controller shutdown

2022-06-03 Thread dhiraj prajapati
Hi all,
Recently we faced an issue with one of our production kafka clusters:
 - It is a 3 node cluster
 - kafka server version is 1.0

*Issue*:
One of the brokers had some problem resulting in the following:
1. The broker lost leadership of all of the topic-partitions
2. However the kafka server process did *NOT* get stopped/killed.

I have attached the exception traces from controller and server logs at
that time at the end of this email.

*Questions:*
1. What could be the reason behind this happening?
2. How can I reproduce this exact scenario in our test environment?

P.S. We did not see any GC logs, or general network blips at around that
time.

*Exception trace in controller log:*

[2022-05-21 09:07:45,914] INFO [RequestSendThread controllerId=2] Starting
(kafka.controller.RequestSendThread)
[2022-05-21 09:07:45,914] INFO [RequestSendThread controllerId=2] Starting
(kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,354] ERROR [Controller id=2] Error while electing or
becoming controller on broker 2 (kafka.controller.KafkaController)
kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either
before or while waiting for connection
at
kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:233)
at
kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
at
kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
at
kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:221)
at
kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:215)
at
kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:215)
at
kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:215)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
at
kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:214)
at
kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1461)
at
kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1437)
at
kafka.zk.KafkaZkClient.getPartitionReassignment(KafkaZkClient.scala:671)
at
kafka.controller.KafkaController.initializePartitionReassignment(KafkaController.scala:698)
at
kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:655)
at
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:232)
at
kafka.controller.KafkaController.kafka$controller$KafkaController$$elect(KafkaController.scala:1203)
at
kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1479)
at
kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
at
kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
at
kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:68)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2022-05-21 09:07:48,358] DEBUG [Controller id=2] Resigning
(kafka.controller.KafkaController)
[2022-05-21 09:07:48,358] DEBUG [Controller id=2] Unregister
BrokerModifications handler for Set(2, 1) (kafka.controller.KafkaController)
[2022-05-21 09:07:48,360] INFO [PartitionStateMachine controllerId=2]
Stopped partition state machine (kafka.controller.PartitionStateMachine)
[2022-05-21 09:07:48,361] INFO [ReplicaStateMachine controllerId=2] Stopped
replica state machine (kafka.controller.ReplicaStateMachine)
[2022-05-21 09:07:48,361] INFO [RequestSendThread controllerId=2] Shutting
down (kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,364] INFO [RequestSendThread controllerId=2] Stopped
(kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,364] INFO [RequestSendThread controllerId=2] Shutdown
completed (kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,364] INFO [RequestSendThread controllerId=2] Shutting
down (kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,364] INFO [RequestSendThread controllerId=2] Stopped
(kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,364] INFO [RequestSendThread controllerId=2] Shutdown
completed (kafka.controller.RequestSendThread)
[2022-05-21 09:07:48,365] INFO [Controller id=2] Resigned
(kafka.controller.KafkaController)


*Snippets 

Re: Question regarding Kafka Streams Global State Store

2020-05-19 Thread Bruno Cadonna
Hi Georg,

Great that you could answer your own question and I am glad that I could help.

I was just writing you a similar answer. Yes, the global state store
will eventually reflect your write but you do not know when. That is
the main issue for your use case. A local state store will immediately
contain your previous write, because it is local to your processing.

For more information on the global state store see
https://kafka.apache.org/25/documentation/streams/developer-guide/dsl-api.html#streams_concepts_globalktable

Best,
Bruno
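
For illustration, here is a minimal sketch of that local-store approach, using plain
String serdes instead of the JSONObjectSerde from the original post (store, topic, and
class names are made up):

import java.util.Optional;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class LocalStoreExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Local, fault-tolerant store; it is backed by a changelog topic by default.
        StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("last-state-store"),
                        Serdes.String(), Serdes.String());
        builder.addStateStore(storeBuilder);

        KStream<String, String> input =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // The transformer reads the previous value for the key and writes the new one,
        // so the store already contains the last write when the next record arrives.
        input.transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
            private KeyValueStore<String, String> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(ProcessorContext context) {
                store = (KeyValueStore<String, String>) context.getStateStore("last-state-store");
            }

            @Override
            public String transform(String key, String value) {
                String previous = Optional.ofNullable(store.get(key)).orElse("");
                store.put(key, value);
                return previous + value; // the merge logic is application-specific
            }

            @Override
            public void close() { }
        }, "last-state-store")
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        // builder.build() yields the Topology that is passed to KafkaStreams as usual.
        builder.build();
    }
}

Because records are partitioned by key, every record for a given key is processed by the
task that owns that key's partition, so the local store always has the previous value.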

On Tue, May 19, 2020 at 3:04 PM Schmidt-Dumont Georg (BCI/ESW17)
 wrote:
>
> Hi Bruno,
>
> I just had a discussion with a colleague of mine regarding this and I wanted 
> to give you a quick contextual update. With regards to the global state, I 
> realize that having this state consistent in a distributed system is very 
> difficult. My expectation was that since it is a global state, Kafka takes 
> care of the consistency and I can just access the data. I think my 
> expectation was a bit naïve. The state will probably be eventually 
> consistent. But this does not fit with what I am trying to do. As you said I 
> should use a local store.
>
> With regards to the question in my previous mail with the amount of 
> partitions. I think I have answered my own question. Ensuring that the 
> messages have the correct and consistent keys will see to it that all the 
> data for a specific key ends up in a single partition. It does not mean that 
> a partition per key is required (which I first thought).
>
> Thanks again for your help!
>
> Mit freundlichen Grüßen / Best regards
>
> Georg Schmidt-Dumont
> BCI/ESW17
> Bosch Connected Industry
>
> Tel. +49 711 811-49893
>
> ► Take a look: https://bgn.bosch.com/alias/bci
>
>
>
> -Ursprüngliche Nachricht-
> Von: Bruno Cadonna 
> Gesendet: Dienstag, 19. Mai 2020 11:42
> An: Users 
> Betreff: Re: Question regarding Kafka Streams Global State Store
>
> Hi Georg,
>
> local state stores in Kafka Streams are backed by a Kafka topic by default. 
> So, if the instance crashes the local state store is restored from the local 
> state directory. If the local state directory is empty or does not exist the 
> local state store is restored from the Kafka topic. Local state stores are as 
> resilient as global state stores.
>
> As far as I understand, you only look up previous records with the same key. 
> You do not need to have the global state available at each instance to do 
> this. Having available all records with the same key is sufficient. If your 
> input topic are partitioned by key then records with the same key will land 
> on the same instance. That means, your local state store contains all records 
> with the same key.
>
> Best,
> Bruno
>
> On Tue, May 19, 2020 at 11:05 AM Schmidt-Dumont Georg (BCI/ESW17) 
>  wrote:
> >
> > Hi Bruno,
> >
> > Thanks for your quick reply!
> >
> > I decided to use a global state store for two reasons. If the application 
> > crashes, the store is populated properly once the reason for the crash has 
> > been fixed and the app starts again, i.e. I feel that it gives me a certain 
> > resiliency. Second we will be running multiple instances of the application 
> > and using a global state store provides the state across all instances.
> >
> > I am fairly new to Kafka and Kafka Streams, I am very much open to 
> > suggestions on better ways to handle the flow I need.
> >
> > Mit freundlichen Grüßen / Best regards
> >
> > Georg Schmidt-Dumont
> > BCI/ESW17
> > Bosch Connected Industry
> >
> > Tel. +49 711 811-49893
> >
> > ► Take a look: https://bgn.bosch.com/alias/bci
> >
> >
> >
> > -Ursprüngliche Nachricht-
> > Von: Bruno Cadonna 
> > Gesendet: Dienstag, 19. Mai 2020 10:52
> > An: Users 
> > Betreff: Re: Question regarding Kafka Streams Global State Store
> >
> > Hi Georg,
> >
> > From your description, I do not see why you need to use a global state 
> > instead of a local one. Are there any specific reasons for that? With a 
> > local state store you would have the previous record immediately available.
> >
> > Best,
> > Bruno
> >
> > On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17) 
> >  wrote:
> > >
> > > Good morning,
> > >
> > > I have setup a Kafka Streams application with the following logic. The 
> > > incoming messages are validated and transformed. The transformed messages 
> > > are then published to a global state store via topic A as well as to an 
> > > additional t

AW: Question regarding Kafka Streams Global State Store

2020-05-19 Thread Schmidt-Dumont Georg (BCI/ESW17)
Hi Bruno,

I just had a discussion with a colleague of mine regarding this and I wanted to 
give you a quick contextual update. With regards to the global state, I realize 
that having this state consistent in a distributed system is very difficult. My 
expectation was that since it is a global state, Kafka takes care of the 
consistency and I can just access the data. I think my expectation was a bit 
naïve. The state will probably be eventually consistent. But this does not fit 
with what I am trying to do. As you said I should use a local store.

With regards to the question in my previous mail about the number of partitions: 
I think I have answered my own question. Ensuring that the messages have 
correct and consistent keys will see to it that all the data for a specific key 
ends up in a single partition. It does not mean that a partition per key is 
required (which I first thought).

Thanks again for your help!

Mit freundlichen Grüßen / Best regards 

Georg Schmidt-Dumont
BCI/ESW17
Bosch Connected Industry

Tel. +49 711 811-49893 

► Take a look: https://bgn.bosch.com/alias/bci



-Ursprüngliche Nachricht-
Von: Bruno Cadonna  
Gesendet: Dienstag, 19. Mai 2020 11:42
An: Users 
Betreff: Re: Question regarding Kafka Streams Global State Store

Hi Georg,

local state stores in Kafka Streams are backed by a Kafka topic by default. So, 
if the instance crashes the local state store is restored from the local state 
directory. If the local state directory is empty or does not exist the local 
state store is restored from the Kafka topic. Local state stores are as 
resilient as global state stores.

As far as I understand, you only look up previous records with the same key. 
You do not need to have the global state available at each instance to do this. 
Having available all records with the same key is sufficient. If your input 
topic are partitioned by key then records with the same key will land on the 
same instance. That means, your local state store contains all records with the 
same key.

Best,
Bruno

On Tue, May 19, 2020 at 11:05 AM Schmidt-Dumont Georg (BCI/ESW17) 
 wrote:
>
> Hi Bruno,
>
> Thanks for your quick reply!
>
> I decided to use a global state store for two reasons. If the application 
> crashes, the store is populated properly once the reason for the crash has 
> been fixed and the app starts again, i.e. I feel that it gives me a certain 
> resiliency. Second we will be running multiple instances of the application 
> and using a global state store provides the state across all instances.
>
> I am fairly new to Kafka and Kafka Streams, I am very much open to 
> suggestions on better ways to handle the flow I need.
>
> Mit freundlichen Grüßen / Best regards
>
> Georg Schmidt-Dumont
> BCI/ESW17
> Bosch Connected Industry
>
> Tel. +49 711 811-49893
>
> ► Take a look: https://bgn.bosch.com/alias/bci
>
>
>
> -Ursprüngliche Nachricht-
> Von: Bruno Cadonna 
> Gesendet: Dienstag, 19. Mai 2020 10:52
> An: Users 
> Betreff: Re: Question regarding Kafka Streams Global State Store
>
> Hi Georg,
>
> From your description, I do not see why you need to use a global state 
> instead of a local one. Are there any specific reasons for that? With a local 
> state store you would have the previous record immediately available.
>
> Best,
> Bruno
>
> On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17) 
>  wrote:
> >
> > Good morning,
> >
> > I have setup a Kafka Streams application with the following logic. The 
> > incoming messages are validated and transformed. The transformed messages 
> > are then published to a global state store via topic A as well as to an 
> > additional topic A for consumption by other applications further down the 
> > processing pipeline.
> >
> > As part of the transformation I access the global state store in order to 
> > get the values from the previous message and use them in the transformation 
> > of the current message. The messages only contain changed values and these 
> > changes are merged with the complete data set before being sent on, hence I 
> > always hold the latest state in the global store in order to merge it with 
> > the incoming changed values.
> >
> > Unfortunately, when I access the store in the transformation I do not get 
> > the latest state. The update of the store takes too long so when I access 
> > it in the transformation I either get no values or values which do not 
> > represent the latest state.
> >
> > The following shows the build-up of my streams app:
> >
> > //setup global state store
> > final KeyValueBytesStoreSupplier storeSupplier = 
> > Stores.persistentKeyValueStore( “global-store” ); final 
> > StoreBui

AW: Question regarding Kafka Streams Global State Store

2020-05-19 Thread Schmidt-Dumont Georg (BCI/ESW17)
Hi Bruno,

The data I am processing comes from machines. The key is the identifier of the 
machine which produced a specific message. Currently we only have a couple of 
these machines producing data. This number will increase quite a lot over the 
coming years. Is the limit on the number of partitions per topic and cluster 
something we should/need to consider when using the keys for partitioning?

Irrespective of whether I can use local state stores instead of a global state 
store, I would still like to understand the problem with using a global state 
store. It seems to me that I can only use it if the incoming messages are far 
enough apart, time-wise, to update the state in between the processing of 
individual messages. This seems to be missing the point of having the store. 
Could you please explain what the purpose of the global state store is and when 
to use it?

Thanks!

Mit freundlichen Grüßen / Best regards 

Georg Schmidt-Dumont
BCI/ESW17
Bosch Connected Industry

Tel. +49 711 811-49893 

► Take a look: https://bgn.bosch.com/alias/bci



-Ursprüngliche Nachricht-
Von: Bruno Cadonna  
Gesendet: Dienstag, 19. Mai 2020 11:42
An: Users 
Betreff: Re: Question regarding Kafka Streams Global State Store

Hi Georg,

local state stores in Kafka Streams are backed by a Kafka topic by default. So, 
if the instance crashes the local state store is restored from the local state 
directory. If the local state directory is empty or does not exist the local 
state store is restored from the Kafka topic. Local state stores are as 
resilient as global state stores.

As far as I understand, you only look up previous records with the same key. 
You do not need to have the global state available at each instance to do this. 
Having available all records with the same key is sufficient. If your input 
topic are partitioned by key then records with the same key will land on the 
same instance. That means, your local state store contains all records with the 
same key.

Best,
Bruno

On Tue, May 19, 2020 at 11:05 AM Schmidt-Dumont Georg (BCI/ESW17) 
 wrote:
>
> Hi Bruno,
>
> Thanks for your quick reply!
>
> I decided to use a global state store for two reasons. If the application 
> crashes, the store is populated properly once the reason for the crash has 
> been fixed and the app starts again, i.e. I feel that it gives me a certain 
> resiliency. Second we will be running multiple instances of the application 
> and using a global state store provides the state across all instances.
>
> I am fairly new to Kafka and Kafka Streams, I am very much open to 
> suggestions on better ways to handle the flow I need.
>
> Mit freundlichen Grüßen / Best regards
>
> Georg Schmidt-Dumont
> BCI/ESW17
> Bosch Connected Industry
>
> Tel. +49 711 811-49893
>
> ► Take a look: https://bgn.bosch.com/alias/bci
>
>
>
> -Ursprüngliche Nachricht-
> Von: Bruno Cadonna 
> Gesendet: Dienstag, 19. Mai 2020 10:52
> An: Users 
> Betreff: Re: Question regarding Kafka Streams Global State Store
>
> Hi Georg,
>
> From your description, I do not see why you need to use a global state 
> instead of a local one. Are there any specific reasons for that? With a local 
> state store you would have the previous record immediately available.
>
> Best,
> Bruno
>
> On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17) 
>  wrote:
> >
> > Good morning,
> >
> > I have setup a Kafka Streams application with the following logic. The 
> > incoming messages are validated and transformed. The transformed messages 
> > are then published to a global state store via topic A as well as to an 
> > additional topic A for consumption by other applications further down the 
> > processing pipeline.
> >
> > As part of the transformation I access the global state store in order to 
> > get the values from the previous message and use them in the transformation 
> > of the current message. The messages only contain changed values and these 
> > changes are merged with the complete data set before being sent on, hence I 
> > always hold the latest state in the global store in order to merge it with 
> > the incoming changed values.
> >
> > Unfortunately, when I access the store in the transformation I do not get 
> > the latest state. The update of the store takes too long so when I access 
> > it in the transformation I either get no values or values which do not 
> > represent the latest state.
> >
> > The following shows the build-up of my streams app:
> >
> > //setup global state store
> > final KeyValueBytesStoreSupplier storeSupplier = 
> > Stores.persistentKeyValueStore( “global-store” ); final 
> > StoreBuilder> storeBuilder = 
> > Stores

Re: Question regarding Kafka Streams Global State Store

2020-05-19 Thread Bruno Cadonna
Hi Georg,

local state stores in Kafka Streams are backed by a Kafka topic by
default. So, if the instance crashes the local state store is restored
from the local state directory. If the local state directory is empty
or does not exist the local state store is restored from the Kafka
topic. Local state stores are as resilient as global state stores.

As far as I understand, you only look up previous records with the
same key. You do not need to have the global state available at each
instance to do this. Having available all records with the same key is
sufficient. If your input topics are partitioned by key then records
with the same key will land on the same instance. That means, your
local state store contains all records with the same key.

Best,
Bruno

On Tue, May 19, 2020 at 11:05 AM Schmidt-Dumont Georg (BCI/ESW17)
 wrote:
>
> Hi Bruno,
>
> Thanks for your quick reply!
>
> I decided to use a global state store for two reasons. If the application 
> crashes, the store is populated properly once the reason for the crash has 
> been fixed and the app starts again, i.e. I feel that it gives me a certain 
> resiliency. Second we will be running multiple instances of the application 
> and using a global state store provides the state across all instances.
>
> I am fairly new to Kafka and Kafka Streams, I am very much open to 
> suggestions on better ways to handle the flow I need.
>
> Mit freundlichen Grüßen / Best regards
>
> Georg Schmidt-Dumont
> BCI/ESW17
> Bosch Connected Industry
>
> Tel. +49 711 811-49893
>
> ► Take a look: https://bgn.bosch.com/alias/bci
>
>
>
> -Ursprüngliche Nachricht-
> Von: Bruno Cadonna 
> Gesendet: Dienstag, 19. Mai 2020 10:52
> An: Users 
> Betreff: Re: Question regarding Kafka Streams Global State Store
>
> Hi Georg,
>
> From your description, I do not see why you need to use a global state 
> instead of a local one. Are there any specific reasons for that? With a local 
> state store you would have the previous record immediately available.
>
> Best,
> Bruno
>
> On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17) 
>  wrote:
> >
> > Good morning,
> >
> > I have setup a Kafka Streams application with the following logic. The 
> > incoming messages are validated and transformed. The transformed messages 
> > are then published to a global state store via topic A as well as to an 
> > additional topic A for consumption by other applications further down the 
> > processing pipeline.
> >
> > As part of the transformation I access the global state store in order to 
> > get the values from the previous message and use them in the transformation 
> > of the current message. The messages only contain changed values and these 
> > changes are merged with the complete data set before being sent on, hence I 
> > always hold the latest state in the global store in order to merge it with 
> > the incoming changed values.
> >
> > Unfortunately, when I access the store in the transformation I do not get 
> > the latest state. The update of the store takes too long so when I access 
> > it in the transformation I either get no values or values which do not 
> > represent the latest state.
> >
> > The following shows the build-up of my streams app:
> >
> > //setup global state store
> > final KeyValueBytesStoreSupplier storeSupplier =
> > Stores.persistentKeyValueStore( “global-store” ); final
> > StoreBuilder> storeBuilder =
> > Stores.keyValueStoreBuilder( storeSupplier, Serdes.String(), new
> > JSONObjectSerde() ); builder.addGlobalStore( storeBuilder,
> > “global-store-topic”,  Consumed.with( Serdes.String(), new
> > JSONObjectSerde() ), StoreProcessor::new );
> >
> > //store processor
> >
> > private KeyValueStore stateStore;
> >
> > @Override
> > public void init( final ProcessorContext context ) {
> >stateStore = (KeyValueStore)
> > context.getStateStore( “global-store” ); }
> >
> >
> >
> > @Override
> > public void process( final String key, final JSONObject state ) {
> >log.info( "Update state store for {}: {}.", key, state );
> >lastRecentStateStore.put( key, state ); }
> >
> >
> > //streams setup
> >
> > final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();
> >
> > final KStream stream = builder.stream( “input-topic”, 
> > Consumed.with( Serdes.String(), jsonObjectSerde ) )
> >
> >.transformValues( ValueTransformer::new )
> >
> >
> >
> > stream.to( “global-store-topic”, Produced.valueSerde( jsonObje

AW: Question regarding Kafka Streams Global State Store

2020-05-19 Thread Schmidt-Dumont Georg (BCI/ESW17)
Hi Bruno,

Thanks for your quick reply!

I decided to use a global state store for two reasons. If the application 
crashes, the store is populated properly once the reason for the crash has been 
fixed and the app starts again, i.e. I feel that it gives me a certain 
resiliency. Second we will be running multiple instances of the application and 
using a global state store provides the state across all instances.

I am fairly new to Kafka and Kafka Streams, I am very much open to suggestions 
on better ways to handle the flow I need.

Mit freundlichen Grüßen / Best regards 

Georg Schmidt-Dumont
BCI/ESW17
Bosch Connected Industry

Tel. +49 711 811-49893 

► Take a look: https://bgn.bosch.com/alias/bci



-Ursprüngliche Nachricht-
Von: Bruno Cadonna  
Gesendet: Dienstag, 19. Mai 2020 10:52
An: Users 
Betreff: Re: Question regarding Kafka Streams Global State Store

Hi Georg,

From your description, I do not see why you need to use a global state instead 
of a local one. Are there any specific reasons for that? With a local state 
store you would have the previous record immediately available.

Best,
Bruno

On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17) 
 wrote:
>
> Good morning,
>
> I have setup a Kafka Streams application with the following logic. The 
> incoming messages are validated and transformed. The transformed messages are 
> then published to a global state store via topic A as well as to an 
> additional topic A for consumption by other applications further down the 
> processing pipeline.
>
> As part of the transformation I access the global state store in order to get 
> the values from the previous message and use them in the transformation of 
> the current message. The messages only contain changed values and these 
> changes are merged with the complete data set before being sent on, hence I 
> always hold the latest state in the global store in order to merge it with 
> the incoming changed values.
>
> Unfortunately, when I access the store in the transformation I do not get the 
> latest state. The update of the store takes too long so when I access it in 
> the transformation I either get no values or values which do not represent 
> the latest state.
>
> The following shows the build-up of my streams app:
>
> //setup global state store
> final KeyValueBytesStoreSupplier storeSupplier = 
> Stores.persistentKeyValueStore( “global-store” ); final 
> StoreBuilder> storeBuilder = 
> Stores.keyValueStoreBuilder( storeSupplier, Serdes.String(), new 
> JSONObjectSerde() ); builder.addGlobalStore( storeBuilder, 
> “global-store-topic”,  Consumed.with( Serdes.String(), new 
> JSONObjectSerde() ), StoreProcessor::new );
>
> //store processor
>
> private KeyValueStore stateStore;
>
> @Override
> public void init( final ProcessorContext context ) {
>stateStore = (KeyValueStore) 
> context.getStateStore( “global-store” ); }
>
>
>
> @Override
> public void process( final String key, final JSONObject state ) {
>log.info( "Update state store for {}: {}.", key, state );
>lastRecentStateStore.put( key, state ); }
>
>
> //streams setup
>
> final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();
>
> final KStream stream = builder.stream( “input-topic”, 
> Consumed.with( Serdes.String(), jsonObjectSerde ) )
>
>.transformValues( ValueTransformer::new )
>
>
>
> stream.to( “global-store-topic”, Produced.valueSerde( jsonObjectSerde ) );
>
> stream.to( “output-topic”, Produced.valueSerde( jsonObjectSerde ) );
>
> //global state store access in ValueTransformer
>
> JSONObject previousState = Optional.ofNullable( stateStore.get( key ) )
>.orElse( new JSONObject() );
>
>
> I have set the acknowledge property for the producers to “all”.
>
> I have tried to disable the caching by setting “cache.max.bytes.buffering” to 
> 0 and by disabling the cache on the store using “.withCachingDisabled()”. I 
> also tried setting the commit interval to 0. All without success.
>
> How can I setup a global state which meets the requirements as describe in 
> the scenario above?
>
> Thank you!
>
> Best regards / Mit freundlichen Grüßen / Üdvözlettel / 致以诚挚的问候
>
> Mr. Georg Schmidt-Dumont
> Bosch Connected Industry – BCI/ESW17
> Robert Bosch GmbH | Postfach 10 60 50 | 70049 Stuttgart | GERMANY | 
> www.bosch.com<http://www.bosch.com/>
> Phone +49 711 811-49893  | 
> georg.schmidt-dum...@bosch.com<mailto:georg.schmidt-dum...@bosch.com>
>
> Sitz: Stuttgart, Registergericht: Amtsgericht Stuttgart, HRB 14000;
> Aufsichtsratsvorsitzender: Franz Fehrenbach; Geschäftsführung: Dr. Volkmar 
> Denner,
> Prof. Dr. Stefan Asenkerschbaumer, Dr. Rolf Bulander, Dr. Stefan Hartung, Dr. 
> Markus Heyn, Dr. Dirk Hoheisel,
> Christoph Kübel, Uwe Raschke, Peter Tyroller
>


Re: Question regarding Kafka Streams Global State Store

2020-05-19 Thread Bruno Cadonna
Hi Georg,

From your description, I do not see why you need to use a global state
instead of a local one. Are there any specific reasons for that? With
a local state store you would have the previous record immediately
available.

Best,
Bruno

On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17)
 wrote:
>
> Good morning,
>
> I have setup a Kafka Streams application with the following logic. The 
> incoming messages are validated and transformed. The transformed messages are 
> then published to a global state store via topic A as well as to an 
> additional topic A for consumption by other applications further down the 
> processing pipeline.
>
> As part of the transformation I access the global state store in order to get 
> the values from the previous message and use them in the transformation of 
> the current message. The messages only contain changed values and these 
> changes are merged with the complete data set before being sent on, hence I 
> always hold the latest state in the global store in order to merge it with 
> the incoming changed values.
>
> Unfortunately, when I access the store in the transformation I do not get the 
> latest state. The update of the store takes too long so when I access it in 
> the transformation I either get no values or values which do not represent 
> the latest state.
>
> The following shows the build-up of my streams app:
>
> //setup global state store
> final KeyValueBytesStoreSupplier storeSupplier = 
> Stores.persistentKeyValueStore( “global-store” );
> final StoreBuilder> storeBuilder = 
> Stores.keyValueStoreBuilder( storeSupplier, Serdes.String(), new 
> JSONObjectSerde() );
> builder.addGlobalStore( storeBuilder, “global-store-topic”,  Consumed.with( 
> Serdes.String(), new JSONObjectSerde() ), StoreProcessor::new );
>
> //store processor
>
> private KeyValueStore stateStore;
>
> @Override
> public void init( final ProcessorContext context ) {
>stateStore = (KeyValueStore) context.getStateStore( 
> “global-store” );
> }
>
>
>
> @Override
> public void process( final String key, final JSONObject state ) {
>log.info( "Update state store for {}: {}.", key, state );
>lastRecentStateStore.put( key, state );
> }
>
>
> //streams setup
>
> final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();
>
> final KStream stream = builder.stream( “input-topic”, 
> Consumed.with( Serdes.String(), jsonObjectSerde ) )
>
>.transformValues( ValueTransformer::new )
>
>
>
> stream.to( “global-store-topic”, Produced.valueSerde( jsonObjectSerde ) );
>
> stream.to( “output-topic”, Produced.valueSerde( jsonObjectSerde ) );
>
> //global state store access in ValueTransformer
>
> JSONObject previousState = Optional.ofNullable( stateStore.get( key ) )
>.orElse( new JSONObject() );
>
>
> I have set the acknowledge property for the producers to “all”.
>
> I have tried to disable the caching by setting “cache.max.bytes.buffering” to 
> 0 and by disabling the cache on the store using “.withCachingDisabled()”. I 
> also tried setting the commit interval to 0. All without success.
>
> How can I setup a global state which meets the requirements as describe in 
> the scenario above?
>
> Thank you!
>
> Best regards / Mit freundlichen Grüßen / Üdvözlettel / 致以诚挚的问候
>
> Mr. Georg Schmidt-Dumont
> Bosch Connected Industry – BCI/ESW17
> Robert Bosch GmbH | Postfach 10 60 50 | 70049 Stuttgart | GERMANY | 
> www.bosch.com
> Phone +49 711 811-49893  | 
> georg.schmidt-dum...@bosch.com
>
> Sitz: Stuttgart, Registergericht: Amtsgericht Stuttgart, HRB 14000;
> Aufsichtsratsvorsitzender: Franz Fehrenbach; Geschäftsführung: Dr. Volkmar 
> Denner,
> Prof. Dr. Stefan Asenkerschbaumer, Dr. Rolf Bulander, Dr. Stefan Hartung, Dr. 
> Markus Heyn, Dr. Dirk Hoheisel,
> Christoph Kübel, Uwe Raschke, Peter Tyroller
>


Question regarding Kafka Streams Global State Store

2020-05-19 Thread Schmidt-Dumont Georg (BCI/ESW17)
Good morning,

I have set up a Kafka Streams application with the following logic. The incoming 
messages are validated and transformed. The transformed messages are then 
published to a global state store via one topic as well as to an additional topic 
for consumption by other applications further down the processing pipeline.

As part of the transformation I access the global state store in order to get 
the values from the previous message and use them in the transformation of the 
current message. The messages only contain changed values and these changes are 
merged with the complete data set before being sent on, hence I always hold the 
latest state in the global store in order to merge it with the incoming changed 
values.

Unfortunately, when I access the store in the transformation I do not get the 
latest state. The update of the store takes too long so when I access it in the 
transformation I either get no values or values which do not represent the 
latest state.

The following shows the build-up of my streams app:

//setup global state store
final KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore( "global-store" );
final StoreBuilder<KeyValueStore<String, JSONObject>> storeBuilder =
    Stores.keyValueStoreBuilder( storeSupplier, Serdes.String(), new JSONObjectSerde() );
builder.addGlobalStore( storeBuilder, "global-store-topic",
    Consumed.with( Serdes.String(), new JSONObjectSerde() ), StoreProcessor::new );

//store processor

private KeyValueStore<String, JSONObject> stateStore;

@Override
public void init( final ProcessorContext context ) {
    stateStore = (KeyValueStore<String, JSONObject>) context.getStateStore( "global-store" );
}

@Override
public void process( final String key, final JSONObject state ) {
    log.info( "Update state store for {}: {}.", key, state );
    stateStore.put( key, state );
}

//streams setup

final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();

final KStream<String, JSONObject> stream = builder.stream( "input-topic",
    Consumed.with( Serdes.String(), jsonObjectSerde ) )
    .transformValues( ValueTransformer::new );

stream.to( "global-store-topic", Produced.valueSerde( jsonObjectSerde ) );

stream.to( "output-topic", Produced.valueSerde( jsonObjectSerde ) );

//global state store access in ValueTransformer

JSONObject previousState = Optional.ofNullable( stateStore.get( key ) )
    .orElse( new JSONObject() );


I have set the acks property for the producers to “all”.

I have tried to disable the caching by setting “cache.max.bytes.buffering” to 0 
and by disabling the cache on the store using “.withCachingDisabled()”. I also 
tried setting the commit interval to 0. All without success.

How can I set up a global state which meets the requirements described in the 
scenario above?

Thank you!

Best regards / Mit freundlichen Grüßen / Üdvözlettel / 致以诚挚的问候

Mr. Georg Schmidt-Dumont
Bosch Connected Industry – BCI/ESW17
Robert Bosch GmbH | Postfach 10 60 50 | 70049 Stuttgart | GERMANY | 
www.bosch.com
Phone +49 711 811-49893  | 
georg.schmidt-dum...@bosch.com

Sitz: Stuttgart, Registergericht: Amtsgericht Stuttgart, HRB 14000;
Aufsichtsratsvorsitzender: Franz Fehrenbach; Geschäftsführung: Dr. Volkmar 
Denner,
Prof. Dr. Stefan Asenkerschbaumer, Dr. Rolf Bulander, Dr. Stefan Hartung, Dr. 
Markus Heyn, Dr. Dirk Hoheisel,
Christoph Kübel, Uwe Raschke, Peter Tyroller



Issue Regarding Kafka-log size. In Windows Operating system

2020-02-17 Thread Savan Tripathi
Dear sir,
I am new to Apache Kafka. I am developing an application in which Apache
Kafka acts as the broker. I am producing random data using a Spring Boot
application and inserting it into PostgreSQL's TimeScale database using the
Telegraf plugin and the Kafka broker.

While I am running my application and producing random data, the operations
succeed, but at the same time the Kafka log size grows by gigabytes
and my disk fills up within a few minutes.

For example, in 30-40 minutes, 35 GB of disk is occupied by logs.

So I am requesting you to suggest a solution for this as soon as
possible.

-- 

*Thanks and Regards,*


*Savan Tripathi,*

*Software Developer,*

*for Silcore Technology,*
*Tel. +919687661608.*


Regarding kafka memory not distributed across cluster

2019-05-07 Thread VIPUL MALOO
Hi all,

We have a query regarding memory consumption when scaling out Kafka. It would be
very helpful if you could give a suggestion/solution for the query below.
We are running Kafka as a Docker container on Kubernetes.

A memory limit of 4 GiB is configured for the Kafka broker pod. Under a large load
the Kafka broker pod's memory reached 4 GiB, so we decided to manually scale out
the Kafka broker pods from 1 to 3 replicas. But after the scale-out, each Kafka
broker pod consumes 4 GiB of memory for the same load. We expected each pod's
memory consumption to be ~1.33 GiB, since we are running 3 pods for the same amount of load.

Before Kafka Broker Scale out :
1 broker
6 topics each with 1 partition each
Memory consumption: 4GiB

After Kafka Broker Scale out and rebalancing topics over all the brokers:
3 broker
6 topics each with 1 partitions each
Memory consumption: 10GiB (Pod1: 2GiB, Pod2: 4GiB, Pod3: 4GiB)


After Kafka Broker Scale out and rebalancing topics over all the brokers:
3 broker
6 topics each with 3 partitions each
Memory consumption: 12GiB (Pod1: 4GiB, Pod2: 4GiB, Pod3: 4GiB)

All deployments were tested with the same amount of load.

Regards,
Vipul Maloo
Micro Focus





Question regarding Kafka 2.0

2019-05-03 Thread Sourabh S P
Hi,
  Previously I was using Kafka version 1.1.1, and we are currently planning to
migrate to version 2.0.0. But I am facing issues, as a lot of classes have been
removed.

 kafka.api.TopicMetadata;
 kafka.client.ClientUtils;
 kafka.consumer.ConsumerConfig;
 kafka.consumer.SimpleConsumer;
 kafka.message.ByteBufferMessageSet;
 kafka.message.InvalidMessageException;
 kafka.message.MessageAndOffset;
 kafka.common.ErrorMapping;
 kafka.message.MessageAndMetadata;
 kafka.utils.ZKGroupTopicDirs;
 kafka.consumer.ConsumerConfig;
 kafka.consumer.FetchedDataChunk;
 kafka.consumer.KafkaStream;
 kafka.consumer.FetchedDataChunk;

Could you please confirm whether the above classes have been removed completely
or moved to a different package? Please suggest any alternative classes I could use.
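
For context, the classes listed above belong to the old Scala clients, which were removed in Kafka 2.0.0; the replacement is the Java client under org.apache.kafka.clients. A minimal consumer sketch under that assumption (broker list, group id and topic name are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NewConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "migration-example");         // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));          // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // roughly what MessageAndOffset / MessageAndMetadata used to carry
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}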


Your help is much appreciated.


Looking forward to hearing from you.


BR

Sourabh S P


Regarding Kafka Partitions, it's division and how to Push messages into it.

2019-03-05 Thread anjaliupadhyay3
Hi,
I am trying to create "n" partitions for a single broker, and those should be
keyed partitions. I am able to push messages into my first keyed partition by
taking ((partition-size) - 1) in a custom partitioner class, so in this case the
first keyed partition is the last partition of the topic (if I have 20 partitions,
the first keyed partition is the 19th one). But my concern is how to divide up the
other partitions and push messages into each partition through the custom
partitioner class.
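
For the question above, a minimal sketch of a custom partitioner that spreads keyed messages over all partitions of a topic instead of always returning the last one (the class name and the hash-modulo scheme are only an illustration):

import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class KeyModPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0; // records without a key all land in partition 0 in this sketch
        }
        // Spread keyed records over all partitions instead of always picking the last one.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

It would be registered on the producer via the partitioner.class setting (ProducerConfig.PARTITIONER_CLASS_CONFIG).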

from 
Anjali Upadhyay 


Regarding Kafka

2018-08-02 Thread Naresh Kumar Reddy
Hello,

I'm unable to start either ZooKeeper or Kafka. I tried downloading ZooKeeper
separately, and now I'm able to start ZooKeeper.
As for the Kafka server, it threw the error message "wmic is not recognized as
an internal or external command". I resolved that by adding
"C:\Windows\System32\wbem" to the PATH in the system variables.
Now when I try to run the Kafka server, it neither starts nor throws any error message.

Steps followed:

  1.  Downloaded the zookeeper & Kafka-server from 
http://kafka.apache.org/downloads (Binary downloads)
  2.  Extracted the file using "tar" command
  3.  From the cmd try to run the zookeeper, Which worked fine.
  4.  From another cmd, try to run the Kafka server; no error message appears (see the attachment)

Is there anything that I'm not following correctly?

Thanks,
Naresh



Re: Information regarding Kafka 2.10-0.10.1.1 version

2018-02-10 Thread Ted Yu
For #1, there is the record-size-avg metric.

Not sure about #2.
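
For completeness, a small sketch of reading that metric at runtime from the Java producer's metrics() map (the helper class is hypothetical; 0.10.x clients read the value with value(), newer clients expose metricValue() instead):

import java.util.Map;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public final class RecordSizeMetric {
    public static void print(Producer<?, ?> producer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            if ("record-size-avg".equals(entry.getKey().name())) {
                // value() on 0.10.x clients; newer clients expose metricValue()
                System.out.println(entry.getKey().group() + ": record-size-avg = " + entry.getValue().value());
            }
        }
    }
}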

On Thu, Feb 8, 2018 at 10:28 AM, Pawan K  wrote:

> Hi,
> I am currently trying to research answers for the following questions. Can
> you please let me know where/how could i find these in the configuration.
>
> 1) Average record size (KB) for data written to each kafka topic
> 2) Average number of events written to each kafka topic per day
>
>
> Thanks,
> Pavan
>


Information regarding Kafka 2.10-0.10.1.1 version

2018-02-09 Thread Pawan K
Hi,
I am currently trying to research answers to the following questions. Can
you please let me know where/how I could find these in the configuration?

1) Average record size (KB) for data written to each kafka topic
2) Average number of events written to each kafka topic per day


Thanks,
Pavan


Re: Regarding Kafka Consumer

2017-11-24 Thread simarpreet kaur
Thanks, Faraz.

I am using its Java API. It does not seem to provide such a method on the
consumer.
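
A common workaround (a sketch only, not part of the consumer API itself) is to keep the Properties the consumer was built from next to the consumer, so settings such as group.id remain queryable at runtime; the wrapper class below is hypothetical:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConfiguredConsumer<K, V> {
    private final Properties props;
    private final KafkaConsumer<K, V> consumer;

    public ConfiguredConsumer(Properties props) {
        this.props = props;                          // keep the configuration alongside the consumer
        this.consumer = new KafkaConsumer<>(props);
    }

    public String groupId() {
        return props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
    }

    public KafkaConsumer<K, V> consumer() {
        return consumer;
    }
}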

On Wed, Nov 22, 2017 at 2:45 PM, Faraz Mateen  wrote:

> Not sure which client you are using.
> In kafka-python, consumer.config returns a dictionary with all consumer
> properties.
>
> Thanks,
> Faraz
>
> On Mon, Nov 20, 2017 at 5:34 PM, simarpreet kaur 
> wrote:
>
>> Hello team,
>>
>> I wanted to know if there is some way I can retrieve consumer properties
>> from the Kafka Consumer. for example, if at runtime, I want to know the
>> group id of a particular consumer, in case multiple consumers are running
>> in my application.
>>
>> Thanks & Regards,
>> Simarpreet
>>
>
>


Regarding Kafka Consumer

2017-11-20 Thread simarpreet kaur
Hello team,

I wanted to know if there is some way I can retrieve consumer properties from
the Kafka consumer. For example, at runtime I may want to know the group id of a
particular consumer, in case multiple consumers are running in my application.

Thanks & Regards,
Simarpreet


Re: Regarding kafka-manager topics parameters

2017-10-18 Thread Ted Yu
Images didn't come through.

Consider using a third-party website.

On Tue, Oct 17, 2017 at 9:36 PM, Pavan Patani 
wrote:

> Hello,
>
> Previously I was using old version of Kafka-manager and it was showing
> "Producer Message/Sec and Summed Recent Offsets" parameters in topics as
> below.
>
> [image: Inline image 1]
> Currently I have installed  kafka-manager-1.3.3.14 and now I can not see
> these two "Producer Message/Sec and Summed Recent Offsets" parameters  in
> topics as below.
>
> [image: Inline image 2]
>
> Could you please guide me to add these two columns.
>
> Regards,
> Pavan Patani
>
>
>


Regarding kafka-manager topics parameters

2017-10-18 Thread Pavan Patani
Hello,

Previously I was using an old version of Kafka-manager, and it showed the
"Producer Message/Sec" and "Summed Recent Offsets" parameters for topics, as
below.

[image: Inline image 1]
Currently I have installed kafka-manager-1.3.3.14, and now I cannot see these
two "Producer Message/Sec" and "Summed Recent Offsets" parameters for topics, as
below.

[image: Inline image 2]

Could you please guide me on how to add these two columns?

Regards,
Pavan Patani


Info regarding kafka topic

2017-06-08 Thread BigData dev
Hi,
I have a 3-node Kafka broker cluster.
I created a topic whose leader is broker 1 (1001), and then that broker died.
But when I look at the topic information in ZooKeeper, the leader is still set to
broker 1 (1001) and the ISR is set to 1001. Is this a bug in Kafka? Since the
leader has died, shouldn't the leader have been set to none?

[zk: localhost:2181(CONNECTED) 7] get /brokers/topics/t3/partitions/0/state

{"controller_epoch":1,"leader":1001,"version":1,"leader_epoch":1,"isr":[1001]}
cZxid = 0x10078
ctime = Thu Jun 08 14:50:07 PDT 2017
mZxid = 0x1008c
mtime = Thu Jun 08 14:51:09 PDT 2017
pZxid = 0x10078
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 78
numChildren = 0

[zk: localhost:2181(CONNECTED) 8]


And when I use the describe command, the output is:

[root@meets2 kafka-broker]# bin/kafka-topics.sh --describe --topic t3 --zookeeper localhost:2181

Topic:t3 PartitionCount:1 ReplicationFactor:2 Configs:
Topic: t3 Partition: 0 Leader: 1001 Replicas: 1001,1003 Isr: 1001

When I use the --unavailable-partitions option, I can see correctly that the
partition is unavailable:

[root@meets2 kafka-broker]# bin/kafka-topics.sh --describe --topic t3 --zookeeper localhost:2181 --unavailable-partitions

Topic: t3 Partition: 0 Leader: 1001 Replicas: 1001,1003 Isr: 1001

But in the ZooKeeper topic state, shouldn't the leader have been set to none
rather than the dead broker? Is this by design, or is it a bug in Kafka? Could
you please provide any information on this?

Thanks,

Bharat


Re: Queries regarding kafka Monitoring tool burrow.

2017-05-29 Thread Todd Palino
The lag numbers are never going to be exactly the same as what the CLI tool
returns, as the broker is queried on an interval for the offset at the end
of each partition. As far as crashing goes, I’d be interested to hear about
specifics as we run it (obviously) and don’t have that problem. It could be
environmental differences or other problems that we’re not running into. I
regret that I’m not able to be as active on the GitHub issues as I would
like to be, but I do try and get through them (and others will answer
questions as well).

Abhimanyu, to answer your questions…

1 - When complete is false, this means that Burrow does not yet have enough
information on the consumer group to work. There’s a configurable number of
intervals, or offset commits, that Burrow requires for each partition. So
if your consumer group commits offsets every minute, it will take 10
minutes for there to be enough offset commits (for the default interval
config of 10). If you commit offsets every 10 minutes, it will take 1 hour
and 40 minutes.

2 - We haven’t included any visualization tools with Burrow directly, but
some other people have been working on add-ons. Check out the associated
projects - https://github.com/linkedin/Burrow/wiki/Associated-Projects

For our own use, we actual collect the metrics from Burrow (partition
counts for the error stats, total lag) into our internal metrics system
that does graphing. I know some others are doing that as well.

3 - As noted above, because of the way Burrow gets the broker log end
offsets, the numbers won’t match the CLI exactly. In addition, Burrow
currently only calculates lag for a given partition when the consumer
commits an offset. The reasoning for this is that the lag numbers were not
really designed to be exposed in the original design - we wanted to create
overall status for each consumer group, not specific lag metrics. We used
to do the latter, using the CLI tool and some wrappers around it, similar
to what Ian has described with remora, and found it significantly lacking.
Specifically, we had too many lag numbers to deal with for thousands of
topics and tens of thousands of partitions over many consumers, and no good
way to define thresholds.

-Todd




On Mon, May 29, 2017 at 3:51 PM, Ian Duffy  wrote:

> Hey Abhimanyu,
>
> Not directly answering your questions but in the past we used burrow at my
> current company and we had a horrible time with it. It would crash daily
> and its lag metrics were very different to what was returned when you would
> run the kafka-consumer-group describe command as you noted.
>
> My co-worker ended up building our own solution that basically just wraps
> around the command line tools. https://github.com/zalando-incubator/remora
>
> > 2. Since burrow returns JSON is there any visualization tools that can
> be used
> to monitor the end results.
>
> We've an monitoring solution (https://github.com/zalando/zmon) that polls
> the HTTP endpoint every 60 seconds and places the data into kariosdb. From
> there we've a time series db to query directly from grafana. It should be
> possible to throw a simple poller script together that does this for you.
>
> > 3. On hitting group describe command and burrow group lag command
> results are
> different burrow result is somewhat delayed then the results that I am
> getting
> while hitting group describes command on Kafka broker and I am getting the
> different result.
>
> They use a different lag calculation method
> https://github.com/linkedin/Burrow/wiki/Consumer-Lag-Evaluation-Rules
> describes it.
>
> On 28 May 2017 at 12:59, Abhimanyu Nagrath 
> wrote:
>
> > Hi ,
> >
> > I am using burrow to monitor kafka Lags and I am having following
> queries :
> >
> > 1.On hitting the API /v2/kafka/local/consumer/group1/lag I am not able
> to
> > view all the topics details present in that group and getting complete:
> > false in the above JSON. What does this mean? Below mentioned is the json
> > result of the above query.
> > {
> >
> > "error": false,
> > "message": "consumer group status returned",
> > "status": {
> > "cluster": "local",
> > "group": "group1",
> > "status": "OK",
> > "complete": false,
> > "partitions": [
> > {
> > "topic": "topic1",
> > "partition": 1,
> > "status": "OK",
> > "start": {
> > "offset": 144,
> > "timestamp": 1494566913489,
> > "lag": 0,
> > "max_offset": 144
> > },
> > "end": {
> > "offset": 144,
> > "timestamp": 1494566999000,
> > "lag": 0,
> > "max_offset": 144
> > }
> > }
> > ],
> > "partition_count": 17,
> > "maxlag": null,
> > "totallag": 0
> > },
> > "request": {
> > "url": "/v2/kafka/local/consumer/group1/lag",
> > "host": "",
> > "cluster": "local",
> > "group": "group1",
> > "topic": ""
> > }
> > }
> >
> >
> > 2. Since burrow returns JSON is there any visualization tools that can be
> > used to monitor the end results.
> >
> > 3. On hitting group describe command and burrow group lag command results
> > are 

Re: Queries regarding kafka Monitoring tool burrow.

2017-05-29 Thread Ian Duffy
Hey Abhimanyu,

Not directly answering your questions but in the past we used burrow at my
current company and we had a horrible time with it. It would crash daily
and its lag metrics were very different to what was returned when you would
run the kafka-consumer-group describe command as you noted.

My co-worker ended up building our own solution that basically just wraps
around the command line tools. https://github.com/zalando-incubator/remora

> 2. Since burrow returns JSON is there any visualization tools that can be used
to monitor the end results.

We've a monitoring solution (https://github.com/zalando/zmon) that polls
the HTTP endpoint every 60 seconds and places the data into KairosDB. From
there we've a time-series DB to query directly from Grafana. It should be
possible to throw together a simple poller script that does this for you.
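
A minimal sketch of such a poller in Java, assuming Burrow's HTTP server listens on localhost:8000 and using the lag endpoint discussed in this thread (error handling and JSON parsing are omitted):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class BurrowPoller {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8000/v2/kafka/local/consumer/group1/lag");
        while (true) {
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                StringBuilder body = new StringBuilder();
                String line;
                while ((line = in.readLine()) != null) {
                    body.append(line);
                }
                // Hand the JSON off to your metrics system here (parsing omitted in this sketch).
                System.out.println(body);
            }
            Thread.sleep(60_000); // poll every 60 seconds, as described above
        }
    }
}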

> 3. On hitting group describe command and burrow group lag command results are
different burrow result is somewhat delayed then the results that I am getting
while hitting group describes command on Kafka broker and I am getting the
different result.

They use a different lag calculation method
https://github.com/linkedin/Burrow/wiki/Consumer-Lag-Evaluation-Rules
describes it.

On 28 May 2017 at 12:59, Abhimanyu Nagrath 
wrote:

> Hi ,
>
> I am using burrow to monitor kafka Lags and I am having following queries :
>
> 1.On hitting the API /v2/kafka/local/consumer/group1/lag I am not able to
> view all the topics details present in that group and getting complete:
> false in the above JSON. What does this mean? Below mentioned is the json
> result of the above query.
> {
>
> "error": false,
> "message": "consumer group status returned",
> "status": {
> "cluster": "local",
> "group": "group1",
> "status": "OK",
> "complete": false,
> "partitions": [
> {
> "topic": "topic1",
> "partition": 1,
> "status": "OK",
> "start": {
> "offset": 144,
> "timestamp": 1494566913489,
> "lag": 0,
> "max_offset": 144
> },
> "end": {
> "offset": 144,
> "timestamp": 1494566999000,
> "lag": 0,
> "max_offset": 144
> }
> }
> ],
> "partition_count": 17,
> "maxlag": null,
> "totallag": 0
> },
> "request": {
> "url": "/v2/kafka/local/consumer/group1/lag",
> "host": "",
> "cluster": "local",
> "group": "group1",
> "topic": ""
> }
> }
>
>
> 2. Since burrow returns JSON is there any visualization tools that can be
> used to monitor the end results.
>
> 3. On hitting group describe command and burrow group lag command results
> are different burrow result is somewhat delayed then the results that I am
> getting while hitting group describes command on Kafka broker and I am
> getting the different result.
>
>
>
> Below mentioned is my burrow configuration:
>
>
> [general]
> logdir=log
> logconfig=/root/go/src/github.com/linkedin/Burrow/config/logging.cfg
> pidfile=burrow.pid
> client-id=burrow-lagchecker
> group-blacklist=^(console-consumer-|python-kafka-consumer-).*$
> #group-whitelist=^(my-important-consumer).*$
>
> [zookeeper]
> hostname=
> port=2181
> timeout=6
> lock-path=/burrow/notifier
>
> [kafka "local"]
> broker=
> broker-port=9092
> zookeeper=
> zookeeper-port=2181
> zookeeper-path=/
> offsets-topic=__consumer_offsets
>
> #[storm "local"]
> #zookeeper=zkhost01.example.com
> #zookeeper-port=2181
> #zookeeper-path=/kafka-cluster/stormconsumers
>
> [tickers]
> broker-offsets=20
>
> [lagcheck]
> intervals=10
> expire-group=604800
>
> [notify]
> interval=10
>
> [httpserver]
> server=on
> port=8000
> ; Alternatively, use listen (cannot be specified when port is)
> ; listen=host:port
> ; listen=host2:port2
>
> [smtp]
> server=mailserver.example.com
> port=25
> from=burrow-nore...@example.com
> template=config/default-email.tmpl
>
> [emailnotifier "b...@example.com"]
> group=local,critical-consumer-group
> group=local,other-consumer-group
> interval=60
>
> [notify]
> interval=10
>
> [httpnotifier]
> url=http://notification.server.example.com:9000/v1/alert
> interval=60
> extra=app=burrow
> extra=tier=STG
> template-post=config/default-http-post.tmpl
> template-delete=config/default-http-delete.tmpl
> timeout=5
> keepalive=30
>
> So Can you please let me know what I am missing and how to fix these
> issues.Any help would be appreciated.
>
>
>
> Regards,
> Abhimanyu
>


Queries regarding kafka Monitoring tool burrow.

2017-05-28 Thread Abhimanyu Nagrath
Hi ,

I am using Burrow to monitor Kafka lags and I have the following queries:

1. When hitting the API /v2/kafka/local/consumer/group1/lag I am not able to
view the details of all the topics present in that group, and I get
"complete": false in the returned JSON. What does this mean? Below is the JSON
result of the query.
{
    "error": false,
    "message": "consumer group status returned",
    "status": {
        "cluster": "local",
        "group": "group1",
        "status": "OK",
        "complete": false,
        "partitions": [
            {
                "topic": "topic1",
                "partition": 1,
                "status": "OK",
                "start": {
                    "offset": 144,
                    "timestamp": 1494566913489,
                    "lag": 0,
                    "max_offset": 144
                },
                "end": {
                    "offset": 144,
                    "timestamp": 1494566999000,
                    "lag": 0,
                    "max_offset": 144
                }
            }
        ],
        "partition_count": 17,
        "maxlag": null,
        "totallag": 0
    },
    "request": {
        "url": "/v2/kafka/local/consumer/group1/lag",
        "host": "",
        "cluster": "local",
        "group": "group1",
        "topic": ""
    }
}


2. Since Burrow returns JSON, are there any visualization tools that can be
used to monitor the end results?

3. When I run the consumer group describe command on the Kafka broker and the
Burrow group lag endpoint, the results are different: the Burrow result is
somewhat delayed compared to what the describe command returns.



Below mentioned is my burrow configuration:


[general]
logdir=log
logconfig=/root/go/src/github.com/linkedin/Burrow/config/logging.cfg
pidfile=burrow.pid
client-id=burrow-lagchecker
group-blacklist=^(console-consumer-|python-kafka-consumer-).*$
#group-whitelist=^(my-important-consumer).*$

[zookeeper]
hostname=
port=2181
timeout=6
lock-path=/burrow/notifier

[kafka "local"]
broker=
broker-port=9092
zookeeper=
zookeeper-port=2181
zookeeper-path=/
offsets-topic=__consumer_offsets

#[storm "local"]
#zookeeper=zkhost01.example.com
#zookeeper-port=2181
#zookeeper-path=/kafka-cluster/stormconsumers

[tickers]
broker-offsets=20

[lagcheck]
intervals=10
expire-group=604800

[notify]
interval=10

[httpserver]
server=on
port=8000
; Alternatively, use listen (cannot be specified when port is)
; listen=host:port
; listen=host2:port2

[smtp]
server=mailserver.example.com
port=25
from=burrow-nore...@example.com
template=config/default-email.tmpl

[emailnotifier "b...@example.com"]
group=local,critical-consumer-group
group=local,other-consumer-group
interval=60

[notify]
interval=10

[httpnotifier]
url=http://notification.server.example.com:9000/v1/alert
interval=60
extra=app=burrow
extra=tier=STG
template-post=config/default-http-post.tmpl
template-delete=config/default-http-delete.tmpl
timeout=5
keepalive=30

So can you please let me know what I am missing and how to fix these issues?
Any help would be appreciated.



Regards,
Abhimanyu


Regarding Kafka Audit tool for auditing Kafka Data

2017-04-04 Thread Vidya Priyadarshni Narayan
Hi,

I am a product engineer at Go-Jek and we are using Kafka in our Data
Engineering team. I came across this slide share presentation on
https://www.slideshare.net/JonBringhurst/kafka-audit-kafka-meetup-january-27th-2015

I would like to know whether this Kafka Audit tool is open sourced as well, or to
get pointers to Kafka data-audit solutions from LinkedIn.

-- 

Thanks
nVidia


Weird message regarding kafka Cluster

2016-12-13 Thread Chieh-Chun Chang
Hello Sir:

My name is Chieh-Chun Chang, and we have a problem with our Kafka prod cluster.


Kafka client version
kafka-clients-0.9.0-kafka-2.0.0.jar
Kafka version
kafka_2.10-0.9.0-kafka-2.0.0.jar

Our Kafka broker cluster is experiencing under-replicated partition problems,
and I found this information:
2016-12-13 07:20:24,393 DEBUG org.apache.kafka.common.network.Selector:
Connection with /10.1.205.75 disconnected
java.io.EOFException
at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
at kafka.network.Processor.poll(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:412)
at java.lang.Thread.run(Thread.java:745)
And this is source code



The IP address (10.1.205.75) might be a random host machine, a YARN cluster
machine, or a producer machine, and based on the timestamps it seems related to
the under-replicated partition problems.

This is my speculation: I did some research, and it seems that 0.9.0 does not
keep the socket connection alive, so this might happen.

But it seems that once it happens, it delays the YARN cluster execution time,
so I am wondering whether this is expected behavior.

Would you mind commenting on why this might happen?

Thank you very much,
Chieh-Chun Chang


Regarding Kafka server integration with Kerberos

2016-11-16 Thread Mishra, Harishankar
Hi,

I am using Kafka Server kafka_2.10-0.10.1.0.
I am trying to integrate it with a Kerberos server.

I followed steps mentioned at 
http://kafka.apache.org/documentation.html#security_sasl

However, I am not able to start the server and am getting the following error.

[2016-11-16 09:47:46,578] INFO Initiating client connection, 
connectString=localhost:2181 sessionTimeout=6000 
watcher=org.I0Itec.zkclient.ZkClient@20deea7f (org.apache.zookeeper.ZooKeeper)
[2016-11-16 09:47:46,594] INFO Waiting for keeper state SaslAuthenticated 
(org.I0Itec.zkclient.ZkClient)
>>>KinitOptions cache name is /tmp/krb5cc_0
>>>DEBUG   client principal is 
>>>kafka/sdld0688.labs.teradata@krb5dom.tdat
>>>DEBUG  server principal is 
>>>krbtgt/krb5dom.t...@krb5dom.tdat
>>>DEBUG  key type: 23
>>>DEBUG  auth time: Wed Nov 16 06:59:17 PST 2016
>>>DEBUG  start time: Wed Nov 16 06:58:06 PST 2016
>>>DEBUG  end time: Wed Nov 16 16:59:17 PST 2016
>>>DEBUG  renew_till time: Thu Nov 17 06:58:06 PST 2016
>>> CCacheInputStream: readFlags()  RENEWABLE; INITIAL; PRE_AUTH;
Java config name: /etc/kafka/krb5.conf
Loaded from Java config
[2016-11-16 09:47:46,728] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No key to store Will continue 
connection to Zookeeper server without SASL authentication, if Zookeeper server 
allows it. (org.apache.zookeeper.ClientCnxn)
[2016-11-16 09:47:46,734] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)

kinit is working fine with the keytab.
Any help would be appreciated in resolving the issue.
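
For reference, the debug output above shows the login falling back to the ticket cache (/tmp/krb5cc_0); a "No key to store" LoginException typically means storeKey=true was requested but no keytab key was actually loaded, for example because useKeyTab is missing or the keyTab/principal entries do not match. A sketch of the Client section used for the broker's ZooKeeper connection, as in the documented SASL setup (keytab path and principal are placeholders):

Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka.service.keytab"
    principal="kafka/your.host.example.com@YOUR.REALM";
};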

Thanks
Harishankar Mishra



Re: Regarding Kafka

2016-10-09 Thread Abhit Kalsotra
Yeah, I realized that, and later read that the Java Kafka consumer has only one
thread, which is why such behavior does not arise there. Maybe I need to restrict
my application to a single thread :( in order to achieve that. I need to ask
Magnus Edenhill, who is the librdkafka expert.
Thanks for your time, Hans

Abhi

On Sun, Oct 9, 2016 at 10:08 PM, Hans Jespersen <h...@confluent.io> wrote:

> I'm pretty sure Jun was talking about the Java API in the quoted blog
> text, not librdkafka. There is only one thread in the new Java consumer so
> you wouldn't see this behavior. I do not think that librdkafka makes any
> such guarantee to dispatch unique keys to each thread but I'm not an expert
> in librdkafka so others may be about to help you better on that.
> //h...@confluent.io
>  Original message From: Abhit Kalsotra <abhit...@gmail.com>
> Date: 10/9/16  3:58 AM  (GMT-08:00) To: users@kafka.apache.org Subject:
> Re: Regarding Kafka
> I did that but i am getting confusing results
>
> e.g
>
> I have created 4 Kafka Consumer threads for doing data analytic, these
> threads just wait for Kafka messages to get consumed and
> I have provided the key provided when I produce, it means that all the
> messages will go to one single partition ref "
> http://www.confluent.io/blog/how-to-choose-the-number-of-
> topicspartitions-in-a-kafka-cluster/
> "
> "* On the consumer side, Kafka always gives a single partition’s data to
> one consumer thread.*"
>
> If you see my application logs, my 4 Kafka Consumer Application threads
> which are calling consume() , Arn't all message of a particular ID should
> be consumed by one Kafka Application thread ?
>
> [2016-10-08 23:37:07.498]AxThreadId 23516 ->ID:4495 offset: 74 ][ID ID
> date:2016-09-28 20:07:32.000 ]
> [2016-10-08 23:37:07.498]AxThreadId 2208 ->ID:4496 offset: 80 ][ID ID
> date: 2016-09-28 20:07:39.000 ]
> [2016-10-08 23:37:07.498]AxThreadId 2208 ->ID:4495 offset: 77 ][ID
> date: 2016-09-28 20:07:35.000 ]
> [2016-10-08 23:37:07.498]AxThreadId 23516 ->ID:4495 offset: 76][ID
> date: 2016-09-28 20:07:34.000 ]
> [2016-10-08 23:37:07.498]AxThreadId 9540 ->ID:4495 offset: 75 ][ID
> date: 2016-09-28 20:07:33.000 ]
> [2016-10-08 23:37:07.499]AxThreadId 23516 ->ID:4495 offset: 78 ][ID
> date: 2016-09-28 20:07:36.000 ]
> [2016-10-08 23:37:07.499]AxThreadId 2208 ->ID:4495 offset: 79 ][ID
> date: 2016-09-28 20:07:37.000 ]
> [2016-10-08 23:37:07.499]AxThreadId 9540 ->ID:4495 offset: 80 ][ID
> date: 2016-09-28 20:07:38.000 ]
> [2016-10-08 23:37:07.500]AxThreadId 23516 ->ID:4495 offset: 81][ID
> date: 2016-09-28 20:07:39.000 ]
>
>
>
>
> On Sun, Oct 9, 2016 at 1:31 PM, Hans Jespersen <h...@confluent.io> wrote:
>
> > Then publish with the user ID as the key and all messages for the same
> key
> > will be guaranteed to go to the same partition and therefore be in order
> > for whichever consumer gets that partition.
> >
> >
> > //h...@confluent.io
> >  Original message From: Abhit Kalsotra <
> abhit...@gmail.com>
> > Date: 10/9/16  12:39 AM  (GMT-08:00) To: users@kafka.apache.org Subject:
> > Re: Regarding Kafka
> > What about the order of message getting received ? If i don't mention the
> > partition.
> >
> > Lets say if i have user ID :4456 and I have to do some analytics at the
> > Kafka Consumer end and at my consumer end if its not getting consumed the
> > way I sent, then my analytics will go haywire.
> >
> > Abhi
> >
> > On Sun, Oct 9, 2016 at 12:50 PM, Hans Jespersen <h...@confluent.io>
> wrote:
> >
> > > You don't even have to do that because the default partitioner will
> > spread
> > > the data you publish to the topic over the available partitions for
> you.
> > > Just try it out to see. Publish multiple messages to the topic without
> > > using keys, and without specifying a partition, and observe that they
> are
> > > automatically distributed out over the available partitions.
> > >
> > >
> > > //h...@confluent.io
> > >  Original message From: Abhit Kalsotra <
> > abhit...@gmail.com>
> > > Date: 10/8/16  11:19 PM  (GMT-08:00) To: users@kafka.apache.org
> Subject:
> > > Re: Regarding Kafka
> > > Hans
> > >
> > > Thanks for the response, yeah you can say yeah I am treating topics
> like
> > > partitions, because my
> > >
> > > current logic of producing to a respective topic goes something like
> this
> > >
> > > RdKafka::ErrorCode resp = m_kafkaProducer->produce(m_
&

Re: Regarding Kafka

2016-10-09 Thread Hans Jespersen
I'm pretty sure Jun was talking about the Java API in the quoted blog text, not 
librdkafka. There is only one thread in the new Java consumer so you wouldn't 
see this behavior. I do not think that librdkafka makes any such guarantee to 
dispatch unique keys to each thread but I'm not an expert in librdkafka so 
others may be able to help you better on that. 
//h...@confluent.io
 Original message From: Abhit Kalsotra <abhit...@gmail.com> 
Date: 10/9/16  3:58 AM  (GMT-08:00) To: users@kafka.apache.org Subject: Re: 
Regarding Kafka 
I did that but i am getting confusing results

e.g

I have created 4 Kafka Consumer threads for doing data analytic, these
threads just wait for Kafka messages to get consumed and
I have provided the key provided when I produce, it means that all the
messages will go to one single partition ref "
http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
"
"* On the consumer side, Kafka always gives a single partition’s data to
one consumer thread.*"

If you see my application logs, my 4 Kafka Consumer Application threads
which are calling consume() , Arn't all message of a particular ID should
be consumed by one Kafka Application thread ?

[2016-10-08 23:37:07.498]AxThreadId 23516 ->ID:4495 offset: 74 ][ID ID
date:2016-09-28 20:07:32.000 ]
[2016-10-08 23:37:07.498]AxThreadId 2208 ->ID:4496 offset: 80 ][ID ID
date: 2016-09-28 20:07:39.000 ]
[2016-10-08 23:37:07.498]AxThreadId 2208 ->ID:4495 offset: 77 ][ID
date: 2016-09-28 20:07:35.000 ]
[2016-10-08 23:37:07.498]AxThreadId 23516 ->ID:4495 offset: 76][ID
date: 2016-09-28 20:07:34.000 ]
[2016-10-08 23:37:07.498]AxThreadId 9540 ->ID:4495 offset: 75 ][ID
date: 2016-09-28 20:07:33.000 ]
[2016-10-08 23:37:07.499]AxThreadId 23516 ->ID:4495 offset: 78 ][ID
date: 2016-09-28 20:07:36.000 ]
[2016-10-08 23:37:07.499]AxThreadId 2208 ->ID:4495 offset: 79 ][ID
date: 2016-09-28 20:07:37.000 ]
[2016-10-08 23:37:07.499]AxThreadId 9540 ->ID:4495 offset: 80 ][ID
date: 2016-09-28 20:07:38.000 ]
[2016-10-08 23:37:07.500]AxThreadId 23516 ->ID:4495 offset: 81][ID
date: 2016-09-28 20:07:39.000 ]




On Sun, Oct 9, 2016 at 1:31 PM, Hans Jespersen <h...@confluent.io> wrote:

> Then publish with the user ID as the key and all messages for the same key
> will be guaranteed to go to the same partition and therefore be in order
> for whichever consumer gets that partition.
>
>
> //h...@confluent.io
>  Original message From: Abhit Kalsotra <abhit...@gmail.com>
> Date: 10/9/16  12:39 AM  (GMT-08:00) To: users@kafka.apache.org Subject:
> Re: Regarding Kafka
> What about the order of message getting received ? If i don't mention the
> partition.
>
> Lets say if i have user ID :4456 and I have to do some analytics at the
> Kafka Consumer end and at my consumer end if its not getting consumed the
> way I sent, then my analytics will go haywire.
>
> Abhi
>
> On Sun, Oct 9, 2016 at 12:50 PM, Hans Jespersen <h...@confluent.io> wrote:
>
> > You don't even have to do that because the default partitioner will
> spread
> > the data you publish to the topic over the available partitions for you.
> > Just try it out to see. Publish multiple messages to the topic without
> > using keys, and without specifying a partition, and observe that they are
> > automatically distributed out over the available partitions.
> >
> >
> > //h...@confluent.io
> >  Original message From: Abhit Kalsotra <
> abhit...@gmail.com>
> > Date: 10/8/16  11:19 PM  (GMT-08:00) To: users@kafka.apache.org Subject:
> > Re: Regarding Kafka
> > Hans
> >
> > Thanks for the response, yeah you can say yeah I am treating topics like
> > partitions, because my
> >
> > current logic of producing to a respective topic goes something like this
> >
> > RdKafka::ErrorCode resp = m_kafkaProducer->produce(m_
> > kafkaTopic[whichTopic],
> >
> partition,
> >
> > RdKafka::Producer::RK_MSG_COPY,
> > ptr,
> > size,
> >
> > ,
> > NULL);
> > where partitionKey is unique number or userID, so what I am doing
> currently
> > each partitionKey%10
> > so whats so ever is the remainder, I am dumping that to the respective
> > topic.
> >
> > But as per your suggestion, Let me create close to 40-50 partitions for a
> > single topic and when i am producing I do something like this
> >
> > RdKafka::ErrorCode resp = m_kafkaProducer->produce(m_kafkaTopic,
> >
> > partition%(50),
> &

Re: Regarding Kafka

2016-10-09 Thread Abhit Kalsotra
I did that, but I am getting confusing results.

E.g.:

I have created 4 Kafka consumer threads for doing data analytics; these threads
just wait for Kafka messages to be consumed, and I have provided a key when I
produce, which means that all the messages should go to one single partition (ref:
http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
"On the consumer side, Kafka always gives a single partition's data to one
consumer thread.").

If you look at my application logs from my 4 Kafka consumer application threads,
which are all calling consume(): shouldn't all messages for a particular ID be
consumed by one Kafka application thread?

[2016-10-08 23:37:07.498]AxThreadId 23516 ->ID:4495 offset: 74 ][ID ID
date:2016-09-28 20:07:32.000 ]
[2016-10-08 23:37:07.498]AxThreadId 2208 ->ID:4496 offset: 80 ][ID ID
date: 2016-09-28 20:07:39.000 ]
[2016-10-08 23:37:07.498]AxThreadId 2208 ->ID:4495 offset: 77 ][ID
date: 2016-09-28 20:07:35.000 ]
[2016-10-08 23:37:07.498]AxThreadId 23516 ->ID:4495 offset: 76][ID
date: 2016-09-28 20:07:34.000 ]
[2016-10-08 23:37:07.498]AxThreadId 9540 ->ID:4495 offset: 75 ][ID
date: 2016-09-28 20:07:33.000 ]
[2016-10-08 23:37:07.499]AxThreadId 23516 ->ID:4495 offset: 78 ][ID
date: 2016-09-28 20:07:36.000 ]
[2016-10-08 23:37:07.499]AxThreadId 2208 ->ID:4495 offset: 79 ][ID
date: 2016-09-28 20:07:37.000 ]
[2016-10-08 23:37:07.499]AxThreadId 9540 ->ID:4495 offset: 80 ][ID
date: 2016-09-28 20:07:38.000 ]
[2016-10-08 23:37:07.500]AxThreadId 23516 ->ID:4495 offset: 81][ID
date: 2016-09-28 20:07:39.000 ]




On Sun, Oct 9, 2016 at 1:31 PM, Hans Jespersen <h...@confluent.io> wrote:

> Then publish with the user ID as the key and all messages for the same key
> will be guaranteed to go to the same partition and therefore be in order
> for whichever consumer gets that partition.
>
>
> //h...@confluent.io
>  Original message From: Abhit Kalsotra <abhit...@gmail.com>
> Date: 10/9/16  12:39 AM  (GMT-08:00) To: users@kafka.apache.org Subject:
> Re: Regarding Kafka
> What about the order of message getting received ? If i don't mention the
> partition.
>
> Lets say if i have user ID :4456 and I have to do some analytics at the
> Kafka Consumer end and at my consumer end if its not getting consumed the
> way I sent, then my analytics will go haywire.
>
> Abhi
>
> On Sun, Oct 9, 2016 at 12:50 PM, Hans Jespersen <h...@confluent.io> wrote:
>
> > You don't even have to do that because the default partitioner will
> spread
> > the data you publish to the topic over the available partitions for you.
> > Just try it out to see. Publish multiple messages to the topic without
> > using keys, and without specifying a partition, and observe that they are
> > automatically distributed out over the available partitions.
> >
> >
> > //h...@confluent.io
> >  Original message From: Abhit Kalsotra <
> abhit...@gmail.com>
> > Date: 10/8/16  11:19 PM  (GMT-08:00) To: users@kafka.apache.org Subject:
> > Re: Regarding Kafka
> > Hans
> >
> > Thanks for the response, yeah you can say yeah I am treating topics like
> > partitions, because my
> >
> > current logic of producing to a respective topic goes something like this
> >
> > RdKafka::ErrorCode resp = m_kafkaProducer->produce(m_
> > kafkaTopic[whichTopic],
> >
> partition,
> >
> > RdKafka::Producer::RK_MSG_COPY,
> > ptr,
> > size,
> >
> > ,
> > NULL);
> > where partitionKey is unique number or userID, so what I am doing
> currently
> > each partitionKey%10
> > so whats so ever is the remainder, I am dumping that to the respective
> > topic.
> >
> > But as per your suggestion, Let me create close to 40-50 partitions for a
> > single topic and when i am producing I do something like this
> >
> > RdKafka::ErrorCode resp = m_kafkaProducer->produce(m_kafkaTopic,
> >
> > partition%(50),
> >
> > RdKafka::Producer::RK_MSG_COPY,
> > ptr,
> > size,
> >
> > ,
> > NULL);
> >
> > Abhi
> >
> > On Sun, Oct 9, 2016 at 10:13 AM, Hans Jespersen <h...@confluent.io>
> wrote:
> >
> > > Why do you have 10 topics?  It seems like you are treating topics like
> > > parti

Re: Regarding Kafka

2016-10-09 Thread Hans Jespersen
Then publish with the user ID as the key and all messages for the same key will 
be guaranteed to go to the same partition and therefore be in order for 
whichever consumer gets that partition.
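
To illustrate with the Java client (a sketch only; the topic name and broker list are placeholders, and the same idea applies to librdkafka by passing the user ID as the message key):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String userId = "4456"; // the user ID from the example in this thread
            // No partition given: the default partitioner hashes the key, so every
            // record for this user lands in the same partition and stays in order.
            producer.send(new ProducerRecord<>("events", userId, "some payload"));
        }
    }
}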


//h...@confluent.io
 Original message From: Abhit Kalsotra <abhit...@gmail.com> 
Date: 10/9/16  12:39 AM  (GMT-08:00) To: users@kafka.apache.org Subject: Re: 
Regarding Kafka 
What about the order of message getting received ? If i don't mention the
partition.

Lets say if i have user ID :4456 and I have to do some analytics at the
Kafka Consumer end and at my consumer end if its not getting consumed the
way I sent, then my analytics will go haywire.

Abhi

On Sun, Oct 9, 2016 at 12:50 PM, Hans Jespersen <h...@confluent.io> wrote:

> You don't even have to do that because the default partitioner will spread
> the data you publish to the topic over the available partitions for you.
> Just try it out to see. Publish multiple messages to the topic without
> using keys, and without specifying a partition, and observe that they are
> automatically distributed out over the available partitions.
>
>
> //h...@confluent.io
>  Original message From: Abhit Kalsotra <abhit...@gmail.com>
> Date: 10/8/16  11:19 PM  (GMT-08:00) To: users@kafka.apache.org Subject:
> Re: Regarding Kafka
> Hans
>
> Thanks for the response, yeah you can say yeah I am treating topics like
> partitions, because my
>
> current logic of producing to a respective topic goes something like this
>
> RdKafka::ErrorCode resp = m_kafkaProducer->produce(m_
> kafkaTopic[whichTopic],
> partition,
>
> RdKafka::Producer::RK_MSG_COPY,
> ptr,
> size,
>
> ,
> NULL);
> where partitionKey is unique number or userID, so what I am doing currently
> each partitionKey%10
> so whats so ever is the remainder, I am dumping that to the respective
> topic.
>
> But as per your suggestion, Let me create close to 40-50 partitions for a
> single topic and when i am producing I do something like this
>
> RdKafka::ErrorCode resp = m_kafkaProducer->produce(m_kafkaTopic,
>
> partition%(50),
>
> RdKafka::Producer::RK_MSG_COPY,
> ptr,
> size,
>
> ,
> NULL);
>
> Abhi
>
> On Sun, Oct 9, 2016 at 10:13 AM, Hans Jespersen <h...@confluent.io> wrote:
>
> > Why do you have 10 topics?  It seems like you are treating topics like
> > partitions and it's unclear why you don't just have 1 topic with 10, 20,
> or
> > even 30 partitions. Ordering is only guaranteed at a partition level.
> >
> > In general if you want to capacity plan for partitions you benchmark a
> > single partition and then divide your peak estimated throughput by the
> > results of the single partition results.
> >
> > If you expect the peak throughput to increase over time then double your
> > partition count to allow room to grow the number of consumers without
> > having to repartition.
> >
> > Sizing can be a bit more tricky if you are using keys but it doesn't
> sound
> > like you are if today you are publishing to topics the way you describe.
> >
> > -hans
> >
> > > On Oct 8, 2016, at 9:01 PM, Abhit Kalsotra <abhit...@gmail.com> wrote:
> > >
> > > Guys any views ?
> > >
> > > Abhi
> > >
> > >> On Sat, Oct 8, 2016 at 4:28 PM, Abhit Kalsotra <abhit...@gmail.com>
> > wrote:
> > >>
> > >> Hello
> > >>
> > >> I am using librdkafka c++ library for my application .
> > >>
> > >> *My Kafka Cluster Set up*
> > >> 2 Kafka Zookeper running on 2 different instances
> > >> 7 Kafka Brokers , 4 Running on 1 machine and 3 running on other
> machine
> > >> Total 10 Topics and partition count is 3 with replication factor of 3.
> > >>
> > >> Now in my case I need to be very specific for the *message order*
> when I
> > >> am consuming the messages. I know if all the messages gets produced to
> > the
> > >> same partition, it always gets consumed in the same order.
> > >>
> > >> I need expert opinions like what's the ideal partition count I should
> > >> consider without effecting performance.( I am looking for close to
> > 100,000
> > >> messages per seconds).
> > >> The topics are from 0 to 9 and when I am producing messages I do
> > something
> > >> like uniqueUserId % 10 , and then pointing to a respective topic like
> 0
> > ||
> > >> 1 || 2 etc..
> > >>
> > >> Abhi
> > >>
> > >>
> > >>
> > >>
> > >> --
> > >> If you can't succeed, call it version 1.0
> > >>
> > >
> > >
> > >
> > > --
> > > If you can't succeed, call it version 1.0
> >
>
>
>
> --
> If you can't succeed, call it version 1.0
>



-- 
If you can't succeed, call it version 1.0


Re: Regarding Kafka

2016-10-09 Thread Abhit Kalsotra
What about the order in which messages are received, if I don't specify the
partition?

Let's say I have user ID 4456 and I have to do some analytics at the Kafka
consumer end; if the messages are not consumed in the order I sent them, my
analytics will go haywire.

Abhi

On Sun, Oct 9, 2016 at 12:50 PM, Hans Jespersen <h...@confluent.io> wrote:

> You don't even have to do that because the default partitioner will spread
> the data you publish to the topic over the available partitions for you.
> Just try it out to see. Publish multiple messages to the topic without
> using keys, and without specifying a partition, and observe that they are
> automatically distributed out over the available partitions.
>
>
> //h...@confluent.io
>  Original message From: Abhit Kalsotra <abhit...@gmail.com>
> Date: 10/8/16  11:19 PM  (GMT-08:00) To: users@kafka.apache.org Subject:
> Re: Regarding Kafka
> Hans
>
> Thanks for the response, yeah you can say yeah I am treating topics like
> partitions, because my
>
> current logic of producing to a respective topic goes something like this
>
> RdKafka::ErrorCode resp = m_kafkaProducer->produce(m_
> kafkaTopic[whichTopic],
> partition,
>
> RdKafka::Producer::RK_MSG_COPY,
> ptr,
> size,
>
> ,
> NULL);
> where partitionKey is unique number or userID, so what I am doing currently
> each partitionKey%10
> so whats so ever is the remainder, I am dumping that to the respective
> topic.
>
> But as per your suggestion, Let me create close to 40-50 partitions for a
> single topic and when i am producing I do something like this
>
> RdKafka::ErrorCode resp = m_kafkaProducer->produce(m_kafkaTopic,
>
> partition%(50),
>
> RdKafka::Producer::RK_MSG_COPY,
> ptr,
> size,
>
> ,
> NULL);
>
> Abhi
>
> On Sun, Oct 9, 2016 at 10:13 AM, Hans Jespersen <h...@confluent.io> wrote:
>
> > Why do you have 10 topics?  It seems like you are treating topics like
> > partitions and it's unclear why you don't just have 1 topic with 10, 20,
> or
> > even 30 partitions. Ordering is only guaranteed at a partition level.
> >
> > In general if you want to capacity plan for partitions you benchmark a
> > single partition and then divide your peak estimated throughput by the
> > results of the single partition results.
> >
> > If you expect the peak throughput to increase over time then double your
> > partition count to allow room to grow the number of consumers without
> > having to repartition.
> >
> > Sizing can be a bit more tricky if you are using keys but it doesn't
> sound
> > like you are if today you are publishing to topics the way you describe.
> >
> > -hans
> >
> > > On Oct 8, 2016, at 9:01 PM, Abhit Kalsotra <abhit...@gmail.com> wrote:
> > >
> > > Guys any views ?
> > >
> > > Abhi
> > >
> > >> On Sat, Oct 8, 2016 at 4:28 PM, Abhit Kalsotra <abhit...@gmail.com>
> > wrote:
> > >>
> > >> Hello
> > >>
> > >> I am using librdkafka c++ library for my application .
> > >>
> > >> *My Kafka Cluster Set up*
> > >> 2 Kafka Zookeper running on 2 different instances
> > >> 7 Kafka Brokers , 4 Running on 1 machine and 3 running on other
> machine
> > >> Total 10 Topics and partition count is 3 with replication factor of 3.
> > >>
> > >> Now in my case I need to be very specific for the *message order*
> when I
> > >> am consuming the messages. I know if all the messages gets produced to
> > the
> > >> same partition, it always gets consumed in the same order.
> > >>
> > >> I need expert opinions like what's the ideal partition count I should
> > >> consider without effecting performance.( I am looking for close to
> > 100,000
> > >> messages per seconds).
> > >> The topics are from 0 to 9 and when I am producing messages I do
> > something
> > >> like uniqueUserId % 10 , and then pointing to a respective topic like
> 0
> > ||
> > >> 1 || 2 etc..
> > >>
> > >> Abhi
> > >>
> > >>
> > >>
> > >>
> > >> --
> > >> If you can't succeed, call it version 1.0
> > >>
> > >
> > >
> > >
> > > --
> > > If you can't succeed, call it version 1.0
> >
>
>
>
> --
> If you can't succeed, call it version 1.0
>



-- 
If you can't succeed, call it version 1.0


Re: Regarding Kafka

2016-10-09 Thread Hans Jespersen
You don't even have to do that because the default partitioner will spread the 
data you publish to the topic over the available partitions for you. Just try 
it out to see. Publish multiple messages to the topic without using keys, and 
without specifying a partition, and observe that they are automatically 
distributed out over the available partitions.


//h...@confluent.io
 Original message From: Abhit Kalsotra <abhit...@gmail.com> 
Date: 10/8/16  11:19 PM  (GMT-08:00) To: users@kafka.apache.org Subject: Re: 
Regarding Kafka 
Hans

Thanks for the response, yeah you can say yeah I am treating topics like
partitions, because my

current logic of producing to a respective topic goes something like this

RdKafka::ErrorCode resp = m_kafkaProducer->produce(m_kafkaTopic[whichTopic],
    partition,

RdKafka::Producer::RK_MSG_COPY,
    ptr,
    size,

,
    NULL);
where partitionKey is unique number or userID, so what I am doing currently
each partitionKey%10
so whats so ever is the remainder, I am dumping that to the respective
topic.

But as per your suggestion, Let me create close to 40-50 partitions for a
single topic and when i am producing I do something like this

RdKafka::ErrorCode resp = m_kafkaProducer->produce(m_kafkaTopic,

partition%(50),

RdKafka::Producer::RK_MSG_COPY,
    ptr,
    size,

,
    NULL);

Abhi

On Sun, Oct 9, 2016 at 10:13 AM, Hans Jespersen <h...@confluent.io> wrote:

> Why do you have 10 topics?  It seems like you are treating topics like
> partitions and it's unclear why you don't just have 1 topic with 10, 20, or
> even 30 partitions. Ordering is only guaranteed at a partition level.
>
> In general if you want to capacity plan for partitions you benchmark a
> single partition and then divide your peak estimated throughput by the
> results of the single partition results.
>
> If you expect the peak throughput to increase over time then double your
> partition count to allow room to grow the number of consumers without
> having to repartition.
>
> Sizing can be a bit more tricky if you are using keys but it doesn't sound
> like you are if today you are publishing to topics the way you describe.
>
> -hans
>
> > On Oct 8, 2016, at 9:01 PM, Abhit Kalsotra <abhit...@gmail.com> wrote:
> >
> > Guys any views ?
> >
> > Abhi
> >
> >> On Sat, Oct 8, 2016 at 4:28 PM, Abhit Kalsotra <abhit...@gmail.com>
> wrote:
> >>
> >> Hello
> >>
> >> I am using librdkafka c++ library for my application .
> >>
> >> *My Kafka Cluster Set up*
> >> 2 Kafka Zookeper running on 2 different instances
> >> 7 Kafka Brokers , 4 Running on 1 machine and 3 running on other machine
> >> Total 10 Topics and partition count is 3 with replication factor of 3.
> >>
> >> Now in my case I need to be very specific for the *message order* when I
> >> am consuming the messages. I know if all the messages gets produced to
> the
> >> same partition, it always gets consumed in the same order.
> >>
> >> I need expert opinions like what's the ideal partition count I should
> >> consider without effecting performance.( I am looking for close to
> 100,000
> >> messages per seconds).
> >> The topics are from 0 to 9 and when I am producing messages I do
> something
> >> like uniqueUserId % 10 , and then pointing to a respective topic like 0
> ||
> >> 1 || 2 etc..
> >>
> >> Abhi
> >>
> >>
> >>
> >>
> >> --
> >> If you can't succeed, call it version 1.0
> >>
> >
> >
> >
> > --
> > If you can't succeed, call it version 1.0
>



-- 
If you can't succeed, call it version 1.0


Re: Regarding Kafka

2016-10-09 Thread Abhit Kalsotra
Hans

Thanks for the response. Yeah, you can say I am treating topics like partitions,
because my current logic of producing to a respective topic goes something like this:

RdKafka::ErrorCode resp = m_kafkaProducer->produce(m_kafkaTopic[whichTopic],
                                                   partition,
                                                   RdKafka::Producer::RK_MSG_COPY,
                                                   ptr,
                                                   size,
                                                   &partitionKey,
                                                   NULL);

where partitionKey is a unique number or user ID; whatever the remainder of
partitionKey % 10 is, I dump the message to the respective topic.

But as per your suggestion, let me create close to 40-50 partitions for a single
topic, and when I produce I do something like this:

RdKafka::ErrorCode resp = m_kafkaProducer->produce(m_kafkaTopic,
                                                   partition % (50),
                                                   RdKafka::Producer::RK_MSG_COPY,
                                                   ptr,
                                                   size,
                                                   &partitionKey,
                                                   NULL);

Abhi

On Sun, Oct 9, 2016 at 10:13 AM, Hans Jespersen  wrote:

> Why do you have 10 topics?  It seems like you are treating topics like
> partitions and it's unclear why you don't just have 1 topic with 10, 20, or
> even 30 partitions. Ordering is only guaranteed at a partition level.
>
> In general if you want to capacity plan for partitions you benchmark a
> single partition and then divide your peak estimated throughput by the
> results of the single partition results.
>
> If you expect the peak throughput to increase over time then double your
> partition count to allow room to grow the number of consumers without
> having to repartition.
>
> Sizing can be a bit more tricky if you are using keys but it doesn't sound
> like you are if today you are publishing to topics the way you describe.
>
> -hans
>
> > On Oct 8, 2016, at 9:01 PM, Abhit Kalsotra  wrote:
> >
> > Guys any views ?
> >
> > Abhi
> >
> >> On Sat, Oct 8, 2016 at 4:28 PM, Abhit Kalsotra 
> wrote:
> >>
> >> Hello
> >>
> >> I am using librdkafka c++ library for my application .
> >>
> >> *My Kafka Cluster Set up*
> >> 2 Kafka Zookeper running on 2 different instances
> >> 7 Kafka Brokers , 4 Running on 1 machine and 3 running on other machine
> >> Total 10 Topics and partition count is 3 with replication factor of 3.
> >>
> >> Now in my case I need to be very specific for the *message order* when I
> >> am consuming the messages. I know if all the messages gets produced to
> the
> >> same partition, it always gets consumed in the same order.
> >>
> >> I need expert opinions like what's the ideal partition count I should
> >> consider without effecting performance.( I am looking for close to
> 100,000
> >> messages per seconds).
> >> The topics are from 0 to 9 and when I am producing messages I do
> something
> >> like uniqueUserId % 10 , and then pointing to a respective topic like 0
> ||
> >> 1 || 2 etc..
> >>
> >> Abhi
> >>
> >>
> >>
> >>
> >> --
> >> If you can't succeed, call it version 1.0
> >>
> >
> >
> >
> > --
> > If you can't succeed, call it version 1.0
>



-- 
If you can't succeed, call it version 1.0


Re: Regarding Kafka

2016-10-08 Thread Hans Jespersen
Why do you have 10 topics?  It seems like you are treating topics like 
partitions and it's unclear why you don't just have 1 topic with 10, 20, or 
even 30 partitions. Ordering is only guaranteed at a partition level.

In general if you want to capacity plan for partitions you benchmark a single 
partition and then divide your peak estimated throughput by the results of the 
single partition results.
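
As an illustrative calculation only (the numbers are made up, not measured): if a single-partition benchmark sustains about 20,000 messages/sec and the target peak is 100,000 messages/sec, you need at least 100,000 / 20,000 = 5 partitions, and doubling that for future growth, as described above, would give 10 partitions on the one topic.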

If you expect the peak throughput to increase over time then double your 
partition count to allow room to grow the number of consumers without having to 
repartition.

Sizing can be a bit more tricky if you are using keys but it doesn't sound like 
you are if today you are publishing to topics the way you describe.

-hans

> On Oct 8, 2016, at 9:01 PM, Abhit Kalsotra  wrote:
> 
> Guys any views ?
> 
> Abhi
> 
>> On Sat, Oct 8, 2016 at 4:28 PM, Abhit Kalsotra  wrote:
>> 
>> Hello
>> 
>> I am using librdkafka c++ library for my application .
>> 
>> *My Kafka Cluster Set up*
>> 2 Kafka Zookeper running on 2 different instances
>> 7 Kafka Brokers , 4 Running on 1 machine and 3 running on other machine
>> Total 10 Topics and partition count is 3 with replication factor of 3.
>> 
>> Now in my case I need to be very specific for the *message order* when I
>> am consuming the messages. I know if all the messages gets produced to the
>> same partition, it always gets consumed in the same order.
>> 
>> I need expert opinions like what's the ideal partition count I should
>> consider without effecting performance.( I am looking for close to 100,000
>> messages per seconds).
>> The topics are from 0 to 9 and when I am producing messages I do something
>> like uniqueUserId % 10 , and then pointing to a respective topic like 0 ||
>> 1 || 2 etc..
>> 
>> Abhi
>> 
>> 
>> 
>> 
>> --
>> If you can't succeed, call it version 1.0
>> 
> 
> 
> 
> -- 
> If you can't succeed, call it version 1.0


Re: Regarding Kafka

2016-10-08 Thread Abhit Kalsotra
Guys any views ?

Abhi

On Sat, Oct 8, 2016 at 4:28 PM, Abhit Kalsotra  wrote:

> Hello
>
> I am using librdkafka c++ library for my application .
>
> *My Kafka Cluster Set up*
> 2 Kafka Zookeper running on 2 different instances
> 7 Kafka Brokers , 4 Running on 1 machine and 3 running on other machine
> Total 10 Topics and partition count is 3 with replication factor of 3.
>
> Now in my case I need to be very specific for the *message order* when I
> am consuming the messages. I know if all the messages gets produced to the
> same partition, it always gets consumed in the same order.
>
> I need expert opinions like what's the ideal partition count I should
> consider without effecting performance.( I am looking for close to 100,000
> messages per seconds).
> The topics are from 0 to 9 and when I am producing messages I do something
> like uniqueUserId % 10 , and then pointing to a respective topic like 0 ||
> 1 || 2 etc..
>
> Abhi
>
>
>
>
> --
> If you can't succeed, call it version 1.0
>



-- 
If you can't succeed, call it version 1.0


Regarding Kafka

2016-10-08 Thread Abhit Kalsotra
Hello

I am using librdkafka c++ library for my application .

*My Kafka Cluster Set up*
2 Kafka ZooKeepers running on 2 different instances
7 Kafka brokers, 4 running on 1 machine and 3 running on the other machine
Total 10 topics, each with a partition count of 3 and a replication factor of 3.

Now in my case I need to be very specific about the *message order* when I am
consuming the messages. I know that if all the messages get produced to the
same partition, they always get consumed in the same order.

I need expert opinions on the ideal partition count I should consider without
affecting performance (I am looking at close to 100,000 messages per second).
The topics are numbered 0 to 9, and when I am producing messages I do something
like uniqueUserId % 10 and then point to the respective topic (0 || 1 || 2 etc.).

Abhi




-- 
If you can't succeed, call it version 1.0


RE: Regarding kafka partition and replication

2016-07-19 Thread Tauzell, Dave
Having multiple brokers on the same node has a couple of problems for a 
production installation:

1. You'll have multiple brokers contending for disk and memory resources
2. You could have your partitions replicated to the same node which means if 
that node fails you would lose data.

I think you are better off having 3 nodes with 3 brokers (one broker per node).
You can keep the 9 partitions in case you want to add physical nodes in the
future, and use a replication factor of 2 or 3.

-Dave

Dave Tauzell | Senior Software Engineer | Surescripts
O: 651.855.3042 | www.surescripts.com |   dave.tauz...@surescripts.com
Connect with us: Twitter I LinkedIn I Facebook I YouTube
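
For reference, a hedged sketch of how such a topic could be created on a 0.9
cluster; the topic name and ZooKeeper address are illustrative:

kafka-topics.sh --create --zookeeper zk1:2181 \
  --topic events \
  --partitions 9 \
  --replication-factor 3

With one broker per node and a replication factor of 3, the three replicas of
each partition land on three different machines, which addresses the data-loss
concern in point 2 above.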


-Original Message-
From: Amit K [mailto:amitk@gmail.com]
Sent: Monday, July 18, 2016 8:55 PM
To: users@kafka.apache.org
Subject: Regarding kafka partition and replication

Hi,

I have a Kafka cluster of 3 nodes, each running 3 brokers, along with a 3-node
ZooKeeper cluster. So there are 9 brokers in total, spread across 3 different
machines. I am on Kafka 0.9.

In order to optimally use the infrastructure for 2 topics (which, as of now, are
not expected to grow drastically in the near future), I am thinking of having 9
partitions with a replication factor of 3 (or 6?). Will this give me a good
distribution of partitions and replicas across brokers? The system does not have
a huge load (<50 requests/sec of less than 1 KB each) as of now, nor is it
expected to get a higher load than this.

If this replication and partition does not help, please suggest a better topic 
partition and replication strategy.

Also, please guide me to any articles or documents about setting up a multi-node
Kafka cluster with regard to partitioning, replication, and the general
properties to use (good practices, etc.).

Thanks,
Amit
This e-mail and any files transmitted with it are confidential, may contain 
sensitive information, and are intended solely for the use of the individual or 
entity to whom they are addressed. If you have received this e-mail in error, 
please notify the sender by reply e-mail immediately and destroy all copies of 
the e-mail and any attachments.


Regarding kafka partition and replication

2016-07-18 Thread Amit K
Hi,

I have a Kafka cluster of 3 nodes, each running 3 brokers, along with a 3-node
ZooKeeper cluster. So there are 9 brokers in total, spread across 3 different
machines. I am on Kafka 0.9.

In order to optimally use the infrastructure for 2 topics (which, as of now, are
not expected to grow drastically in the near future), I am thinking of having 9
partitions with a replication factor of 3 (or 6?). Will this give me a good
distribution of partitions and replicas across brokers? The system does not have
a huge load (<50 requests/sec of less than 1 KB each) as of now, nor is it
expected to get a higher load than this.

If this replication and partition does not help, please suggest a better
topic partition and replication strategy.

Also, please guide me to any articles or documents about setting up a multi-node
Kafka cluster with regard to partitioning, replication, and the general
properties to use (good practices, etc.).

Thanks,
Amit


Re: Regarding Kafka Log compaction Features

2016-05-06 Thread Spico Florin
Hi!
Please have a look at this article; it helped me to use the log compaction
feature mechanism.

I hope that it helps.
Regards,
Florin

http://www.shayne.me/blog/2015/2015-06-25-everything-about-kafka-part-2/
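
One detail that often matters for the use case quoted below: the log cleaner
never compacts the active segment, and it only runs once a log's dirty ratio
crosses a threshold, so an old value for a key can legitimately stick around for
a while after an update (a consumer that reads to the end of the log still sees
the newest value). A hedged sketch of topic-level settings that make compaction
kick in sooner on a test topic; the config names are standard Kafka topic
configs, the topic name, ZooKeeper address and values are illustrative:

kafka-topics.sh --create --zookeeper localhost:2181 \
  --topic compaction-test --partitions 1 --replication-factor 1 \
  --config cleanup.policy=compact \
  --config segment.ms=60000 \
  --config min.cleanable.dirty.ratio=0.01

The brokers also need log.cleaner.enable=true in server.properties (it defaults
to false on some older releases).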
On Thursday, May 5, 2016, Behera, Himansu (Contractor) <
himansu_beh...@cable.comcast.com> wrote:

> Hi Team,
>
>
>
> I am working on implementing the kafka log compaction feature in my
> project.
>
>
>
> Please find the server.properties attached. I have made all the config
> changes needed/suggested in the Kafka log compaction forum, but was not able
> to resolve the issue.
>
>
>
> My use case is as follows:
>
>
>
> Step 1: We send a keyed message (String, String) from one of the producers
> to the topic.
>
> Step 2: Then we send around 10 million keyed messages (each with a unique
> key) to the above topic.
>
> Step 3: Then, after 1800 seconds, we try to send an update to the key from
> step 1 with a value different from the one in step 1.
>
>
>
> Expected Result: The key should be updated with the most recent value.
>
> Actual Result: Reading the key still returns the old value.
>
>
>
>
>
> I would appreciate it if someone could help me with implementing the log
> compaction feature POC.
>
>
>
> Please find the server.properties attached for your reference.
>
>
>
> Regards,
>
> Himansu
>
>
>


Need help regarding Kafka

2016-03-23 Thread Srikanth Chandika
Hi,

-- I am new to Kafka and ZooKeeper. I have set up my test environment with one
ZooKeeper node and 3 Kafka nodes. Now I want to grow the single ZooKeeper node
into a 3-node ensemble.

-- I am continuously producing messages to one of the topics with a Python
script. While the messages are being produced, I am trying to grow ZooKeeper
from 1 node to 3 nodes.

-- While growing the standalone ZooKeeper to multi-node, I have to restart the
standalone ZooKeeper so that it learns about the new nodes and leadership can
be established.

-- When I restart the standalone ZooKeeper there is a small amount of downtime,
so the messages I am producing from the Python script go missing at the time
of the ZooKeeper restart.

-- After the 3-node ZooKeeper ensemble is up, I need to tell Kafka that another
2 nodes exist, so I have to restart the brokers again to pick up the new nodes
in the Kafka config file server.properties; at that particular time messages
are lost as well.

-- So is there any other procedure for updating the ZooKeeper and Kafka
configurations without losing messages?

-- I have tried to find a solution on Google but failed.

Please help

Regards,
Srikanth
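
Not an authoritative answer, but a hedged note: with replicated topics,
restarting one broker (or one ZooKeeper node) at a time is normally survivable
as long as the producer waits for acknowledgement and retries failed sends. A
sketch of the relevant Java producer settings (the Python clients expose similar
options; broker addresses and values are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");              // wait for the in-sync replicas to acknowledge
props.put("retries", "10");            // retry sends that fail while a node restarts
props.put("retry.backoff.ms", "500");  // pause between retries
KafkaProducer<String, String> producer = new KafkaProducer<>(props);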


Re: regarding Kafka Queing system

2016-02-26 Thread Alexis Midon
You can fetch messages by offset.
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchRequest
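
A hedged sketch of the equivalent with the Java consumer (0.9+): assign the
partition, seek to the offset that was recorded when the trap was stored, and
read it back. The topic name and offset are illustrative assumptions:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "trap-reader");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("snmp-traps", 0);
consumer.assign(Collections.singletonList(tp));
consumer.seek(tp, 12345L);                        // offset saved when the trap was produced
ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
for (ConsumerRecord<byte[], byte[]> record : records) {
    byte[] serializedTrap = record.value();       // deserialize the trap object here
}
consumer.close();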


On Fri, Feb 26, 2016 at 7:23 AM rahul shukla 
wrote:

> Hello,
> I am working on an SNMP trap parsing project for my academic work. I am
> using the Kafka messaging system in my project. I want to store the trap
> object, which comes from an SNMP agent, in Kafka, and retrieve that object
> on the other side for further processing.
> So my query is: is there any way to store a particular event in Kafka and
> retrieve that event on the other side for further processing?
>
> Please assist me.
>
>
> Thanks & Regards
> Rahul Shukla
> +91-9826277980
>


regarding Kafka Queing system

2016-02-26 Thread rahul shukla
Hello,
I am working on an SNMP trap parsing project for my academic work. I am using
the Kafka messaging system in my project. I want to store the trap object,
which comes from an SNMP agent, in Kafka, and retrieve that object on the
other side for further processing.
So my query is: is there any way to store a particular event in Kafka and
retrieve that event on the other side for further processing?

Please assist me.


Thanks & Regards
Rahul Shukla
+91-9826277980


Re: Question regarding Kafka EventThread

2015-08-05 Thread Mayuresh Gharat
What do you mean by "the broker is down"? Has the process shut down and exited,
or is the broker not reachable?

Thanks,

Mayuresh

On Wed, Aug 5, 2015 at 12:12 PM, Chinmay Soman chinmay.cere...@gmail.com
wrote:

 Hey guys,

 We're using Kafka version 0.8.2.0 and using the Java producer
 (KafkaProducer) to write to a single Kafka broker with acks=1. There's a
 callback defined on every produce request which takes corrective action if
 the write fails.

 What we see is that, if the broker is down for an extended period of time,
 any write to the producer gets stuck, with this message:

 [2015-08-05 12:02:04,743] WARN Error in I/O with /
 (org.apache.kafka.common.network.Selector:276)
 java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
 at java.lang.Thread.run(Thread.java:745)

 This goes on repeatedly and the callback is never invoked. Is there a way
 to get the Kafka EventThread to give up after a while ?


 --
 Thanks and regards

 Chinmay Soman




-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Question regarding Kafka EventThread

2015-08-05 Thread Chinmay Soman
After digging in the 0.8.2 code, it seems like the callback is not getting
invoked since 'handleDisconnections' is not adding a disconnected
ClientResponse to the list of responses. I do see a 'Node 0 disconnected'
message. However, I don't see a 'Cancelled request due to node being
disconnected' message.

Is this expected ? Is any of this because I'm only using 1 Kafka broker
node ? Will the producer get stuck if the Kafka cluster is not reachable
for whatever reason ?

On Wed, Aug 5, 2015 at 12:12 PM, Chinmay Soman chinmay.cere...@gmail.com
wrote:

 Hey guys,

 We're using Kafka version 0.8.2.0 and using the Java producer
 (KafkaProducer) to write to a single Kafka broker with acks=1. There's a
 callback defined on every produce request which takes corrective action if
 the write fails.

 What we see is that, if the broker is down for an extended period of time,
 any write to the producer gets stuck, with this message:

 [2015-08-05 12:02:04,743] WARN Error in I/O with /
 (org.apache.kafka.common.network.Selector:276)
 java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
 at java.lang.Thread.run(Thread.java:745)

 This goes on repeatedly and the callback is never invoked. Is there a way
 to get the Kafka EventThread to give up after a while ?


 --
 Thanks and regards

 Chinmay Soman




-- 
Thanks and regards

Chinmay Soman


Re: Question regarding Kafka EventThread

2015-08-05 Thread Mayuresh Gharat
Your producer might get stuck if the Kafka broker becomes unreachable, since
there is no socket timeout on the new producer. We are adding that in KAFKA-2120.

Thanks,

Mayuresh
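
Until that lands, one hedged client-side workaround is to bound the wait on the
Future that send() returns and run the corrective action on timeout. Note this
only helps once send() has returned; it does not cover the case where send()
itself blocks waiting for metadata. The topic name, payload and timeout are
illustrative:

import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Future<RecordMetadata> future =
    producer.send(new ProducerRecord<String, String>("my-topic", "payload"));
try {
    future.get(30, TimeUnit.SECONDS);             // bound the wait on the ack
} catch (TimeoutException e) {
    // Broker still unreachable: run the corrective action the callback would have run.
} catch (Exception e) {
    // Other failures surface here as ExecutionException / InterruptedException.
}
producer.close();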

On Wed, Aug 5, 2015 at 3:47 PM, Chinmay Soman chinmay.cere...@gmail.com
wrote:

 After digging in the 0.8.2 code, it seems like the callback is not getting
 invoked since 'handleDisconnections' is not adding a disconnected
 ClientResponse to the list of responses. I do see a 'Node 0 disconnected'
 message. However, I don't see a 'Cancelled request due to node being
 disconnected' message.

 Is this expected ? Is any of this because I'm only using 1 Kafka broker
 node ? Will the producer get stuck if the Kafka cluster is not reachable
 for whatever reason ?

 On Wed, Aug 5, 2015 at 12:12 PM, Chinmay Soman chinmay.cere...@gmail.com
 wrote:

  Hey guys,
 
  We're using Kafka version 0.8.2.0 and using the Java producer
  (KafkaProducer) to write to a single Kafka broker with acks=1. There's a
  callback defined on every produce request which takes corrective action
 if
  the write fails.
 
  What we see is that, if the broker is down for an extended period of
 time,
  any write to the producer gets stuck, with this message:
 
  [2015-08-05 12:02:04,743] WARN Error in I/O with /
  (org.apache.kafka.common.network.Selector:276)
  java.net.ConnectException: Connection refused
  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
  at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
  at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
  at
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
  at
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
  at java.lang.Thread.run(Thread.java:745)
 
  This goes on repeatedly and the callback is never invoked. Is there a way
  to get the Kafka EventThread to give up after a while ?
 
 
  --
  Thanks and regards
 
  Chinmay Soman
 



 --
 Thanks and regards

 Chinmay Soman




-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Questions regarding Kafka-1477

2015-07-02 Thread Jan Filipiak

Hi,

Just out of curiosity, and because of Eugene's email, I browsed
KAFKA-1477 and it talks about SSL a lot. So I thought I might throw in
this RFC: http://tools.ietf.org/html/rfc7568. It basically says to move away
from SSL now and only do TLS. The title of the ticket still mentions TLS,
but afterwards it's only SSL; I haven't looked at any patches or library
code so I can't really judge what's going on.


Further, I found people starting to talk about sendfile(2) TLS support, see
for example https://people.freebsd.org/~rrs/asiabsd_2015_tls.pdf. So maybe we
can keep this door open so that at some point the kernel will be able to do
TLS for us?






On 02.07.2015 22:24, eugene miretsky wrote:

HI,

There is some work being done on security in Kafka:
Confluence: https://cwiki.apache.org/confluence/display/KAFKA/Security
Jira: https://issues.apache.org/jira/browse/KAFKA-1682

It seems like the main blockers are KAFKA-1477
https://issues.apache.org/jira/browse/KAFKA-1477, KAFKA-1691
https://issues.apache.org/jira/browse/KAFKA-1691  and KAFKA-1690
https://issues.apache.org/jira/browse/KAFKA-1690.

Is there an anticipated roadmap for when all the security features will be
done and merged into trunk?




Re: Regarding Kafka release 0.8.2-beta

2015-01-26 Thread Jason Rosenberg
shouldn't the new consumer api be removed from the 0.8.2 code base then?

On Fri, Jan 23, 2015 at 10:30 AM, Joe Stein joe.st...@stealth.ly wrote:

 The new consumer is scheduled for 0.9.0.

 Currently Kafka release candidate 2 for 0.8.2.0 is being voted on.

 There is an in progress patch to the new consumer that you can try out
 https://issues.apache.org/jira/browse/KAFKA-1760

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Fri, Jan 23, 2015 at 1:55 AM, Reeni Mathew reenimathew...@gmail.com
 wrote:

  Hi Team,
 
  I was playing around with your recent release 0.8.2-beta.
  Producer worked fine whereas new consumer did not.
 
  org.apache.kafka.clients.consumer.KafkaConsumer
 
  After digging the code I realized that the implementation for the same is
  not available. Only API is present.
  Could you please let me know by when we can expect the implementation of
  the same.
 
  Thanks  Regards
 
  Reeni
 



Re: Regarding Kafka release 0.8.2-beta

2015-01-26 Thread Jun Rao
The new consumer api is actually excluded from the javadoc that we generate.

Thanks,

Jun

On Mon, Jan 26, 2015 at 11:54 AM, Jason Rosenberg j...@squareup.com wrote:

 shouldn't the new consumer api be removed from the 0.8.2 code base then?

 On Fri, Jan 23, 2015 at 10:30 AM, Joe Stein joe.st...@stealth.ly wrote:

  The new consumer is scheduled for 0.9.0.
 
  Currently Kafka release candidate 2 for 0.8.2.0 is being voted on.
 
  There is an in progress patch to the new consumer that you can try out
  https://issues.apache.org/jira/browse/KAFKA-1760
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /
 
  On Fri, Jan 23, 2015 at 1:55 AM, Reeni Mathew reenimathew...@gmail.com
  wrote:
 
   Hi Team,
  
   I was playing around with your recent release 0.8.2-beta.
   Producer worked fine whereas new consumer did not.
  
   org.apache.kafka.clients.consumer.KafkaConsumer
  
   After digging the code I realized that the implementation for the same
 is
   not available. Only API is present.
   Could you please let me know by when we can expect the implementation
 of
   the same.
  
   Thanks  Regards
  
   Reeni
  
 



Re: Regarding Kafka release 0.8.2-beta

2015-01-26 Thread Joe Stein
Maybe we should add "experimental" to the documentation so folks that don't
know will understand.

/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
/
On Jan 26, 2015 11:56 AM, Jason Rosenberg j...@squareup.com wrote:

 shouldn't the new consumer api be removed from the 0.8.2 code base then?

 On Fri, Jan 23, 2015 at 10:30 AM, Joe Stein joe.st...@stealth.ly wrote:

  The new consumer is scheduled for 0.9.0.
 
  Currently Kafka release candidate 2 for 0.8.2.0 is being voted on.
 
  There is an in progress patch to the new consumer that you can try out
  https://issues.apache.org/jira/browse/KAFKA-1760
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /
 
  On Fri, Jan 23, 2015 at 1:55 AM, Reeni Mathew reenimathew...@gmail.com
  wrote:
 
   Hi Team,
  
   I was playing around with your recent release 0.8.2-beta.
   Producer worked fine whereas new consumer did not.
  
   org.apache.kafka.clients.consumer.KafkaConsumer
  
   After digging the code I realized that the implementation for the same
 is
   not available. Only API is present.
   Could you please let me know by when we can expect the implementation
 of
   the same.
  
   Thanks  Regards
  
   Reeni
  
 



Regarding Kafka release 0.8.2-beta

2015-01-23 Thread Reeni Mathew
Hi Team,

I was playing around with your recent release, 0.8.2-beta.
The producer worked fine whereas the new consumer did not.

org.apache.kafka.clients.consumer.KafkaConsumer

After digging into the code I realized that the implementation for the same is
not available; only the API is present.
Could you please let me know by when we can expect the implementation of
the same?

Thanks & Regards

Reeni


Query regarding Kafka publishing

2015-01-16 Thread Liju John
Hi ,

I have a general query -

As per the code in the Kafka producer, serialization happens before
partitioning. Is my understanding correct? If yes, what's the reason for it?

Regards,
Liju John
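
No definitive answer from the code here, but one commonly cited reason: the Java
producer hands the *serialized* key bytes to the partitioner, so the partition
choice depends only on the bytes that go on the wire, not on the key's in-memory
type or a particular serializer's object semantics; any client that serializes a
key to the same bytes lands on the same partition. A hedged illustration of the
idea (the real DefaultPartitioner hashes the key bytes with murmur2;
Arrays.hashCode merely stands in for that here):

// Illustration only: choose a partition from the serialized key bytes.
static int choosePartition(byte[] keyBytes, int numPartitions) {
    int hash = java.util.Arrays.hashCode(keyBytes);   // stand-in for murmur2(keyBytes)
    return (hash & 0x7fffffff) % numPartitions;       // drop the sign bit, then mod
}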


Some doubts regarding kafka config parameters

2014-07-18 Thread shweta khare
hi,

I have the following doubts regarding some kafka config parameters:

For example if I have a Throughput topic with replication factor 1 and a
single partition 0,then i will see the following files under
/tmp/kafka-logs/Throughput_0:

.index
.log

70117826.index
70117826.log


1) *log.delete.delay.ms:*

The period of time we hold log files around after they are removed from the
 *index*. This period of time allows any in-progress reads to complete
 uninterrupted without locking. [6000]

In the above description, does “*index*” refer to the in-memory
segment list and not the 0.index file (in the example above)?

As per documentation, kafka maintains an in-memory segment list:

 To enable read operations, kafka maintains an in-memory range(segment
 list) for each file. To avoid locking reads while still allowing deletes
 that modify the segment list we use a copy-on-write style segment list
 implementation that provides consistent views to allow a binary search to
 proceed on an immutable static snapshot view of the log segments while
 deletes are progressing.



2) *socket.request.max.bytes: *The maximum request size the server will
allow.

How is this different from message.max.bytes (the maximum size of a message
that the server can receive)?

3) *fetch.wait.max.ms:*

 The maximum amount of time the *server *will block before answering the
 fetch request if there isn't sufficient data to immediately satisfy
 fetch.min.bytes


Does the "server" above refer to the Kafka consumer, which will block for
fetch.wait.max.ms? How is fetch.wait.max.ms different from *consumer.timeout.ms*?

4) Is there any correlation between a producer's
*queue.buffering.max.messages* and *send.buffer.bytes*?

5) Will batching not happen in the case of producer.type=async and
request.required.acks=1 or -1, since the next message will only be sent after
an ack is received from the leader/all ISR replicas?

6) *topic.metadata.refresh.interval.ms:*
After every 10 mins I see the following on my producer side:

1200483 [main] INFO  kafka.client.ClientUtils$  - Fetching metadata from
broker id:0,host:localhost,port:9092 with correlation id 15078270 for 1
topic(s) Set(Throughput)

1200484 [main] INFO  kafka.producer.SyncProducer  - Connected to
localhost:9092 for producing

1200486 [main] INFO  kafka.producer.SyncProducer  - Disconnecting from
localhost:9092

1200486 [main] INFO  kafka.producer.SyncProducer  - Disconnecting from
sdp08:9092

1200487 [main] INFO  kafka.producer.SyncProducer  - Connected to sdp08:9092
for producing

Why is there a disconnection and re-connection happening on each metadata
refresh even though the leader is alive? I have noticed that I lose some
messages when this happens (with request.required.acks=0).

thank you,
shweta
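
On question 3 above: fetch.wait.max.ms is honoured by the broker (it is how long
the broker may hold a fetch request while waiting for fetch.min.bytes of data to
accumulate), while consumer.timeout.ms is purely client-side (how long the
high-level consumer's iterator waits before throwing ConsumerTimeoutException).
A hedged sketch of the three settings together in an 0.8.x high-level consumer
config; the values, addresses and group id are illustrative:

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "throughput-readers");
props.put("fetch.min.bytes", "1024");       // broker tries to gather at least 1 KB...
props.put("fetch.wait.max.ms", "500");      // ...but replies after 500 ms regardless
props.put("consumer.timeout.ms", "5000");   // client side: iterator gives up after 5 s
ConsumerConfig config = new ConsumerConfig(props);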


Query regarding Kafka partitions and Consumer rebalancing

2014-07-15 Thread Madhavi Gokana (Vizury)
Hi,

Currently we are trying to configure Kafka in our system for pulling messages
from queues.

We have multiple consumers (we might want to add consumers if the load on one
consumer increases) which need to receive and process messages from a Kafka
queue. Based on my understanding, under a single consumer group, one
partition can be read by only one consumer.

So if we want to set things up such that no consumer gets overloaded in any
case, what would be the best way to do it?

If we have 6 partitions and 3 consumers which are equally efficient, then the
load seems to be distributed equally. Suppose one of the consumers, say
Consumer-3, for some reason processes the data 10 times slower; then we would
want to reduce the load on Consumer-3 and distribute it equally across
Consumer-1 and Consumer-2. We wanted a pull-based system, which would help us
reduce the load on a slow consumer.

Please let us know if there is any way to do this? Does kafka have any
alternate implementation in such cases?

Thanks,
Madhavi.


Re: Query regarding Kafka partitions and Consumer rebalancing

2014-07-15 Thread Guozhang Wang
Hi Madhavi,

Dynamically re-balancing partitions based on processing efficiency and load
is a bit tricky to do in the current consumer, since rebalances will only be
triggered by a consumer membership change or a topic/partition change. For your
case you would probably stop the slow consumer so that a rebalance will be
triggered to re-distribute partitions to the rest of the consumers.

Guozhang


On Tue, Jul 15, 2014 at 4:35 AM, Madhavi Gokana (Vizury) 
madhavi.gok...@vizury.com wrote:

 Hi,

 Currently we trying to configure Kafka in our system for pulling messages
 from Queues.

 We have multiple consumers( we might want to add consumers if load on one
 consumer increases) which need to receive and process messages from a Kafka
 queue. Based on my understanding, under a single consumer group, one
 partition can be read by only one consumer.

 So if we want to make the setup such that no consumer gets over loaded in
 any case, what would be the best way to do it.


If we have 6 partitions and 3 consumers which are equally efficient, then
 load seems to be distributed equally. Suppose one of the consumers say
 Consumer-3, for some reason processes the data 10 times slower, then we
 would want to reduce the load on Consumer-3 and equally distribute load on
 Consumer-1 and Consumer-2. We wanted a pull-based system which would help
 us in reducing the load on a slow consumer.

 Please let us know if there is any way to do this? Does kafka have any
 alternate implementation in such cases?

 Thanks,
 Madhavi.




-- 
-- Guozhang


Quick question regarding kafka broker security

2012-12-07 Thread Subhash Agrawal
Hi All,

I am new to the Kafka broker and realized that it does not enforce client
authentication at the connection or message level.
To avoid DoS attacks, we are planning to implement certificate-based security at
the client connection level, not at the message level, so that
we can authenticate client connections before accepting messages.

Can you guys share your thoughts on whether this idea would be feasible without
impacting system throughput?

Thanks
Subhash A.