Re: Maintain only last message by key
+1. Sounds like what you want is a compacted topic. Check out this section of the docs: https://kafka.apache.org/documentation#compaction

Keep in mind that compaction isn't free. Like a garbage collector, it comes with some overhead and some knobs for tuning.

On Wed, Oct 19, 2016 at 2:46 PM, Radoslaw Gruchalski wrote:
> You can try cleanup.policy=compact. But be careful with a large number of keys.
>
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
> On October 19, 2016 at 11:44:39 PM, Jesus Cabrera Reveles (jesus.cabr...@encontrack.com) wrote:
>
> Hello,
> We are an IoT company and we are trying to implement Kafka, but we have some questions.
>
> We need a topic with a certain number of partitions, where each message has its own key relative to a device id. We need the topic to hold only the last message per key; we don't need a history of messages with the same key. Is it possible to configure the topic with this characteristic?
>
> Thanks
>
> --
> Jesús Cabrera Reveles
> Programador Web
> jesus.cabr...@encontrack.com
> www.encontrack.com
> Querétaro
>
> Confidentiality notice: This message and any attached material are confidential; the information contained here is legally protected and its improper disclosure will be sanctioned. It is intended solely for the addressee. If you are not the addressee, you must not copy, disclose, or distribute it; any such action is illegal. If you receive this message in error, please notify the sender.
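The cleanup.policy=compact suggestion from this thread is applied per topic. A sketch of creating such a topic with the 0.10-era CLI (the topic name, partition count, and replication factor below are placeholders, not values from the thread):

```shell
# Illustrative only: create a topic where log compaction retains the
# latest record per key (e.g. per device id).
bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --topic device-latest \
  --partitions 8 \
  --replication-factor 3 \
  --config cleanup.policy=compact
```

The same topic-level config can also be set on an existing topic via bin/kafka-configs.sh --alter.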
RE: Mirror multi-embedded consumer's configuration
Hi,

Could anybody help answer the question below? Thanks!

Best Regards
Johnny

From: ZHU Hua B
Sent: 19 October 2016 16:22
To: 'users@kafka.apache.org'
Subject: Mirror multi-embedded consumer's configuration

Hi,

I launch Kafka mirror maker with a multi-embedded consumer configuration, but it fails as below. What does "you asked for only one" mean? Is there an option that controls it? Thanks!

# bin/kafka-mirror-maker.sh --consumer.config config/consumer-1.properties --consumer.config config/consumer-2.properties --num.streams 2 --producer.config config/producer.properties --whitelist '.*'
[2016-10-19 16:00:14,183] ERROR Exception when starting mirror maker. (kafka.tools.MirrorMaker$)
joptsimple.MultipleArgumentsForOptionException: Found multiple arguments for option consumer.config, but you asked for only one
    at joptsimple.OptionSet.valueOf(OptionSet.java:179)
    at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:235)
    at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
Exception in thread "main" java.lang.NullPointerException
    at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:286)
    at kafka.tools.MirrorMaker.main(MirrorMaker.scala)

Best Regards
Johnny
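The exception above comes from the option parser: it means --consumer.config accepts a single value, so passing it twice fails. A sketch of an invocation consistent with that (assuming, as the error suggests, that parallelism comes from --num.streams over one shared consumer config; file paths are illustrative):

```shell
# One consumer config; --num.streams starts that many consumer streams
# which all share this configuration.
bin/kafka-mirror-maker.sh \
  --consumer.config config/consumer.properties \
  --num.streams 2 \
  --producer.config config/producer.properties \
  --whitelist '.*'
```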
Re: Offset sporadically being reset
Additionally, I observe strange behavior on the consumer: it is constantly rebalancing. There are no errors and each rebalance succeeds, but as soon as one finishes the next one starts.

On Wed, Oct 19, 2016 at 4:36 PM, Timur Fayruzov wrote:
> Hello,
>
> I run a Kafka 0.8.2.2 cluster with 3 nodes and recently started to observe strange behavior on select topics. The cluster runs in-house, as do most consumers. I have started some consumers in AWS and they _mostly_ work fine. Occasionally, I end up in a state where, when I run kafka-consumer-offset-checker, I see the offset of one partition going back and forth (i.e. it was 1000, then goes to 900, then to 1100, etc.)
>
> The Kafka broker that is holding this partition has the following log messages:
>
> {"@timestamp":"2016-10-19T21:00:00.134Z","@service":"kafka","thread":"kafka-request-handler-2","logger":"kafka.server.ReplicaManager","@host":"kafka-0","@category":"common","@msg":"[Replica Manager on Broker 0]: Error when processing fetch request for partition [my_topic,1] offset 337055698 from consumer with correlation id 0. Possible cause: Request for offset 337055698 but we only have log segments in the range 347392118 to 361407455.","@version":"1","@severity":"ERROR"}
>
> {"@timestamp":"2016-10-19T21:00:00.168Z","@service":"kafka","exception":"java.io.IOException: Broken pipe at sun.nio.ch.FileChannelImpl.transferTo0(Native Method) at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:434) at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:566) at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147) at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70) at kafka.network.MultiSend.writeTo(Transmission.scala:101) at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125) at kafka.network.MultiSend.writeTo(Transmission.scala:101) at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231) at kafka.network.Processor.write(SocketServer.scala:472) at kafka.network.Processor.run(SocketServer.scala:342) at java.lang.Thread.run(Thread.java:745)\n","thread":"kafka-network-thread-6667-0","logger":"kafka.network.Processor","@host":"kafka0.util.pages","@category":"common","@msg":"Closing socket for /10.10.10.10 because of error","@version":"1","@severity":"ERROR"}
>
> The IP above is obscured, but it is the IP of the EC2 node that runs the consumer for that partition.
>
> I tried to reset the offset for the consumer group at that partition manually (I wrote a script for that), but I still see it being reset to a prior point (and back). It seems that after a while this behavior goes away and the affected partitions have a chance to catch up, but then the whole thing repeats.
>
> My consumer configuration is:
>
> "socket.timeout.ms": "6",
> "zookeeper.session.timeout.ms": "6",
> "offsets.channel.socket.timeout.ms": "3"
> "auto.offset.reset": "smallest"
> "offsets.storage": "kafka"
> "consumer.timeout.ms": "1500"
>
> I use the reactive-kafka wrapper; other places where it is used do not have these problems.
>
> Please advise on what this could be.
>
> Thanks,
> Timur
Error while executing consumer group command null and Exiting due to: empty.head error while displaying offsets
Hi,

I am using a Kafka 0.9 broker and a 0.8 consumer. The consumer had been running fine for a long time, but today I am getting the exception below.

[ConsumerFetcherThread--5], Error for partition [rtdp.bizlogging.iditextlogdata,93] to broker 5:class kafka.common.NotLeaderForPartitionException

If I try to print the offsets using the consumer-groups or offset-checker script, I get the exceptions below.

1) ./bin/kafka-consumer-groups.sh --zookeeper xx.xx.xx.xxx:2181 --describe --group hdfs.iditextlogdata.horton.slca

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG
hdfs.iditextlogdata.horton.slca, rtdp.bizlogging.iditextlogdata, 0, 118872516, 118872523, 7
hdfs.iditextlogdata.horton.slca, rtdp.bizlogging.iditextlogdata, 1, 118866187, 118866209, 22
hdfs.iditextlogdata.horton.slca, rtdp.bizlogging.iditextlogdata, 2, 118883367, 118883389, 22

Error while executing consumer group command null
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149)
    at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$getLogEndOffset$1.apply(ConsumerGroupCommand.scala:193)
    at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$getLogEndOffset$1.apply(ConsumerGroupCommand.scala:190)
    at scala.Option.map(Option.scala:146)
    at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.getLogEndOffset(ConsumerGroupCommand.scala:190)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.kafka$admin$ConsumerGroupCommand$ConsumerGroupService$$describePartition(ConsumerGroupCommand.scala:121)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$describeTopicPartition$2.apply(ConsumerGroupCommand.scala:102)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$describeTopicPartition$2.apply(ConsumerGroupCommand.scala:101)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeTopicPartition(ConsumerGroupCommand.scala:101)
    at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describeTopicPartition(ConsumerGroupCommand.scala:130)
    at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.kafka$admin$ConsumerGroupCommand$ZkConsumerGroupService$$describeTopic(ConsumerGroupCommand.scala:177)
    at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$describeGroup$1.apply(ConsumerGroupCommand.scala:162)
    at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService$$anonfun$describeGroup$1.apply(ConsumerGroupCommand.scala:162)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:162)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:84)
    at kafka.admin.ConsumerGroupCommand$ZkConsumerGroupService.describe(ConsumerGroupCommand.scala:130)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)

2) ./bin/kafka-consumer-offset-checker.sh --zookeeper xx.xx.xx.xxx:2181 --group hdfs.iditextlogdata.horton.slca

Group Topic Pid Offset logSize Lag
hdfs.iditextlogdata.horton.slca rtdp.bizlogging.iditextlogdata 0 118873069 118873072 3
hdfs.iditextlogdata.horton.slca rtdp.bizlogging.iditextlogdata 1 118866744 118866762 18
hdfs.iditextlogdata.horton.slca rtdp.bizlogging.iditextlogdata 2 118883933 118883955 22
hdfs.iditextlogdata.horton.slca rtdp.bizlogging.iditextlogdata 3 118879212 118879225 13
hdfs.iditextlogdata.horton.slca rtdp.bizlogging.iditextlogdata 4 118874876 118874895 19
hdfs.iditextlogdata.horton.slca rtdp.bizlogging.iditextlogdata 5 118887748 118887768 20
hdfs.iditextlogdata.horton.slca rtdp.bizlogging.iditextlogdata 6 118897484 118897505 21
hdfs.iditextlogdata.horton.slca rtdp.bizlogging.iditextlogdata 7 11124 11154 30
hdfs.iditextlogdata.horton.slca rtdp.bizlogging.iditextlogdata 8 118903070 118903106 36
hdfs.iditextlogdata.horton.slca rtdp.bizlogging.iditextlogda
Offset sporadically being reset
Hello,

I run a Kafka 0.8.2.2 cluster with 3 nodes and recently started to observe strange behavior on select topics. The cluster runs in-house, as do most consumers. I have started some consumers in AWS and they _mostly_ work fine. Occasionally, I end up in a state where, when I run kafka-consumer-offset-checker, I see the offset of one partition going back and forth (i.e. it was 1000, then goes to 900, then to 1100, etc.)

The Kafka broker that is holding this partition has the following log messages:

{"@timestamp":"2016-10-19T21:00:00.134Z","@service":"kafka","thread":"kafka-request-handler-2","logger":"kafka.server.ReplicaManager","@host":"kafka-0","@category":"common","@msg":"[Replica Manager on Broker 0]: Error when processing fetch request for partition [my_topic,1] offset 337055698 from consumer with correlation id 0. Possible cause: Request for offset 337055698 but we only have log segments in the range 347392118 to 361407455.","@version":"1","@severity":"ERROR"}

{"@timestamp":"2016-10-19T21:00:00.168Z","@service":"kafka","exception":"java.io.IOException: Broken pipe at sun.nio.ch.FileChannelImpl.transferTo0(Native Method) at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:434) at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:566) at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147) at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70) at kafka.network.MultiSend.writeTo(Transmission.scala:101) at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125) at kafka.network.MultiSend.writeTo(Transmission.scala:101) at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231) at kafka.network.Processor.write(SocketServer.scala:472) at kafka.network.Processor.run(SocketServer.scala:342) at java.lang.Thread.run(Thread.java:745)\n","thread":"kafka-network-thread-6667-0","logger":"kafka.network.Processor","@host":"kafka0.util.pages","@category":"common","@msg":"Closing socket for /10.10.10.10 because of error","@version":"1","@severity":"ERROR"}

The IP above is obscured, but it is the IP of the EC2 node that runs the consumer for that partition.

I tried to reset the offset for the consumer group at that partition manually (I wrote a script for that), but I still see it being reset to a prior point (and back). It seems that after a while this behavior goes away and the affected partitions have a chance to catch up, but then the whole thing repeats.

My consumer configuration is:

"socket.timeout.ms": "6",
"zookeeper.session.timeout.ms": "6",
"offsets.channel.socket.timeout.ms": "3"
"auto.offset.reset": "smallest"
"offsets.storage": "kafka"
"consumer.timeout.ms": "1500"

I use the reactive-kafka wrapper; other places where it is used do not have these problems.

Please advise on what this could be.

Thanks,
Timur
Re: Frequent UNKNOWN_MEMBER_ID errors in kafka consumer
This usually means that the consumer is being kicked out of the group by the heartbeat-based failure detection protocol. Try increasing your session.timeout.ms config so that the heartbeating is less sensitive, and see if this issue happens less frequently.

Guozhang

On Wed, Oct 19, 2016 at 12:59 AM, dhiraj prajapati wrote:
> Hi,
> I have a consumer which implements the new consumer API (0.9.0.1). I see the errors below quite frequently in the consumer application logs:
>
> ERROR [pool-4-thread-5] - o.a.k.c.c.i.ConsumerCoordinator - Error UNKNOWN_MEMBER_ID occurred while committing offsets for group audit.consumer.group
>
> Can you please enlighten me about the reason for its occurrence?

--
-- Guozhang
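A minimal sketch of the suggested change in the consumer's properties file (the value shown is an example, not a recommendation from the thread):

```properties
# Example only: give the consumer more time between heartbeats before the
# group coordinator considers it dead and evicts it (UNKNOWN_MEMBER_ID on
# the next commit). Tune for your environment.
session.timeout.ms=60000
```

Note that the broker-side cap group.max.session.timeout.ms must be at least as large as the value chosen here, or the consumer's join request will be rejected.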
Re: kafka streams metadata request fails on 0.10.0.1 broker/topic from 0.10.1.0 client
Thanks Michael. Hopefully the upgrade story evolves as 0.10.1+ advances to maturity.

Just my 2 cents: decoupling Kafka Streams from the core Kafka changes will help, so that the broker can be upgraded independently and streaming apps can move to newer streaming features at their own pace.

Regards
Sai

On Wednesday, October 19, 2016, Michael Noll wrote:
> Apps built with Kafka Streams 0.10.1 only work against Kafka clusters running 0.10.1+. This explains your error message above.
>
> Unfortunately, Kafka's current upgrade story means you need to upgrade your cluster in this situation. Moving forward, we're planning to improve the upgrade/compatibility story of Kafka so that you could, for example, run a newer version of Kafka Streams (or any other Kafka client) against an older version of Kafka.
>
> On Tue, Oct 18, 2016 at 10:56 PM, saiprasad mishra <saiprasadmis...@gmail.com> wrote:
> > Hi All
> >
> > I was testing with the 0.10.1.0 rc3 build for my new streams app.
> >
> > I am seeing issues starting my Kafka Streams app (0.10.1.0) on the old version broker 0.10.0.1. I don't know if it is supposed to work as is. I will upgrade the broker to the same version and see whether the issue goes away.
> >
> > Client-side issues:
> >
> > java.io.EOFException
> >   at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83) ~[kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) ~[kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154) ~[kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135) ~[kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343) [kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.common.network.Selector.poll(Selector.java:291) [kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) [kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) [kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209) [kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148) [kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:136) [kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:197) [kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248) [kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) [kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) [kafka-clients-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) [kafka-streams-0.10.1.0.jar!/:?]
> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.0.jar!/:?]
> >
> > On the broker side the following message appears:
> >
> > kafka.network.InvalidRequestException: Error getting request for apiKey: 3 and apiVersion: 2
> >   at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95)
> >   at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:87)
> >   at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
> >   at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
> >   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> >   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >   at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
> >   at kafka.network.Processor.run(SocketServer.scala:413)
> >   at java.lang.Thread.run(Thread.java:745)
> >
> > Caused by: java.lang.IllegalArgumentException: Invalid version for API key 3: 2
Re: Maintain only last message by key
You can try cleanup.policy=compact. But be careful with a large number of keys.

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com

On October 19, 2016 at 11:44:39 PM, Jesus Cabrera Reveles (jesus.cabr...@encontrack.com) wrote:

Hello,
We are an IoT company and we are trying to implement Kafka, but we have some questions.

We need a topic with a certain number of partitions, where each message has its own key relative to a device id. We need the topic to hold only the last message per key; we don't need a history of messages with the same key. Is it possible to configure the topic with this characteristic?

Thanks

--
Jesús Cabrera Reveles
Programador Web
jesus.cabr...@encontrack.com
www.encontrack.com
Querétaro
Maintain only last message by key
Hello,

We are an IoT company and we are trying to implement Kafka, but we have some questions.

We need a topic with a certain number of partitions, where each message has its own key relative to a device id. We need the topic to hold only the last message per key; we don't need a history of messages with the same key. Is it possible to configure the topic with this characteristic?

Thanks

--
Jesús Cabrera Reveles
Programador Web
jesus.cabr...@encontrack.com
www.encontrack.com
Querétaro
Re: Please help with AWS configuration
Sorry, had a typo in my gist. Here is the correct location: https://gist.github.com/anduill/710bb0619a80019016ac85bb5c060440

On 10/19/16, 4:33 PM, "David Garcia" wrote:

Hello everyone. I'm having a hell of a time figuring out a Kafka performance issue in AWS. Any help is greatly appreciated! Here is our AWS configuration:

- Zookeeper cluster (3.4.6): 3 nodes on m4.xlarges (default configuration), EBS volumes (sd1)
- Kafka cluster (0.10.0): 3 nodes on m4.2xlarges (config: https://gist.github.com/anduill/710bb0619a80019016ac85bb5c060440), EBS volumes (sd1)

Usage: Our usage of the cluster is fairly modest (at least I think so). At peak hours, each broker will receive about 1.4 MB/sec. Our primary input topic has about 54 partitions with replication set to 3 (ack=all). Another consumer consumes this topic and spreads the messages across 8 other topics, each with 8 partitions, each of which has replication set to 2 (ack=all). Downstream, 4 other consumers consume these topics (one consumer consumes the 8 previous topics, transforms the messages, and sends the new messages to 8 other topics (ack=1)). In all we end up generating about 206 partitions with an average replication factor of 2.26.

Our problem: Our cluster will hum along just fine when suddenly one or more brokers will start experiencing severe ISR shrinking/expanding. This causes under-replicated partitions, and the producer purgatory size starts to rapidly expand on the affected brokers, which causes downstream producers to fall behind in some cases. In the Kafka configuration above we have a couple of non-default settings, but nothing seems to stand out. Is there anything obvious I'm missing (or need to add/adjust)? Or is there a bug I should be aware of that would cause these issues?

-David
Please help with AWS configuration
Hello everyone. I'm having a hell of a time figuring out a Kafka performance issue in AWS. Any help is greatly appreciated! Here is our AWS configuration:

- Zookeeper cluster (3.4.6): 3 nodes on m4.xlarges (default configuration), EBS volumes (sd1)
- Kafka cluster (0.10.0): 3 nodes on m4.2xlarges (config: https://gist.github.com/anduill/710bb0619a80019016ac85bb5c060440), EBS volumes (sd1)

Usage: Our usage of the cluster is fairly modest (at least I think so). At peak hours, each broker will receive about 1.4 MB/sec. Our primary input topic has about 54 partitions with replication set to 3 (ack=all). Another consumer consumes this topic and spreads the messages across 8 other topics, each with 8 partitions, each of which has replication set to 2 (ack=all). Downstream, 4 other consumers consume these topics (one consumer consumes the 8 previous topics, transforms the messages, and sends the new messages to 8 other topics (ack=1)). In all we end up generating about 206 partitions with an average replication factor of 2.26.

Our problem: Our cluster will hum along just fine when suddenly one or more brokers will start experiencing severe ISR shrinking/expanding. This causes under-replicated partitions, and the producer purgatory size starts to rapidly expand on the affected brokers, which causes downstream producers to fall behind in some cases. In the Kafka configuration above we have a couple of non-default settings, but nothing seems to stand out. Is there anything obvious I'm missing (or need to add/adjust)? Or is there a bug I should be aware of that would cause these issues?

-David
Re: How to block tests of Kafka Streams until messages processed?
Yeah, I did think to use that method, but as you said, it writes to a dummy output topic, which means I'd have to put in magic code just for the tests to pass (the actual code writes to Cassandra and not to a dummy topic).

On Thu, Oct 20, 2016 at 2:00 AM, Tauzell, Dave wrote:
> For similar queue-related tests we put the check in a loop: check every second until either the result is found or a timeout happens.
>
> -Dave
>
> -----Original Message-----
> From: Ali Akhtar [mailto:ali.rac...@gmail.com]
> Sent: Wednesday, October 19, 2016 3:38 PM
> To: users@kafka.apache.org
> Subject: How to block tests of Kafka Streams until messages processed?
>
> I'm using Kafka Streams, and I'm attempting to write integration tests for a stream processor.
>
> The processor listens to a topic, processes incoming messages, and writes some data to Cassandra tables.
>
> I'm attempting to write a test which produces some test data, and then checks whether or not the expected data was written to Cassandra. It looks like this:
>
> - Step 1: Produce data in the test
> - Step 2: Kafka stream gets triggered
> - Step 3: Test checks whether Cassandra got populated
>
> The problem is, Step 3 is occurring before Step 2, and as a result the test fails as it doesn't find the data in the table.
>
> I've resolved this by adding a Thread.sleep(2000) call after Step 1, which ensures that Step 2 gets triggered before Step 3.
>
> However, I'm wondering if there's a more reliable way of blocking the test until the Kafka stream processor gets triggered?
>
> At the moment, I'm using 1 thread for the processor. If I increase that to 2 threads, will that achieve what I want?
>
> This e-mail and any files transmitted with it are confidential, may contain sensitive information, and are intended solely for the use of the individual or entity to whom they are addressed. If you have received this e-mail in error, please notify the sender by reply e-mail immediately and destroy all copies of the e-mail and any attachments.
RE: How to block tests of Kafka Streams until messages processed?
For similar queue-related tests we put the check in a loop: check every second until either the result is found or a timeout happens.

-Dave

-----Original Message-----
From: Ali Akhtar [mailto:ali.rac...@gmail.com]
Sent: Wednesday, October 19, 2016 3:38 PM
To: users@kafka.apache.org
Subject: How to block tests of Kafka Streams until messages processed?

I'm using Kafka Streams, and I'm attempting to write integration tests for a stream processor.

The processor listens to a topic, processes incoming messages, and writes some data to Cassandra tables.

I'm attempting to write a test which produces some test data, and then checks whether or not the expected data was written to Cassandra. It looks like this:

- Step 1: Produce data in the test
- Step 2: Kafka stream gets triggered
- Step 3: Test checks whether Cassandra got populated

The problem is, Step 3 is occurring before Step 2, and as a result the test fails as it doesn't find the data in the table.

I've resolved this by adding a Thread.sleep(2000) call after Step 1, which ensures that Step 2 gets triggered before Step 3.

However, I'm wondering if there's a more reliable way of blocking the test until the Kafka stream processor gets triggered?

At the moment, I'm using 1 thread for the processor. If I increase that to 2 threads, will that achieve what I want?
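The check-in-a-loop pattern described above can be sketched as a small self-contained helper (names are illustrative, not from any Kafka test library; the condition would be your Cassandra query):

```java
import java.util.function.BooleanSupplier;

// Poll a condition until it becomes true or a deadline passes, instead of
// a fixed Thread.sleep(). The condition is re-checked every intervalMs.
public class AwaitCondition {
    public static boolean await(BooleanSupplier condition,
                                long timeoutMs, long intervalMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;   // condition met before the deadline
            }
            Thread.sleep(intervalMs);
        }
        return condition.getAsBoolean();  // one last check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Simulate data "arriving" roughly 300 ms after the test starts;
        // in a real test this lambda would query Cassandra for the row.
        boolean found = await(
                () -> System.currentTimeMillis() - start > 300, 5000, 50);
        System.out.println(found ? "condition met" : "timed out");
    }
}
```

This fails fast only on real timeouts and returns as soon as the data shows up, so the test is both faster and less flaky than a fixed sleep.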
Re: How to block tests of Kafka Streams until messages processed?
Wanted to add that there is nothing too special about these utility functions; they are built using a normal consumer.

Eno

> On 19 Oct 2016, at 21:59, Eno Thereska wrote:
>
> Hi Ali,
>
> Any chance you could recycle some of the code we have in streams/src/test/java/.../streams/integration/utils? (I know we don't have it easily accessible in Maven; for now perhaps you could copy it to your directory?)
>
> For example, there is a method there, "IntegrationTestUtils.waitUntilMinValuesRecordsReceived", that could help. It is used in almost all our integration tests. One caveat: this method checks whether values have been received in a topic, not Cassandra, so your streams test might have to write to a dummy output topic as well as to Cassandra.
>
> Let me know what you think,
> Eno
>
>> On 19 Oct 2016, at 21:37, Ali Akhtar wrote:
>>
>> I'm using Kafka Streams, and I'm attempting to write integration tests for a stream processor.
>>
>> The processor listens to a topic, processes incoming messages, and writes some data to Cassandra tables.
>>
>> I'm attempting to write a test which produces some test data, and then checks whether or not the expected data was written to Cassandra. It looks like this:
>>
>> - Step 1: Produce data in the test
>> - Step 2: Kafka stream gets triggered
>> - Step 3: Test checks whether Cassandra got populated
>>
>> The problem is, Step 3 is occurring before Step 2, and as a result the test fails as it doesn't find the data in the table.
>>
>> I've resolved this by adding a Thread.sleep(2000) call after Step 1, which ensures that Step 2 gets triggered before Step 3.
>>
>> However, I'm wondering if there's a more reliable way of blocking the test until the Kafka stream processor gets triggered?
>>
>> At the moment, I'm using 1 thread for the processor. If I increase that to 2 threads, will that achieve what I want?
Re: How to block tests of Kafka Streams until messages processed?
Hi Ali,

Any chance you could recycle some of the code we have in streams/src/test/java/.../streams/integration/utils? (I know we don't have it easily accessible in Maven; for now perhaps you could copy it to your directory?)

For example, there is a method there, "IntegrationTestUtils.waitUntilMinValuesRecordsReceived", that could help. It is used in almost all our integration tests. One caveat: this method checks whether values have been received in a topic, not Cassandra, so your streams test might have to write to a dummy output topic as well as to Cassandra.

Let me know what you think,
Eno

> On 19 Oct 2016, at 21:37, Ali Akhtar wrote:
>
> I'm using Kafka Streams, and I'm attempting to write integration tests for a stream processor.
>
> The processor listens to a topic, processes incoming messages, and writes some data to Cassandra tables.
>
> I'm attempting to write a test which produces some test data, and then checks whether or not the expected data was written to Cassandra. It looks like this:
>
> - Step 1: Produce data in the test
> - Step 2: Kafka stream gets triggered
> - Step 3: Test checks whether Cassandra got populated
>
> The problem is, Step 3 is occurring before Step 2, and as a result the test fails as it doesn't find the data in the table.
>
> I've resolved this by adding a Thread.sleep(2000) call after Step 1, which ensures that Step 2 gets triggered before Step 3.
>
> However, I'm wondering if there's a more reliable way of blocking the test until the Kafka stream processor gets triggered?
>
> At the moment, I'm using 1 thread for the processor. If I increase that to 2 threads, will that achieve what I want?
Re: Embedded Kafka Cluster - Maven artifact?
Please change that. On Thu, Oct 20, 2016 at 1:53 AM, Eno Thereska wrote: > I'm afraid we haven't released this as a maven artefact yet :( > > Eno > > > On 18 Oct 2016, at 13:22, Ali Akhtar wrote: > > > > Is there a maven artifact that can be used to create instances > > of EmbeddedSingleNodeKafkaCluster for unit / integration tests? > >
Re: Embedded Kafka Cluster - Maven artifact?
I'm afraid we haven't released this as a maven artefact yet :( Eno > On 18 Oct 2016, at 13:22, Ali Akhtar wrote: > > Is there a maven artifact that can be used to create instances > of EmbeddedSingleNodeKafkaCluster for unit / integration tests?
How to block tests of Kafka Streams until messages processed?
I'm using Kafka Streams, and I'm attempting to write integration tests for a stream processor. The processor listens to a topic, processes incoming messages, and writes some data to Cassandra tables. I'm attempting to write a test which produces some test data, and then checks whether or not the expected data was written to Cassandra. It looks like this: - Step 1: Produce data in the test - Step 2: Kafka stream gets triggered - Step 3: Test checks whether cassandra got populated The problem is, Step 3 is occurring before Step 2, and as a result, the test fails as it doesn't find the data in the table. I've resolved this by adding a Thread.sleep(2000) call after Step 1, which ensures that Step 2 gets triggered before Step 3. However, I'm wondering if there's a more reliable way of blocking the test until Kafka stream processor gets triggered? At the moment, I'm using 1 thread for the processor. If I increase that to 2 threads, will that achieve what I want?
Re: Kafka Streams Aggregate By Date
-BEGIN PGP SIGNED MESSAGE- Hash: SHA512 About auto-topic creation: If your broker configuration allows for this, yes it would work. However, keep in mind that the topic will be created with default values (according to broker config) with regard to number of partitions and replication factor. Those values might not meet your needs. Therefore, it is highly recommended not to rely on topic auto-create but to create all topics manually (to specify number of partitions and replication factor with values that do meet your needs). And great that you got window-count working. - From your email, I am not sure if you are stuck again and, if yes, what your question is? - -Matthias On 10/19/16 11:03 AM, Furkan KAMACI wrote: > I could successfully get the total (not average). As far as I see, > there is no need to create a topic manually before I run the app. > Topic is created if there is data and topic name not exists. Here > is my code: > > KStreamBuilder builder = new KStreamBuilder(); > > KStream longs = builder.stream(Serdes.String(), > Serdes.String(), "mytopic"); > > KTable, Long> longCounts = > longs.countByKey(TimeWindows.of("output-topic", 3600 * 1000), > Serdes.String()); > > KafkaStreams streams = new KafkaStreams(builder, > streamsConfiguration); streams.start(); > > Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); > > On Wed, Oct 19, 2016 at 1:58 AM, Matthias J. Sax > wrote: > > You should create input/intermediate and output topics manually > before you start your Kafka Streams application. > > > -Matthias > > On 10/18/16 3:34 PM, Furkan KAMACI wrote: Sorry about concurrent questions. 
Tried the below code; didn't get any error, but the output topic wasn't created: Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer<>(props); for (int i = 0; i < 1000; i++) { producer.send(new ProducerRecord<>( "input-topic", String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i))); } final KStreamBuilder builder = new KStreamBuilder(); final KStream qps = builder.stream(Serdes.String(), Serdes.Long(), "input-topic"); qps.countByKey(TimeWindows.of("Hourly", 3600 * 1000)).mapValues(Object::toString).to("output-topic"); final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); On Wed, Oct 19, 2016 at 12:14 AM, Matthias J. Sax wrote: Two things: 1) you should not apply the window to the first count, but to the base stream, to get correct results. 2) your windowed aggregation does not just return String type, but Windowed type. Thus, you need to either insert a .map() to transform your data into String type, or provide a custom serializer when writing data to the output topic (the method .to(...) has multiple overloads). By default, each topic read/write operation uses Serdes from the streams config. If your data has a different type, you need to provide appropriate Serdes for those operators. 
-Matthias On 10/18/16 2:01 PM, Furkan KAMACI wrote: >>> Hi Matthias, >>> >>> I've tried this code: >>> >>> *final Properties streamsConfiguration = new >>> Properties();* * >>> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, >>> >>> > >>> "myapp");* * >>> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, >>> >>> > >>> "localhost:9092");* * >>> streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, >>> >>> > >>> "localhost:2181");* * >>> streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, >>> >>> > >>> Serdes.String().getClass().getName());* * >>> streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, >>> >>> > >>> Serdes.String().getClass().getName());* *final >>> KStreamBuilder builder = new KStreamBuilder();* * final >>> KStream input = builder.stream("myapp-test");* >>> >>> *final KStream searchCounts = >>> input.countByKey("SearchRequests").toStream();* * >>> searchCounts.countByKey(TimeWindows.of("Hourly", 3600 >>> * 1000)).to("outputTopicHourlyCounts");* >>> >>> *final KafkaStreams streams = new >>> KafkaStreams(builder, streamsCon
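Matthias's recommendation to create topics manually maps to the kafka-topics tool. A sketch for the topics used in this thread; partition count and replication factor below are placeholder values to choose for your own cluster:

```shell
# Create input and output topics explicitly so partition count and
# replication factor are under your control (values here are examples).
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --topic mytopic --partitions 4 --replication-factor 3

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --topic output-topic --partitions 4 --replication-factor 3
```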
ApacheCon is now less than a month away!
Dear Apache Enthusiast, ApacheCon Sevilla is now less than a month out, and we need your help getting the word out. Please tell your colleagues, your friends, and members of related technical communities, about this event. Rates go up November 3rd, so register today! ApacheCon, and Apache Big Data, are the official gatherings of the Apache Software Foundation, and one of the best places in the world to meet other members of your project community, gain deeper knowledge about your favorite Apache projects, learn about the ASF. Your project doesn't live in a vacuum - it's part of a larger family of projects that have a shared set of values, as well as a shared governance model. And many of our project have an overlap in developers, in communities, and in subject matter, making ApacheCon a great place for cross-pollination of ideas and of communities. Some highlights of these events will be: * Many of our board members and project chairs will be present * The lightning talks are a great place to hear, and give, short presentations about what you and other members of the community are working on * The key signing gets you linked into the web of trust, and better able to verify our software releases * Evening receptions and parties where you can meet community members in a less formal setting * The State of the Feather, where you can learn what the ASF has done in the last year, and what's coming next year * BarCampApache, an informal unconference-style event, is another venue for discussing your projects at the ASF We have a great schedule lined up, covering the wide range of ASF projects, including: * CI and CD at Scale: Scaling Jenkins with Docker and Apache Mesos - Carlos Sanchez * Inner sourcing 101 - Jim Jagielski * Java Memory Leaks in Modular Environments - Mark Thomas ApacheCon/Apache Big Data will be held in Sevilla, Spain, at the Melia Sevilla, November 14th through 18th. 
You can find out more at http://apachecon.com/ Other ways to stay up to date with ApacheCon are: * Follow us on Twitter at @apachecon * Join us on IRC, at #apachecon on the Freenode IRC network * Join the apachecon-discuss mailing list by sending email to apachecon-discuss-subscr...@apache.org * Or contact me directly at rbo...@apache.org with questions, comments, or to volunteer to help See you in Sevilla! -- Rich Bowen: VP, Conferences rbo...@apache.org http://apachecon.com/ @apachecon
Re: kafka streams metadata request fails on 0.10.0.1 broker/topic from 0.10.1.0 client
Apps built with Kafka Streams 0.10.1 only work against Kafka clusters running 0.10.1+. This explains your error message above. Unfortunately, Kafka's current upgrade story means you need to upgrade your cluster in this situation. Moving forward, we're planning to improve the upgrade/compatibility story of Kafka so that you could, for example, run a newer version of Kafka Streams (or any other Kafka client) against an older version of Kafka. On Tue, Oct 18, 2016 at 10:56 PM, saiprasad mishra < saiprasadmis...@gmail.com> wrote: > Hi All > > Was testing with 0.10.1.0 rc3 build for my new streams app > > Seeing issues starting my kafk streams app( 0.10.1.0) on the old version > broker 0.10.0.1. I dont know if it is supposed to work as is. Will upgrade > the broker to same version and see whether it goes away > > client side issues > > == > > java.io.EOFException > > at > org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel( > NetworkReceive.java:83) > ~[kafka-clients-0.10.1.0.jar!/:?] > > at > org.apache.kafka.common.network.NetworkReceive. > readFrom(NetworkReceive.java:71) > ~[kafka-clients-0.10.1.0.jar!/:?] > > at > org.apache.kafka.common.network.KafkaChannel.receive( > KafkaChannel.java:154) > ~[kafka-clients-0.10.1.0.jar!/:?] > > at org.apache.kafka.common.network.KafkaChannel.read( > KafkaChannel.java:135) > ~[kafka-clients-0.10.1.0.jar!/:?] > > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector. > java:343) > [kafka-clients-0.10.1.0.jar!/:?] > > at org.apache.kafka.common.network.Selector.poll(Selector.java:291) > [kafka-clients-0.10.1.0.jar!/:?] > > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) > [kafka-clients-0.10.1.0.jar!/:?] > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll( > ConsumerNetworkClient.java:232) > [kafka-clients-0.10.1.0.jar!/:?] 
> > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll( > ConsumerNetworkClient.java:209) > [kafka-clients-0.10.1.0.jar!/:?] > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient. > awaitMetadataUpdate(ConsumerNetworkClient.java:148) > [kafka-clients-0.10.1.0.jar!/:?] > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient. > awaitMetadataUpdate(ConsumerNetworkClient.java:136) > [kafka-clients-0.10.1.0.jar!/:?] > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > ensureCoordinatorReady(AbstractCoordinator.java:197) > [kafka-clients-0.10.1.0.jar!/:?] > > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll( > ConsumerCoordinator.java:248) > [kafka-clients-0.10.1.0.jar!/:?] > > at > org.apache.kafka.clients.consumer.KafkaConsumer. > pollOnce(KafkaConsumer.java:1013) > [kafka-clients-0.10.1.0.jar!/:?] > > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll( > KafkaConsumer.java:979) > [kafka-clients-0.10.1.0.jar!/:?] > > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > StreamThread.java:407) > [kafka-streams-0.10.1.0.jar!/:?] > > at > org.apache.kafka.streams.processor.internals. > StreamThread.run(StreamThread.java:242) > [kafka-streams-0.10.1.0.jar!/:?] > > > > On the broker side the following message appears > > = > > kafka.network.InvalidRequestException: Error getting request for apiKey: 3 > and apiVersion: 2 > > at > kafka.network.RequestChannel$Request.liftedTree2$1( > RequestChannel.scala:95) > > at kafka.network.RequestChannel$Request.(RequestChannel.scala:87) > > at > kafka.network.Processor$$anonfun$processCompletedReceives$1. > apply(SocketServer.scala:488) > > at > kafka.network.Processor$$anonfun$processCompletedReceives$1. 
> apply(SocketServer.scala:483) > > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at kafka.network.Processor.processCompletedReceives( > SocketServer.scala:483) > > at kafka.network.Processor.run(SocketServer.scala:413) > > at java.lang.Thread.run(Thread.java:745) > > Caused by: java.lang.IllegalArgumentException: Invalid version for API key > 3: 2 > > at org.apache.kafka.common.protocol.ProtoUtils.schemaFor( > ProtoUtils.java:31) > > at > org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java: > 44) > > at > org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java: > 60) > > at > org.apache.kafka.common.requests.MetadataRequest. > parse(MetadataRequest.java:96) > > at > org.apache.kafka.common.requests.AbstractRequest. > getRequest(AbstractRequest.java:48) > > at > kafka.network.RequestChannel$Request.liftedTree2$1( > RequestChannel.scala:92) > > Regards > > Sai >
Re: Kafka Streams Aggregate By Date
I could successfully get the total (not average). As far as I see, there is no need to create a topic manually before I run the app. Topic is created if there is data and topic name not exists. Here is my code: KStreamBuilder builder = new KStreamBuilder(); KStream longs = builder.stream(Serdes.String(), Serdes.String(), "mytopic"); KTable, Long> longCounts = longs.countByKey(TimeWindows.of("output-topic", 3600 * 1000), Serdes.String()); KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); On Wed, Oct 19, 2016 at 1:58 AM, Matthias J. Sax wrote: > -BEGIN PGP SIGNED MESSAGE- > Hash: SHA512 > > You should create input/intermediate and output topic manually before > you start you Kafka Streams application. > > > - -Matthias > > On 10/18/16 3:34 PM, Furkan KAMACI wrote: > > Sorry about concurrent questions. Tried below code, didn't get any > > error but couldn't get created output topic: > > > > Properties props = new Properties(); props.put("bootstrap.servers", > > "localhost:9092"); props.put("acks", "all"); props.put("retries", > > 0); props.put("batch.size", 16384); props.put("linger.ms", 1); > > props.put("buffer.memory", 33554432); props.put("key.serializer", > > "org.apache.kafka.common.serialization.StringSerializer"); > > props.put("value.serializer", > > "org.apache.kafka.common.serialization.StringSerializer"); > > > > Producer producer = new KafkaProducer<>(props); > > > > for (int i = 0; i < 1000; i++) { producer.send(new > > ProducerRecord<>( "input-topic", String.format("{\"type\":\"test\", > > \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i))); > > > > > > final KStreamBuilder builder = new KStreamBuilder(); final > > KStream qps = builder.stream(Serdes.String(), > > Serdes.Long(), "input-topic"); > > qps.countByKey(TimeWindows.of("Hourly", 3600 * > > 1000)).mapValues(Object::toString).to("output-topic"); > > > > final KafkaStreams streams = new 
KafkaStreams(builder, > > streamsConfiguration); streams.start(); > > > > Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); > > > > On Wed, Oct 19, 2016 at 12:14 AM, Matthias J. Sax > > wrote: > > > > Two things: > > > > 1) you should not apply the window to the first count, but to the > > base stream to get correct results. > > > > 2) your windowed aggregation, doew not just return String type, > > but Window type. Thus, you need to either insert a .map() to > > transform you data into String typo, or you provide a custom > > serializer when writing data to output topic (method, .to(...) has > > multiple overloads) > > > > Per default, each topic read/write operation uses Serdes from the > > streams config. If you data has a different type, you need to > > provide appropriate Serdes for those operators. > > > > > > -Matthias > > > > On 10/18/16 2:01 PM, Furkan KAMACI wrote: > Hi Matthias, > > I've tried this code: > > *final Properties streamsConfiguration = new > Properties();* * > streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, > > > "myapp");* * > streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > > > "localhost:9092");* * > streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, > > > "localhost:2181");* * > streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > > > Serdes.String().getClass().getName());* * > streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > > > Serdes.String().getClass().getName());* *final > KStreamBuilder builder = new KStreamBuilder();* * > final KStream input = builder.stream("myapp-test");* > > *final KStream searchCounts = > input.countByKey("SearchRequests").toStream();* * > searchCounts.countByKey(TimeWindows.of("Hourly", 3600 * > 1000)).to("outputTopicHourlyCounts");* > > *final KafkaStreams streams = new > KafkaStreams(builder, streamsConfiguration);* * > streams.start();* > > *Runtime.getRuntime().addShutdownHook(new > Thread(streams::close));* > > 
However I get an error: > > > *Exception in thread "StreamThread-1" > java.lang.ClassCastException: > org.apache.kafka.streams.kstream.Windowed cannot be cast to > java.lang.String* > > On the other hand when I try this code: > > https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760a > f3a > > > > I get an error too which indicates that: > > *Exception in thread "StreamThread-1" > org.apache.kafka.common.errors.SerializationException: Size > of data received by LongDeserializer is not 8 * > > Here is generated topic: > > *kafka-console-consumer --zookeeper loca
Re: [VOTE] 0.10.1.0 RC3
+1 from myself too. The vote passes with 9 +1 votes and no 0 or -1 votes. +1 votes PMC Members: * Gwen Shapira * Jun Rao * Neha Narkhede Committers: * Ismael Juma * Jason Gustafson Community: * Eno Thereska * Manikumar Reddy * Dana Powers * Magnus Edenhill 0 votes * No votes -1 votes * No votes I'll continue with the release process and the release announcement will follow shortly. Thanks, Jason On Wed, Oct 19, 2016 at 9:08 AM, Magnus Edenhill wrote: > +1 (non-binding) passes librdkafka test suites > > 2016-10-19 15:55 GMT+02:00 Ismael Juma : > > > +1 (non-binding). > > > > Verified source and Scala 2.11 binary artifacts, ran ./gradlew test with > > JDK 7u80, quick start on source artifact and Scala 2.11 binary artifacts. > > > > Thanks for managing the release! > > > > Ismael > > > > On Sat, Oct 15, 2016 at 12:29 AM, Jason Gustafson > > wrote: > > > > > Hello Kafka users, developers and client-developers, > > > > > > One more RC for 0.10.1.0. We're hoping this is the final one so that we > > can > > > meet the release target date of Oct. 17 (Monday). Please let me know as > > > soon as possible if you find any major problems. > > > > > > Release plan: https://cwiki.apache.org/confluence/display/KAFKA/Rele > > > ase+Plan+0.10.1. 
> > > > > > Release notes for the 0.10.1.0 release: > > > http://home.apache.org/~jgus/kafka-0.10.1.0-rc3/RELEASE_NOTES.html > > > > > > *** Please download, test and vote by Monday, Oct 17, 5pm PT > > > > > > Kafka's KEYS file containing PGP keys we use to sign the release: > > > http://kafka.apache.org/KEYS > > > > > > * Release artifacts to be voted upon (source and binary): > > > http://home.apache.org/~jgus/kafka-0.10.1.0-rc3/ > > > > > > * Maven artifacts to be voted upon: > > > https://repository.apache.org/content/groups/staging/ > > > > > > * Javadoc: > > > http://home.apache.org/~jgus/kafka-0.10.1.0-rc3/javadoc/ > > > > > > * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc3 tag: > > > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h= > > > 50f30a44f31fca1bd9189d2814388d51bd56b06b > > > > > > * Documentation: > > > http://kafka.apache.org/0101/documentation.html > > > > > > * Protocol: > > > http://kafka.apache.org/0101/protocol.html > > > > > > * Tests: > > > Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/71/ > > > System tests: > > > http://testing.confluent.io/confluent-kafka-0-10-1-system- > > > test-results/?prefix=2016-10-13--001.1476369986--apache--0. > > 10.1--ee212d1/ > > > > > > (Note that these tests do not include a couple patches merged today. I > > will > > > send links to updated test builds as soon as they are available) > > > > > > Thanks, > > > > > > Jason > > > > > >
Re: [VOTE] 0.10.1.0 RC3
+1 (non-binding) passes librdkafka test suites 2016-10-19 15:55 GMT+02:00 Ismael Juma : > +1 (non-binding). > > Verified source and Scala 2.11 binary artifacts, ran ./gradlew test with > JDK 7u80, quick start on source artifact and Scala 2.11 binary artifacts. > > Thanks for managing the release! > > Ismael > > On Sat, Oct 15, 2016 at 12:29 AM, Jason Gustafson > wrote: > > > Hello Kafka users, developers and client-developers, > > > > One more RC for 0.10.1.0. We're hoping this is the final one so that we > can > > meet the release target date of Oct. 17 (Monday). Please let me know as > > soon as possible if you find any major problems. > > > > Release plan: https://cwiki.apache.org/confluence/display/KAFKA/Rele > > ase+Plan+0.10.1. > > > > Release notes for the 0.10.1.0 release: > > http://home.apache.org/~jgus/kafka-0.10.1.0-rc3/RELEASE_NOTES.html > > > > *** Please download, test and vote by Monday, Oct 17, 5pm PT > > > > Kafka's KEYS file containing PGP keys we use to sign the release: > > http://kafka.apache.org/KEYS > > > > * Release artifacts to be voted upon (source and binary): > > http://home.apache.org/~jgus/kafka-0.10.1.0-rc3/ > > > > * Maven artifacts to be voted upon: > > https://repository.apache.org/content/groups/staging/ > > > > * Javadoc: > > http://home.apache.org/~jgus/kafka-0.10.1.0-rc3/javadoc/ > > > > * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc3 tag: > > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h= > > 50f30a44f31fca1bd9189d2814388d51bd56b06b > > > > * Documentation: > > http://kafka.apache.org/0101/documentation.html > > > > * Protocol: > > http://kafka.apache.org/0101/protocol.html > > > > * Tests: > > Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/71/ > > System tests: > > http://testing.confluent.io/confluent-kafka-0-10-1-system- > > test-results/?prefix=2016-10-13--001.1476369986--apache--0. > 10.1--ee212d1/ > > > > (Note that these tests do not include a couple patches merged today. 
I > will > > send links to updated test builds as soon as they are available) > > > > Thanks, > > > > Jason > > >
Re: Need to add & remove consumers dynamically in diffrent group and consume consecutively.
Do you have only one partition in the topic? The way Kafka works is that all messages are first distributed into partitions in the topic and then the consumers are distributed among them and they read them sequentially. If you have only one partition in the topic, all your messages will be in it but only one consumer can read messages per partition. If you have multiple partitions, you can have that many consumers reading from it in parallel. Let me know if that wasn't clear. _ From: Kaushil Rambhia/ MUM/CORP/ ENGINEERING Sent: Wednesday, October 19, 2016 8:40 PM Subject: Need to add & remove consumers dynamically in diffrent group and consume consecutively. To: Hi guys, i am using apache kafka with phprd kafka, i want to know how can i use multiple Kafka consumers on same partition from different groups to consume message parallel, say if consumer are c1,c2,c3 consuming single partition 0, than if c1 is consuming from 0 offset than c2 should start from 1 and c3 from 2 and if any new consumer comes up it should start from latest i.e 4th offset which is yet to be consumed by any consumer. So in short all consumers should consume consecutively from kafka partition and should not consumed same message again no matter any new consumer is added or removed from different group.It should automatically select consecutive offsets. It would be good if anyone can recommend any solution or can this be achieved by using kafka with some other tools like spark or something else. -- Regards, Kaushil Rambhia -- DISCLAIMER: The contents of this message may be legally privileged and confidential and are for the use of the intended recipient(s) only. It should not be read, copied and used by anyone other than the intended recipient(s). If you have received this message in error, please immediately notify the sender, preserve its confidentiality and delete it. Before opening any attachments please check them for viruses and defects.
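The two rules described above — a keyed message lands in exactly one partition, and within a group each partition is read by exactly one consumer — can be sketched as a toy model. This is illustrative Java, not Kafka's actual partitioner or assignor code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of partitioning and group assignment: a keyed message always
// maps to one partition, and each partition goes to exactly one consumer
// in the group. With a single partition, extra group members sit idle.
public final class PartitionModel {

    // Kafka's default partitioner hashes the key (murmur2) modulo the
    // partition count; plain hashCode() here is just for illustration.
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Simplified round-robin spread of partitions over group members;
    // Kafka's real range/round-robin assignors are more involved.
    public static Map<Integer, List<Integer>> assign(int numPartitions, int numConsumers) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        for (int c = 0; c < numConsumers; c++) {
            assignment.put(c, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numConsumers).add(p);
        }
        return assignment;
    }
}
```

With 1 partition and 3 consumers in one group, consumers 1 and 2 receive nothing; with 4 partitions and 2 consumers, each reads two partitions in parallel.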
Need to add & remove consumers dynamically in diffrent group and consume consecutively.
Hi guys, I am using Apache Kafka with phprd kafka, and I want to know how I can use multiple Kafka consumers on the same partition, from different groups, to consume messages in parallel. Say consumers c1, c2, c3 are consuming a single partition 0: if c1 is consuming from offset 0, then c2 should start from 1 and c3 from 2, and if any new consumer comes up it should start from the latest offset not yet consumed by any consumer (i.e. the 4th). So in short, all consumers should consume consecutive offsets from the Kafka partition and should never consume the same message twice, no matter whether any consumer is added to or removed from a different group. It should automatically select consecutive offsets. It would be good if anyone can recommend a solution, or say whether this can be achieved by using Kafka with some other tool like Spark. -- Regards, Kaushil Rambhia -- DISCLAIMER: The contents of this message may be legally privileged and confidential and are for the use of the intended recipient(s) only. It should not be read, copied and used by anyone other than the intended recipient(s). If you have received this message in error, please immediately notify the sender, preserve its confidentiality and delete it. Before opening any attachments please check them for viruses and defects.
A more reliable way of consumer leaving group when it is closed?
Hi. I’ve run into a problem with Kafka consumers not leaving their consumer group cleanly when the application restarts. Out of 9 topics (with a consumer for each), it seems that every time at least 2 or 3 do not leave the group cleanly, so when the application starts these consumers start consuming only after session.timeout.ms expires and the group re-balances. When debugging this I’ve enabled logging from GroupCoordinator in the broker and I’ve observed that not all of the LeaveGroupRequests were received. I’ve looked at the code in KafkaConsumer.close() and the effort to send the LeaveGroupRequest really is minimal (and the comments suggest this is the intention). So if the network client cannot send the request immediately, it is not sent at all. But I think that the amount of effort put into sending the LeaveGroupRequest should be the decision of the consumer’s user, not the library itself. IMHO it would be enough if the poll timeout when sending the LeaveGroupRequest was configurable. We’re using Kafka 0.10.0.1. I’ve looked at the KafkaConsumer.close() code in the master branch and it looks like it has changed quite a bit compared to 0.10.0.1, but the LeaveGroupRequest sending effort has not changed IMHO. Vlastimil
Re: [VOTE] 0.10.1.0 RC3
+1 (non-binding). Verified source and Scala 2.11 binary artifacts, ran ./gradlew test with JDK 7u80, quick start on source artifact and Scala 2.11 binary artifacts. Thanks for managing the release! Ismael On Sat, Oct 15, 2016 at 12:29 AM, Jason Gustafson wrote: > Hello Kafka users, developers and client-developers, > > One more RC for 0.10.1.0. We're hoping this is the final one so that we can > meet the release target date of Oct. 17 (Monday). Please let me know as > soon as possible if you find any major problems. > > Release plan: https://cwiki.apache.org/confluence/display/KAFKA/Rele > ase+Plan+0.10.1. > > Release notes for the 0.10.1.0 release: > http://home.apache.org/~jgus/kafka-0.10.1.0-rc3/RELEASE_NOTES.html > > *** Please download, test and vote by Monday, Oct 17, 5pm PT > > Kafka's KEYS file containing PGP keys we use to sign the release: > http://kafka.apache.org/KEYS > > * Release artifacts to be voted upon (source and binary): > http://home.apache.org/~jgus/kafka-0.10.1.0-rc3/ > > * Maven artifacts to be voted upon: > https://repository.apache.org/content/groups/staging/ > > * Javadoc: > http://home.apache.org/~jgus/kafka-0.10.1.0-rc3/javadoc/ > > * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc3 tag: > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h= > 50f30a44f31fca1bd9189d2814388d51bd56b06b > > * Documentation: > http://kafka.apache.org/0101/documentation.html > > * Protocol: > http://kafka.apache.org/0101/protocol.html > > * Tests: > Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/71/ > System tests: > http://testing.confluent.io/confluent-kafka-0-10-1-system- > test-results/?prefix=2016-10-13--001.1476369986--apache--0.10.1--ee212d1/ > > (Note that these tests do not include a couple patches merged today. I will > send links to updated test builds as soon as they are available) > > Thanks, > > Jason >
ZK WARN Cannot open channel to 3 at election address .... java.net.ConnectException: Connection refused
Hi, I am running the ZooKeeper bundled with Kafka 0.10.0.1 on 3 machines. The hostnames haven't been registered in DNS, so they are all defined in /etc/hosts, and there is no firewall between them. I saw many posts online pointing to some bugs in ZooKeeper, and there are many patches to fix them. Has the ZooKeeper in 0.10.0.1 had any of these issues? Thank you. Kind regards, - j
Getting issue with offset commit
Hi Initially we were using the 8.2 Kafka client and server and things were working fine. Now the Kafka server has been upgraded to 10.0 and we are getting an issue with offset commit. *Now the Kafka setup looks like:* Kafka client: 8.2 version Kafka server: 10.2 version We are using manual offset commit and we are storing offsets on Kafka. *Issue:* The last message read by the consumer is, let's say, offset 10, which is committed after processing. After some interval of time (30-60 min), re-balancing happened for some reason, and the consumer reads the same offset, i.e. 10, again. There is no change on the Kafka client side; the same thing works with the old setup, but after upgrading the Kafka server we are facing this issue. So is there any setting on the Kafka client side that I need to change to solve this problem? Thanks Kiran Singh
Mirror multi-embedded consumer's configuration
Hi, I launched Kafka MirrorMaker with a multi-embedded consumer configuration but it failed as below. What is the meaning of "you asked for only one"? Is there an option to control it? Thanks! # bin/kafka-mirror-maker.sh --consumer.config config/consumer-1.properties --consumer.config config/consumer-2.properties --num.streams 2 --producer.config config/producer.properties --whitelist '.*' [2016-10-19 16:00:14,183] ERROR Exception when starting mirror maker. (kafka.tools.MirrorMaker$) joptsimple.MultipleArgumentsForOptionException: Found multiple arguments for option consumer.config, but you asked for only one at joptsimple.OptionSet.valueOf(OptionSet.java:179) at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:235) at kafka.tools.MirrorMaker.main(MirrorMaker.scala) Exception in thread "main" java.lang.NullPointerException at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:286) at kafka.tools.MirrorMaker.main(MirrorMaker.scala) Best Regards Johnny
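The stack trace itself suggests the answer: MirrorMaker reads consumer.config with joptsimple's single-value valueOf, so the tool accepts exactly one consumer properties file, while --num.streams controls how many consumer threads share that config. A sketch under that constraint (running a second instance is one way to use a second, different config):

```shell
# --consumer.config takes exactly one file; one instance, two streams
# over the same consumer config:
bin/kafka-mirror-maker.sh \
  --consumer.config config/consumer-1.properties \
  --num.streams 2 \
  --producer.config config/producer.properties \
  --whitelist '.*'

# To consume with a second, different consumer config (e.g. another
# source cluster), run a second MirrorMaker instance with that file:
bin/kafka-mirror-maker.sh \
  --consumer.config config/consumer-2.properties \
  --num.streams 2 \
  --producer.config config/producer.properties \
  --whitelist '.*'
```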
Re: Kafka consumer cluster setup
Hi Ian, Thanks for the detailed explanation. Thanks and Regards A.SathishKumar > Hi > The client initially connects to the first Broker specified in > bootstrap.servers (if that’s > not available, it’ll connect to the next one on the list, and so on). When it > does so, it > is then given information about all the Brokers in the cluster, and it > connects to all of > them. Strictly speaking, then, you only need to specify one Broker in the > bootstrap.servers > list, but if that had crashed when the client tried to connect, the client > would throw an > error after the connection attempt timed out, which is why you’d typically > specify two or > three in that list. Certainly, though, that list isn’t a list of the only > Brokers that the > client will talk to as it’s running. Ian. --- Ian Wrigley Director, Education Services Confluent, Inc > On Oct 16, 2016, at 11:17 PM, sat wrote: > > Hi, > > We are planning to run Kafka servers along with Zookeeper in 3 remote > machines for our cluster setup. > > We have our application with Kafka Consumer and Publisher running in > another/separate 2 machines. > > We will be configuring 1 or 2 out of 3 Kafka servers as bootstrap servers > in Kafka Consumer/Publisher properties. > > Since we are configuring 1 or 2 Kafka server IP address in bootstrap server > list, we want to know when one or both the servers configured as bootstrap > server crashes, can we still consume/publish messages to the other server > available in the cluster. If this is possible, could you please let us > know whether our application opens a socket with all 3 machines in the > server. > > Also please let us know if it is fine to configure/provide only 1 machine > out of 3 as bootstrap server. > > Please let me know if in case you need additional information. Thanks in > advance. 
> > > > Thanks and Regards > A.SathishKumar On Sun, Oct 16, 2016 at 9:17 PM, sat wrote: > Hi, > > We are planning to run Kafka servers along with Zookeeper in 3 remote > machines for our cluster setup. > > We have our application with Kafka Consumer and Publisher running in > another/separate 2 machines. > > We will be configuring 1 or 2 out of 3 Kafka servers as bootstrap servers > in Kafka Consumer/Publisher properties. > > Since we are configuring 1 or 2 Kafka server IP address in bootstrap > server list, we want to know when one or both the servers configured as > bootstrap server crashes, can we still consume/publish messages to the > other server available in the cluster. If this is possible, could you > please let us know whether our application opens a socket with all 3 > machines in the server. > > Also please let us know if it is fine to configure/provide only 1 machine > out of 3 as bootstrap server. > > Please let me know if in case you need additional information. Thanks in > advance. > > > > Thanks and Regards > A.SathishKumar > > -- A.SathishKumar 044-24735023
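Ian's description translates to a consumer configuration like the sketch below (host names and group id are placeholders). One reachable entry in bootstrap.servers is enough for the initial metadata fetch, but listing two or three guards against the first broker being down at startup:

```java
import java.util.Properties;

// Example consumer configuration listing two of the three brokers as
// bootstrap servers (host names are placeholders). The client tries them
// in turn for the first metadata request, then talks to every broker the
// cluster reports, so this is a bootstrap list, not a whitelist.
public final class ConsumerConfigExample {

    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "kafka1.example.com:9092,kafka2.example.com:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```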
Frequent UNKNOWN_MEMBER_ID errors in kafka consumer
Hi, I have a consumer which implements the new consumer API (0.9.0.1). I see the below error quite frequently in the consumer application logs: ERROR [pool-4-thread-5] - o.a.k.c.c.i.ConsumerCoordinator - Error UNKNOWN_MEMBER_ID occurred while committing offsets for group audit.consumer.group Can you please enlighten me about the reason for its occurrence?