Re: Why do I need to specify replication factor when creating a topic?
Jeff, I'm not sure if this is an option for you. However, I have been faced with a similar problem before, and we handled it by putting all of the information needed to connect to and use the Kafka APIs in a config file. In our case we were using Typesafe config [1] for lots of configuration in our services. It has quite a few nice features, and we were able to change the values based on the environment with Puppet and Chef; I'm not sure if you are using these or other tools in the DevOps space. Hope that helps. Ping me offline if you want to chat about it more.

Thanks,
Andrew

On Thu, May 11, 2017 at 19:43 Jeff Widman wrote:

> To further clarify:
> I'm trying to create topics programmatically.
>
> We want to run our code against dev/staging/production clusters. In dev,
> they are often single-broker clusters. In production, we default to a
> replication factor of 3.
>
> So that's why it'd make life easier if it defaulted to the value in
> server.properties, rather than our code having to figure out whether it's a
> dev vs. production cluster.
>
> I'm aware we could hack around this by relying on topic auto-creation, but
> we'd rather disable that to prevent topics being accidentally created.
>
> On Thu, May 11, 2017 at 4:07 PM, Jeff Widman wrote:
>
> > When creating a new topic, why do I need to specify the replication
> > factor and number of partitions?
> >
> > I'd rather that, when omitted, Kafka defaulted to the value set in
> > server.properties.
> >
> > Was this an explicit design decision?
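For illustration, here is a minimal sketch of that config-file approach using the Typesafe Config API. The config keys, file names, and values below are hypothetical, not from Andrew's actual setup:

// application.conf (HOCON); each environment gets its own override file,
// e.g. production.conf would set kafka.topic.replication-factor = 3:
//
//   kafka.bootstrap-servers = "localhost:9092"
//   kafka.topic.partitions = 1
//   kafka.topic.replication-factor = 1

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class TopicSettings {
    public static void main(String[] args) {
        // ConfigFactory.load() honors -Dconfig.file=..., so Puppet/Chef can
        // point each environment at its own file without code changes.
        Config config = ConfigFactory.load();
        int partitions = config.getInt("kafka.topic.partitions");
        short replicationFactor = (short) config.getInt("kafka.topic.replication-factor");
        System.out.println("partitions=" + partitions + ", rf=" + replicationFactor);
    }
}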
Re: Why do I need to specify replication factor when creating a topic?
If you enable auto topic creation, then that is exactly what will happen. There are pros and cons to creating topics with default values, but if you feel strongly that this is the way you want Kafka to work, it is entirely possible to set up the system to work that way.

-hans

> On May 11, 2017, at 4:07 PM, Jeff Widman wrote:
>
> When creating a new topic, why do I need to specify the replication factor
> and number of partitions?
>
> I'd rather that, when omitted, Kafka defaulted to the value set in
> server.properties.
>
> Was this an explicit design decision?
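For reference, the broker settings involved would look like this in server.properties (the partition and replication values below are just examples):

# server.properties
# let the broker create topics on first use, with the defaults below
auto.create.topics.enable=true
num.partitions=8
default.replication.factor=3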
Re: Why do I need to specify replication factor when creating a topic?
To further clarify: I'm trying to create topics programmatically.

We want to run our code against dev/staging/production clusters. In dev, they are often single-broker clusters. In production, we default to a replication factor of 3.

So that's why it'd make life easier if it defaulted to the value in server.properties, rather than our code having to figure out whether it's a dev vs. production cluster.

I'm aware we could hack around this by relying on topic auto-creation, but we'd rather disable that to prevent topics being accidentally created.

On Thu, May 11, 2017 at 4:07 PM, Jeff Widman wrote:

> When creating a new topic, why do I need to specify the replication factor
> and number of partitions?
>
> I'd rather that, when omitted, Kafka defaulted to the value set in
> server.properties.
>
> Was this an explicit design decision?
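As a sketch of why this matters, here is what programmatic creation looks like with the Java AdminClient (introduced in 0.11, i.e. after this thread; the topic name and partition count are placeholders). The replication factor has to be threaded through from per-environment config because the API requires it explicitly:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        // the caller must supply this; a service would read it from its own
        // per-environment config (1 in dev, 3 in production)
        short replicationFactor = Short.parseShort(args.length > 0 ? args[0] : "1");

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("my-topic", 4, replicationFactor);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}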
Why do I need to specify replication factor when creating a topic?
When creating a new topic, why do I need to specify the replication factor and number of partitions? I'd rather that, when omitted, Kafka defaulted to the value set in server.properties. Was this an explicit design decision?
secure Kafka in Confluent
Hi,

I am trying to dabble with secure Kafka, and I have a small setup which I created by reading this: http://docs.confluent.io/current/kafka/ssl.html

My question is: if I have a CA cert that I add to the Kafka broker's truststore and keystore, and also to the client's (producer and consumer) truststore and keystore, can I use the above setup without actually creating a private certificate for the Kafka server and the Kafka client and signing them with the CA?

I am looking to reduce the overhead of the Kafka broker and Kafka client each creating their own certificate, exporting it to the CA, and then the CA signing it. That is why I am wondering: if the Kafka broker and Kafka client only use the certificate issued by the CA, can they mutually authenticate and encrypt traffic?

Thanks.
-- R
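For reference, the client-side settings from that guide look roughly like the sketch below (paths and passwords are placeholders). Note that for mutual authentication the broker sets ssl.client.auth=required, which is what forces each side to present its own signed certificate:

# client SSL properties (placeholder paths/passwords)
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=changeit
# the keystore entries matter only when the broker requires client auth
ssl.keystore.location=/var/private/ssl/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit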
Re: Message polling behavior when subscribed to a topic pattern
Hello William,

You have overridden KafkaConsumer's max.poll.records configuration to return only up to 10 records in a single call to poll, so it does that. The Fetcher iterates over completed fetch responses, and in the first one(s), from a single topic, it finds enough records to satisfy the max limit. See https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L514

Remove the limit override, and all 20 should be returned in a single poll, given all get fetched within the poll timeout.

Kind regards,
Stevo Slavic.

On Thu, May 11, 2017 at 8:42 PM, William Yu wrote:

> Hi,
>
> I'm trying to understand the behavior of the consumer poll function when
> subscribed to multiple topics using a regex topic pattern. I was under the
> assumption the poll function would pull messages from each of the topics I
> was subscribed to, but from a test program I wrote it looks like it will
> pull from a given topic until it has been drained.
>
> Sample program:
> https://gist.github.com/wiyu/afb860aacd0382f84601a3512a21e9d1
>
> Test:
> - 2 topics: foo and bar
> - published 10 msgs to each topic.
>
> Output:
> records to process: 10
> [foo] partition = 2, offset = 83, value = 4
> [foo] partition = 2, offset = 84, value = 8
> [foo] partition = 3, offset = 82, value = 2
> [foo] partition = 3, offset = 83, value = 6
> [foo] partition = 3, offset = 84, value = 10
> [foo] partition = 0, offset = 111, value = 3
> [foo] partition = 0, offset = 112, value = 7
> [foo] partition = 1, offset = 104, value = 1
> [foo] partition = 1, offset = 105, value = 5
> [foo] partition = 1, offset = 106, value = 9
> committed: 10
> records to process: 10
> [bar] partition = 1, offset = 80, value = 2
> [bar] partition = 1, offset = 81, value = 4
> [bar] partition = 1, offset = 82, value = 6
> [bar] partition = 1, offset = 83, value = 8
> [bar] partition = 1, offset = 84, value = 10
> [bar] partition = 0, offset = 80, value = 1
> [bar] partition = 0, offset = 81, value = 3
> [bar] partition = 0, offset = 82, value = 5
> [bar] partition = 0, offset = 83, value = 7
> [bar] partition = 0, offset = 84, value = 9
>
> Thanks,
> William
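To make the configuration concrete, here is a minimal pattern-subscription sketch. The topic pattern and the max.poll.records value match William's description; everything else (bootstrap server, group id, serdes) is a placeholder:

import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PatternConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "pattern-test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // caps each poll() at 10 records; remove this line and a single poll
        // can return records from all matched topics fetched within the timeout
        props.put("max.poll.records", "10");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Pattern.compile("foo|bar"), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            System.out.println("records to process: " + records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("[%s] partition = %d, offset = %d, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.value());
            }
            consumer.commitSync();
        }
    }
}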
Message polling behavior when subscribed to a topic pattern
Hi,

I'm trying to understand the behavior of the consumer poll function when subscribed to multiple topics using a regex topic pattern. I was under the assumption the poll function would pull messages from each of the topics I was subscribed to, but from a test program I wrote it looks like it will pull from a given topic until it has been drained.

Sample program: https://gist.github.com/wiyu/afb860aacd0382f84601a3512a21e9d1

Test:
- 2 topics: foo and bar
- published 10 msgs to each topic.

Output:
records to process: 10
[foo] partition = 2, offset = 83, value = 4
[foo] partition = 2, offset = 84, value = 8
[foo] partition = 3, offset = 82, value = 2
[foo] partition = 3, offset = 83, value = 6
[foo] partition = 3, offset = 84, value = 10
[foo] partition = 0, offset = 111, value = 3
[foo] partition = 0, offset = 112, value = 7
[foo] partition = 1, offset = 104, value = 1
[foo] partition = 1, offset = 105, value = 5
[foo] partition = 1, offset = 106, value = 9
committed: 10
records to process: 10
[bar] partition = 1, offset = 80, value = 2
[bar] partition = 1, offset = 81, value = 4
[bar] partition = 1, offset = 82, value = 6
[bar] partition = 1, offset = 83, value = 8
[bar] partition = 1, offset = 84, value = 10
[bar] partition = 0, offset = 80, value = 1
[bar] partition = 0, offset = 81, value = 3
[bar] partition = 0, offset = 82, value = 5
[bar] partition = 0, offset = 83, value = 7
[bar] partition = 0, offset = 84, value = 9

Thanks,
William
Request to Join Kafka user group
Request to Join Kafka user group
Missing consumer groups
Hi All,

We're using Kafka 0.10.0.0 and just encountered a weird issue I'd be happy to get some help with. It seems we can't query active consumer groups using the kafka-consumer-groups.sh script. Even more, listing all the active consumer groups usually returns an empty response (or a very partial one: 2 active consumer groups out of ~1000, usually newly created consumer groups appear).

We're using the new consumer (since Kafka 0.9) and multiple topics (with a single partition each, and a single consumer group attached to each). The __consumer_offsets topic lasts > 1 month, if it matters somehow.

The commands I'm running:

$> ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer --describe --group my-consumer-group
Consumer group `my-consumer-group` does not exist or is rebalancing.

and

$> ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer --list
(doesn't return any output)

Enabling debug (in the tools-log4j.properties, as I saw someone here suggested previously) didn't show any meaningful data.

Any help will be gladly appreciated,
Ofer.
Re: 0.10.1.0 version Kafka replica sync slow
If you confirm it's the reason, then try to increase `num.replica.fetchers` to speed up the replication.

From: 蔡高年 <838199...@qq.com>
Sent: May 11, 2017 11:45
To: users
Subject: 0.10.1.0 version Kafka replica sync slow

Hello,

Recently our prod environment has had a problem: the Kafka leader is shrinking and expanding the ISR frequently, with the result that the consumer cannot consume messages. Do you have any advice? Thank you. Here is the related log.

[serviceop@SZC-L0046001 kafka]$ ./bin/kafka-topics.sh --describe --zookeeper 30.16.36.181:2181,30.16.36.182:2181,30.16.36.183:2181/ubasKafka --topic kafkaUbasTopicProd
Topic: kafkaUbasTopicProd  PartitionCount: 8  ReplicationFactor: 3  Configs:
  Topic: kafkaUbasTopicProd  Partition: 0  Leader: 1  Replicas: 1,7,0  Isr: 1,7,0
  Topic: kafkaUbasTopicProd  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 1,2,0
  Topic: kafkaUbasTopicProd  Partition: 2  Leader: 3  Replicas: 3,1,2  Isr: 1,2,3
  Topic: kafkaUbasTopicProd  Partition: 3  Leader: 4  Replicas: 4,2,3  Isr: 2,3,4
  Topic: kafkaUbasTopicProd  Partition: 4  Leader: 5  Replicas: 5,3,4  Isr: 3,4,5
  Topic: kafkaUbasTopicProd  Partition: 5  Leader: 6  Replicas: 6,4,5  Isr: 4,5,6
  Topic: kafkaUbasTopicProd  Partition: 6  Leader: 7  Replicas: 7,5,6  Isr: 5,6,7
  Topic: kafkaUbasTopicProd  Partition: 7  Leader: 0  Replicas: 0,6,7  Isr: 0

[2017-05-09 18:28:34,533] INFO Partition [kafkaUbasTopicProd,7] on broker 0: Expanding ISR for partition [kafkaUbasTopicProd,7] from 0 to 0,6 (kafka.cluster.Partition)
[2017-05-09 18:28:53,901] INFO Partition [kafkaUbasTopicProd,7] on broker 0: Shrinking ISR for partition [kafkaUbasTopicProd,7] from 0,6 to 0 (kafka.cluster.Partition)
[2017-05-09 18:33:03,901] INFO Partition [__consumer_offsets,2] on broker 0: Shrinking ISR for partition [__consumer_offsets,2] from 0,5,6 to 0,5 (kafka.cluster.Partition)
[2017-05-09 18:33:03,903] INFO Partition [__consumer_offsets,10] on broker 0: Shrinking ISR for partition [__consumer_offsets,10] from 0,6,7 to 0,7 (kafka.cluster.Partition)
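For illustration, that change is a broker-side setting in server.properties (the value 4 below is only an example; the default is 1):

# server.properties
# more fetcher threads per source broker -> more parallel replication
num.replica.fetchers=4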
Implementing Sagas with Kafka
I am using Kafka for Event Sourcing, and I am interested in implementing Sagas (or, more generally, long-running distributed flows) using Kafka. I did some research but could not find anything on the topic. There is plenty of information on Sagas, but I feel an implementation using Kafka might involve several intricacies. Any best practices on how to do this? Thanks.
Re: Debugging Kafka Streams Windowing
Hi Matthias,

We faced the issue again. The logs are below.

16:13:16.527 [StreamThread-7] INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for group grp_id
16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group grp_id.
16:13:16.543 [StreamThread-3] INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for group grp_id
16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group grp_id.
16:13:16.547 [StreamThread-6] INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for group grp_id
16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group grp_id.
16:13:16.551 [StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for group grp_id
16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group grp_id.
16:13:16.572 [StreamThread-4] INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator broker-05:6667 (id: 2147483642 rack: null) dead for group grp_id
16:13:16.573 [StreamThread-2] INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator broker-05:6667 (id: 2147483642 rack: null) for group grp_id.

On Tue, May 9, 2017 at 3:40 AM, Matthias J. Sax wrote:

> Great! Glad 0.10.2.1 fixes it for you!
>
> -Matthias
>
> On 5/7/17 8:57 PM, Mahendra Kariya wrote:
> > Upgrading to 0.10.2.1 seems to have fixed the issue.
> >
> > Until now, we were looking at random 1-hour data to analyse the issue. Over
> > the weekend, we have written a simple test that will continuously check for
> > inconsistencies in real time and report if there is any issue.
> >
> > No issues have been reported for the last 24 hours. Will update this thread
> > if we find any issue.
> >
> > Thanks for all the support!
> >
> > On Fri, May 5, 2017 at 3:55 AM, Matthias J. Sax wrote:
> >
> >> About
> >>
> >>> 07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator -
> >>> Discovered coordinator broker-05:6667 for group group-2.
> >>
> >> Please upgrade to Streams 0.10.2.1 -- we fixed a couple of bugs and I would
> >> assume this issue is fixed, too. If not, please report back.
> >>
> >>> Another question that I have is, is there a way for us to detect how many
> >>> messages have come out of order? And if possible, what is the delay?
> >>
> >> There is no metric or API for this. What you could do though is to use
> >> #transform() that only forwards each record and, as a side task, extracts
> >> the timestamp via `context#timestamp()` and does some bookkeeping to
> >> compute if it is out-of-order and what the delay was.
> >>
> >>>> - same for .mapValues()
> >>>
> >>> I am not sure how to check this.
> >>
> >> The same way as you do for filter()?
> >>
> >> -Matthias
> >>
> >> On 5/4/17 10:29 AM, Mahendra Kariya wrote:
> >>> Hi Matthias,
> >>>
> >>> Please find the answers below.
> >>>
> >>>> I would recommend to double check the following:
> >>>>
> >>>> - can you confirm that the filter does not remove all data for those
> >>>> time periods?
> >>>
> >>> Filter does not remove all data. There is a lot of data coming in even
> >>> after the filter stage.
> >>>
> >>>> - I would also check input for your AggregatorFunction() -- does it
> >>>> receive everything?
> >>>
> >>> Yes. Aggregate function seems to be receiving everything.
> >>>
> >>>> - same for .mapValues()
> >>>
> >>> I am not sure how to check this.
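For what it's worth, a minimal sketch of the #transform() bookkeeping Matthias describes might look like the following. The class name, the String key/value types, and the printed report are all hypothetical, and it assumes the Streams 0.10.2-era Transformer API (where punctuate() is still on the Transformer itself):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class OutOfOrderMonitor implements Transformer<String, String, KeyValue<String, String>> {
    private ProcessorContext context;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private long outOfOrderCount = 0L;
    private long maxDelayMs = 0L;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        context.schedule(60000L);  // request punctuate() roughly every minute
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        long ts = context.timestamp();  // timestamp of the record being processed
        if (ts < maxTimestampSeen) {
            // older than something we already saw -> out of order
            outOfOrderCount++;
            maxDelayMs = Math.max(maxDelayMs, maxTimestampSeen - ts);
        } else {
            maxTimestampSeen = ts;
        }
        return KeyValue.pair(key, value);  // forward every record unchanged
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
        System.out.println("out-of-order records: " + outOfOrderCount
                + ", max delay ms: " + maxDelayMs);
        return null;  // nothing extra to emit
    }

    @Override
    public void close() {}
}

// Wired in as a pass-through stage before the windowed aggregation, e.g.:
//   stream.transform(OutOfOrderMonitor::new)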