Current offset for partition out of range; reset offset
Hi All,

One of the partitions is showing a huge lag (21K), and I see the error below in the kafkaServer.out log of one of the Kafka nodes:

Current offset 43294 for partition [PROD_TASK_TOPIC_120,10] out of range; reset offset to 43293 (kafka.server.ReplicaFetcherThread)

What is the quick solution? This is happening in production. Will those 21K messages be processed again, which would be wrong? Can we restart the Kafka nodes or the applications to resolve the issue?

Thanks
Achintya
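A minimal sketch (editorial, not from the thread) of how the group's committed position and the partition's log end offset could be compared with the 0.10 Java consumer to confirm the real lag before deciding anything; the bootstrap address is a placeholder, and group.id would have to be set to the group under investigation:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LagCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder
        props.put("group.id", "app-consumer");           // the group being investigated
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp = new TopicPartition("PROD_TASK_TOPIC_120", 10);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            OffsetAndMetadata committed = consumer.committed(tp); // null if never committed
            consumer.seekToEnd(Collections.singletonList(tp));
            long logEnd = consumer.position(tp);                  // log end offset
            long lag = (committed == null) ? logEnd : logEnd - committed.offset();
            System.out.printf("end=%d committed=%s lag=%d%n", logEnd, committed, lag);
        }
    }
}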
log.retention attribute not working
Hi there,

Any idea why the log.retention setting is not working? We set log.retention.hours=6 in server.properties, but old data is not getting deleted; we still see data/log files from Dec 9th. We are running this on production boxes, and if it does not delete the old files our storage will be full very soon. Please help with this.

Here are the details of our configuration:

# The minimum age of a log file to be eligible for deletion
log.retention.hours=6

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=30

Thanks
Achintya
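One point worth noting here (editorial, hedged): time-based retention deletes whole closed segments, and the active segment is only rolled when it reaches log.segment.bytes (1 GB above) or log.roll.hours (broker default 168 hours), so a low-volume log can keep data far past log.retention.hours. Also, log.retention.check.interval.ms is in milliseconds, so 30 means every 30 ms; the broker default is 300000 (5 minutes). A sketch of explicit settings; the values are illustrative, not a recommendation for this cluster:

log.retention.hours=6
# Roll segments by time as well as size so retention can act on low-volume topics;
# without this, the broker default of log.roll.hours=168 applies.
log.roll.hours=1
log.segment.bytes=1073741824
# Interval is in milliseconds; 300000 = 5 minutes (the broker default).
log.retention.check.interval.ms=300000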
RE: Kafka consumers are not equally distributed
No, that is not the reason. Initially all the partitions were assigned messages, and those were processed very fast; then those consumers sat idle even though other partitions still had a lot of messages to be processed. So I was under the impression that a rebalance would be triggered and the messages would be redistributed equally again.

Thanks
Achintya

-Original Message-
From: Sharninder [mailto:sharnin...@gmail.com]
Sent: Wednesday, November 23, 2016 12:33 AM
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka consumers are not equally distributed

Could it be because of the partition key?

On Wed, Nov 23, 2016 at 12:33 AM, Ghosh, Achintya (Contractor) < achintya_gh...@comcast.com> wrote:

> Hi there,
>
> We are doing a load test in Kafka at 25 tps, and for the first 9 hours it
> went fine; almost 80K messages/hr were processed. After that we saw a
> lot of lag and we stopped the incoming load.
>
> Currently 15K messages/hr are being processed. We have 40 consumer
> instances with concurrency 4 and 2 topics, both with 160 partitions,
> so one consumer thread per partition.
>
> We found that some of the partitions are sitting idle and some are
> overloaded, and it is really slowing down consumer message processing.
>
> Why is rebalancing not happening, and why are the existing messages not
> distributed equally among the instances? We tried restarting the app;
> still the same pace. Any idea what the reason could be?
>
> Thanks
> Achintya
>
>

--
Sharninder
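For context (editorial, hedged): with the Java consumer, as I understand it, a rebalance is triggered by group membership or subscription changes, not by lag, so an uneven load will not redistribute on its own. The assignment strategy can at least be made to interleave partitions across members; a minimal sketch with the 0.9/0.10 client, broker address a placeholder:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092"); // placeholder
props.put("group.id", "app-consumer");
// The default RangeAssignor assigns contiguous partition ranges per topic;
// RoundRobinAssignor interleaves all subscribed topics' partitions across
// the group members, which spreads the per-consumer partition count more evenly.
props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");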
Kafka consumers are not equally distributed
Hi there,

We are doing a load test in Kafka at 25 tps, and for the first 9 hours it went fine; almost 80K messages/hr were processed. After that we saw a lot of lag and we stopped the incoming load.

Currently 15K messages/hr are being processed. We have 40 consumer instances with concurrency 4 and 2 topics, both with 160 partitions, so one consumer thread per partition.

We found that some of the partitions are sitting idle and some are overloaded, and it is really slowing down consumer message processing.

Why is rebalancing not happening, and why are the existing messages not distributed equally among the instances? We tried restarting the app; still the same pace. Any idea what the reason could be?

Thanks
Achintya
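Sharninder's question about the partition key (in the reply above) refers to how the default partitioner works: records with the same non-null key always land on the same partition, so a skewed key distribution produces skewed partitions. A minimal hedged illustration with the 0.9/0.10 Java producer; topic, keys, and broker address are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeySkewDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);

        // Default partitioner: murmur2(keyBytes) % numPartitions, so a few hot
        // keys funnel most traffic into a few partitions while others sit idle.
        producer.send(new ProducerRecord<>("PROD_TASK_TOPIC", "hot-key", "payload"));
        // A null key makes the 0.9/0.10 default partitioner rotate over the
        // available partitions instead.
        producer.send(new ProducerRecord<>("PROD_TASK_TOPIC", null, "payload"));
        producer.close();
    }
}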
RE: Kafka 0.10 Monitoring tool
Thank you Otis for your reply.

Kafka Manager does not work during high load; it shows a timeout. And Burrow and KafkaOffsetMonitor do not return the group names properly, even under load. SPM is not open source, so do you have anything open source that works with Kafka 0.10?

Thanks
Achintya

-Original Message-
From: Otis Gospodnetić [mailto:otis.gospodne...@gmail.com]
Sent: Monday, November 14, 2016 9:25 PM
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka 0.10 Monitoring tool

Hi,

Why are these tools not working perfectly for you? Does it *have to* be open-source? If not, Sematext SPM collects a lot of Kafka metrics, with consumer lag being one of them -- https://sematext.com/blog/2016/06/07/kafka-consumer-lag-offsets-monitoring/

Otis
--
Monitoring - Log Management - Alerting - Anomaly Detection
Solr & Elasticsearch Consulting Support Training - http://sematext.com/

On Mon, Nov 14, 2016 at 5:16 PM, Ghosh, Achintya (Contractor) < achintya_gh...@comcast.com> wrote:

> Hi there,
> What is the best open source tool for Kafka monitoring, mainly to check
> the offset lag? We tried the following tools:
>
> 1. Burrow
> 2. KafkaOffsetMonitor
> 3. Prometheus and Grafana
> 4. Kafka Manager
>
> But nothing is working perfectly. Please help us with this.
>
> Thanks
> Achintya
>
>
Kafka 0.10 Monitoring tool
Hi there,

What is the best open source tool for Kafka monitoring, mainly to check the offset lag? We tried the following tools:

1. Burrow
2. KafkaOffsetMonitor
3. Prometheus and Grafana
4. Kafka Manager

But nothing is working perfectly. Please help us with this.

Thanks
Achintya
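As a stopgap that avoids external tools entirely (editorial sketch, hedged): the 0.9/0.10 Java consumer exposes its own lag through metrics(); the metric name below comes from the client's fetch-manager metrics group, as I understand it:

import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class LagMetric {
    // Call from the polling application; "consumer" is the live, subscribed consumer.
    static void printMaxLag(KafkaConsumer<?, ?> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> e : consumer.metrics().entrySet()) {
            // records-lag-max: the maximum record lag across this consumer's
            // assigned partitions, as reported by the fetch manager.
            if ("records-lag-max".equals(e.getKey().name())) {
                System.out.println("records-lag-max = " + e.getValue().value());
            }
        }
    }
}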
SendFailedException
Hi there,

Can anyone please help us? We are getting a SendFailedException when the Kafka consumer is starting, and it is not able to consume any messages.

Thanks
Achintya
Kafka duplicate offset at Consumer
Hi there,

I see the Kafka consumer receive the same offset value many times, which creates a lot of duplicate messages. What could be the reason, and how can we solve this issue?

Thanks
Achintya
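For background (editorial, hedged): with the Java consumer, some duplication is expected at-least-once behavior after a rebalance or restart, and the window is smallest when offsets are committed synchronously right after processing. A minimal sketch with the 0.9/0.10 client; the broker address, topic, and process() helper are placeholders, not from the thread:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DedupeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("group.id", "app-consumer");
        props.put("enable.auto.commit", "false");       // commit only after processing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    process(record);
                }
                // Commit after the batch: a crash before this line re-delivers
                // the batch (duplicates) rather than losing it.
                consumer.commitSync();
            }
        }
    }

    // Hypothetical application logic; ideally idempotent.
    static void process(ConsumerRecord<String, String> record) { }
}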
RE: Kafka usecase
Please find my responses inline.

1. Kafka can be used as a message store.
2. What is the message arrival rate per second? 20 per sec
3. What is the SLA for the messages to be processed? 500 ms per message
4. If your messages arrive faster than they are consumed, you will get a backlog of messages. In that case, you may need to grow your cluster so that more messages are processed in parallel. Do you mean here to create more partitions, or is there anything else we need to do?

-Original Message-
From: Lohith Samaga M [mailto:lohith.sam...@mphasis.com]
Sent: Monday, September 19, 2016 12:24 AM
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: RE: Kafka usecase

Hi Achintya,
1. Kafka can be used as a message store.
2. What is the message arrival rate per second?
3. What is the SLA for the messages to be processed?
4. If your messages arrive faster than they are consumed, you will get a backlog of messages. In that case, you may need to grow your cluster so that more messages are processed in parallel.

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga

-Original Message-
From: Ghosh, Achintya (Contractor) [mailto:achintya_gh...@comcast.com]
Sent: Monday, September 19, 2016 08.39
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Kafka usecase

Hi there,

We have a use case where we do a lot of business logic to process each message, and sometimes it takes 1-2 sec. Will Kafka fit our use case?

Thanks
Achintya
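A rough worked check of these numbers (editorial sketch, hedged): at 20 messages/sec arriving and 1-2 sec of processing per message, Little's law gives roughly 20 x 2 = 40 messages in flight at peak, so on the order of 40 partitions consumed by 40 parallel consumer threads would be needed for consumption to keep pace with arrival; with fewer parallel consumers the backlog grows without bound. Note also that a 500 ms SLA cannot be met for a message whose own processing takes 1-2 sec, regardless of the partition count.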
Kafka usecase
Hi there,

We have a use case where we do a lot of business logic to process each message, and sometimes it takes 1-2 sec. Will Kafka fit our use case?

Thanks
Achintya
RE: Kafka consumers unable to process message
I'm trying to get the consumer logs and will send them to you. So it means it can happen with my local datacenter too. I'm still not understanding: if 3 nodes are up and the message is already replicated, why is it trying to fetch the data from the failed node? Can you please explain in a bit more detail how this works? Thanks for your response.

-Original Message-
From: Jason Gustafson [mailto:ja...@confluent.io]
Sent: Wednesday, August 31, 2016 10:56 PM
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka consumers unable to process message

The exceptions show one of the replica fetcher threads on the broker failing, which makes perfect sense since some of the partitions were bound to have leaders in the failed datacenter. I'd actually like to see the consumer logs at DEBUG level if possible.

Thanks,
Jason

On Wed, Aug 31, 2016 at 7:48 PM, Ghosh, Achintya (Contractor) < achintya_gh...@comcast.com> wrote:

> Hi Jason,
>
> No, I didn't bring down any zookeeper server. I even tried with 3
> zookeeper servers, one as an 'Observer', but hit the same issue.
>
> Here is the server log from one of the nodes in my other datacenter:
>
> [2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0. (kafka.log.Log)
> [2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4] Added fetcher for partitions List([[TEST3,0], initOffset 0 to broker BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)]) (kafka.server.ReplicaFetcherManager)
> [2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 3 was disconnected before the response was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
>         at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
>         at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
RE: Kafka consumers unable to process message
in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)

Why is it trying to connect to node 3 of my local datacenter, and why is it throwing an IOException?

Thanks
Achintya

-Original Message-
From: Jason Gustafson [mailto:ja...@confluent.io]
Sent: Wednesday, August 31, 2016 10:26 PM
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka consumers unable to process message

Hi Achintya,

Just to clarify, you did not take down either of the zookeepers in this test, right? Having only two zookeepers in the ensemble would mean that if either one of them failed, zookeeper wouldn't be able to reach quorum.

I'm not entirely sure why this would happen. One possibility is that the consumer is failing to find the new coordinator, which might happen if all the replicas for one of the __consumer_offsets partitions were located in the "failed" datacenter. Perhaps you can enable DEBUG logging and post some logs so we can see what it's actually doing during poll().

By the way, I noticed that your consumer configuration settings seem a little mixed up. The new consumer doesn't actually communicate with Zookeeper, so there's no need for those settings. And you don't need to include the "offsets.storage" option since Kafka is the only choice. Also, I don't think "consumer.timeout.ms" is an option.

-Jason

On Wed, Aug 31, 2016 at 6:43 PM, Ghosh, Achintya (Contractor) < achintya_gh...@comcast.com> wrote:

> Hi Jason,
>
> Thanks for your response.
>
> I know that is a known issue, and I resolved it by calling the wakeup
> method from another thread. But here my problem is different; let me
> explain, it's very basic.
>
> I created one cluster with 6 nodes (3 from one datacenter and 3 from
> another, remote, datacenter) and kept replication factor 6, with 2
> zookeeper servers, one from each datacenter. Then I brought down all 3
> nodes of my local datacenter and produced a few messages, and I see the
> producer is working fine even though my local datacenter nodes are down. It
> successfully writes the messages to the other datacenter's nodes. But when
> I try to consume the messages, the consumer.poll method gets stuck
> because my local datacenter is down, even though the other datacenter's nodes are up.
>
> My question is: as the data has been written successfully to the other
> datacenter, why is the consumer part not working?
>
> Here are my producer settings:
>
> props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> props.put("acks", "1");
> props.put("max.block.ms", 1000);
> props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer", "com.comcast.ps.kafka.object.CustomMessageSer");
>
> and here are my consumer settings:
>
> props.put("group.id", "app-consumer");
> props.put("enable.auto.commit", "false");
> props.put("auto.offset.reset", "earliest");
> props.put("auto.commit.interval.ms", "500");
> props.put("session.timeout.ms", "12");
> props.put("consumer.timeout.ms", "1");
> props.put("zookeeper.session.timeout.ms", "12");
> props.put("zookeeper.connection.timeout.ms", "6");
> props.put("offsets.storage","kafka");
> props.put("request.timeout.ms", "15");
> props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer", "com.comcast.ps.kafka.object.CustomMessageDeSer");
>
> Is it because the consumer is not able to get the broker metadata when it
> tries to connect to the other datacenter's zookeeper server? I tried
> increasing the zookeeper session timeout and connection timeout, but no luck.
>
> Please help with this.
> Thanks
> Achintya
>
>
> -Original Message-
> From: Jason Gustafson [mailto:ja...@confluent.io]
> Sent: Wednesday, August 31, 2016 4:05 PM
> To: us...@kafka.apache.org
> Cc: dev@kafk
RE: Kafka consumers unable to process message
Hi Jason,

Thanks for your response.

I know that is a known issue, and I resolved it by calling the wakeup method from another thread. But here my problem is different; let me explain, it's very basic.

I created one cluster with 6 nodes (3 from one datacenter and 3 from another, remote, datacenter) and kept replication factor 6, with 2 zookeeper servers, one from each datacenter. Then I brought down all 3 nodes of my local datacenter and produced a few messages, and I see the producer is working fine even though my local datacenter nodes are down. It successfully writes the messages to the other datacenter's nodes. But when I try to consume the messages, the consumer.poll method gets stuck because my local datacenter is down, even though the other datacenter's nodes are up.

My question is: as the data has been written successfully to the other datacenter, why is the consumer part not working?

Here are my producer settings:

props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
props.put("acks", "1");
props.put("max.block.ms", 1000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.comcast.ps.kafka.object.CustomMessageSer");

and here are my consumer settings:

props.put("group.id", "app-consumer");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "500");
props.put("session.timeout.ms", "12");
props.put("consumer.timeout.ms", "1");
props.put("zookeeper.session.timeout.ms", "12");
props.put("zookeeper.connection.timeout.ms", "6");
props.put("offsets.storage","kafka");
props.put("request.timeout.ms", "15");
props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.comcast.ps.kafka.object.CustomMessageDeSer");

Is it because the consumer is not able to get the broker metadata when it tries to connect to the other datacenter's zookeeper server? I tried increasing the zookeeper session timeout and connection timeout, but no luck.

Please help with this.
Thanks
Achintya

-Original Message-
From: Jason Gustafson [mailto:ja...@confluent.io]
Sent: Wednesday, August 31, 2016 4:05 PM
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka consumers unable to process message

Hi Achintya,

We have a JIRA for this problem: https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the client to raise an exception in this case, or do you just want to keep it from blocking indefinitely? If the latter, you could escape the poll from another thread using wakeup().

Thanks,
Jason

On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) < achintya_gh...@comcast.com> wrote:

> Hi there,
>
> The Kafka consumer gets stuck at the consumer.poll() method if my current
> datacenter is down and the replicated messages are in the remote datacenter.
>
> How do we solve this issue?
>
> Thanks
> Achintya
>
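A minimal sketch of the wakeup() escape hatch Jason describes (editorial; the watchdog timeout and executor choice are illustrative, not from the thread):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class PollWatchdog {
    // wakeup() is the one KafkaConsumer method that is safe to call from
    // another thread; it makes a blocked poll() throw WakeupException.
    static <K, V> void pollWithWatchdog(KafkaConsumer<K, V> consumer) {
        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
        watchdog.schedule(consumer::wakeup, 30, TimeUnit.SECONDS); // illustrative timeout
        try {
            ConsumerRecords<K, V> records = consumer.poll(1000);
            // ... process records ...
        } catch (WakeupException e) {
            // poll() was stuck (e.g. coordinator unreachable); back off, alert,
            // or retry instead of blocking indefinitely.
        } finally {
            watchdog.shutdownNow();
        }
    }
}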
Kafka consumers unable to process message
Hi there,

The Kafka consumer gets stuck at the consumer.poll() method if my current datacenter is down and the replicated messages are in the remote datacenter.

How do we solve this issue?

Thanks
Achintya
Kafka unable to process message
Hi there,

What does the error below mean, and how do we avoid it? I see this error in the kafkaServer.out file of one node when the other broker is down, and we are not able to process any messages; the application log shows "o.a.k.c.c.i.AbstractCoordinator - Issuing group metadata request to broker 5".

[2016-08-30 20:40:28,621] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@8b198c3 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
        at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
        at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
RE: Batch Expired
Hi Krishna,

Thank you for your response. Connections are already made, but if we increase the request timeout 5 times, say request.timeout.ms=5*6, then the number of 'Batch Expired' exceptions is lower. So what is the recommended value for request.timeout.ms? If we increase it further, is there any impact?

Thanks
Achintya

-Original Message-
From: R Krishna [mailto:krishna...@gmail.com]
Sent: Friday, August 26, 2016 6:17 PM
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Batch Expired

Are any requests at all making it? That is a pretty big timeout. However, I noticed that if there are no connections made to the broker, you can still get batch expiry.

On Fri, Aug 26, 2016 at 6:32 AM, Ghosh, Achintya (Contractor) < achintya_gh...@comcast.com> wrote:

> Hi there,
>
> What is the recommended producer setting, as I see a lot of Batch
> Expired exceptions even though I set request.timeout=6?
>
> Producer settings:
> acks=1
> retries=3
> batch.size=16384
> linger.ms=5
> buffer.memory=33554432
> request.timeout.ms=6
> timeout.ms=6
>
> Thanks
> Achintya
>

--
Radha Krishna, Proddaturi
253-234-5657
Batch Expired
Hi there,

What is the recommended producer setting, as I see a lot of Batch Expired exceptions even though I set request.timeout=6?

Producer settings:
acks=1
retries=3
batch.size=16384
linger.ms=5
buffer.memory=33554432
request.timeout.ms=6
timeout.ms=6

Thanks
Achintya
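For context (editorial, hedged): as I understand the 0.9/0.10 Java producer, a batch that sits in the record accumulator longer than request.timeout.ms is failed with the "Batch Expired" TimeoutException, so raising the timeout only widens the allowance while sends are slow. A sketch of the knobs involved; the values and broker address are illustrative, not a recommendation:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class ProducerTuning {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Batches expire in the accumulator once they wait longer than
        // request.timeout.ms; larger values trade faster failure for longer stalls.
        props.put("request.timeout.ms", 60000);         // illustrative
        // Smaller linger.ms and batch.size push batches onto the wire sooner, and
        // more buffer.memory absorbs longer broker slowdowns before anything expires.
        props.put("linger.ms", 5);
        props.put("batch.size", 16384);
        props.put("buffer.memory", 33554432);

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}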
Kafka Mirror maker duplicate issue
Hi there,

I created a standby broker using Kafka MirrorMaker, but the same messages get consumed from both the source broker and the mirror broker.

For example: I send 1000 messages, say offset values 1 to 1000, and consume 500 messages from the source broker. Now my broker goes down, and I want to read the remaining 500 messages from the mirror broker. But the mirror broker again gives me all 1000 messages, where I expected the remaining 500 (offset values 501 to 1000). So there is no offset sync between the source and the mirror broker.

Any idea what settings we need to solve this issue?

Thanks
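For context (editorial, hedged): MirrorMaker copies messages, not consumer group offsets, and offsets in the mirror cluster generally differ from the source, so a failover consumer has to choose its own starting point, for example from an offset the application recorded itself. A minimal sketch; mirrorOffset is hypothetical state the application would have to maintain for the mirror cluster:

import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MirrorFailover {
    // mirrorOffset: a hypothetical, application-maintained record of the last
    // offset processed on the MIRROR cluster; source-cluster offsets do not carry over.
    static void resumeOnMirror(KafkaConsumer<String, String> consumer, long mirrorOffset) {
        TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder
        consumer.assign(Collections.singletonList(tp));
        consumer.seek(tp, mirrorOffset + 1); // resume just after the last processed record
    }
}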
RE: Kafka consumer getting duplicate message
Can anyone please check this one?

Thanks
Achintya

-Original Message-
From: Ghosh, Achintya (Contractor)
Sent: Monday, August 08, 2016 9:44 AM
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: RE: Kafka consumer getting duplicate message

Thank you, Ewen, for your response.

Actually we are using the 1.0.0.M2 Spring Kafka release, which uses the Kafka 0.9 release. Yes, we see a lot of duplicates, and here are our producer and consumer settings in the application. We don't see any duplication at the producer end; I mean, if we send 1000 messages to a particular topic, we receive exactly (sometimes fewer than) 1000 messages. But when we consume the messages at the consumer level, we see a lot of messages with the same offset value and the same partition, so please let us know what tweaking is needed to avoid the duplicates. We have three types of topics, and each topic has a replication factor of 3 and 10 partitions.

Producer configuration:

bootstrap.producer.servers=provisioningservices-aq-dev.g.comcast.net:80
acks=1
retries=3
batch.size=16384
linger.ms=5
buffer.memory=33554432
request.timeout.ms=6
timeout.ms=6
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.comcast.provisioning.provisioning_.kafka.CustomMessageSer

Consumer configuration:

bootstrap.consumer.servers=provisioningservices-aqr-dev.g.comcast.net:80
group.id=ps-consumer-group
enable.auto.commit=false
auto.commit.interval.ms=100
session.timeout.ms=15000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.comcast.provisioning.provisioning_.kafka.CustomMessageDeSer

factory.getContainerProperties().setSyncCommits(true);
factory.setConcurrency(5);

Thanks
Achintya

-Original Message-
From: Ewen Cheslack-Postava [mailto:e...@confluent.io]
Sent: Saturday, August 06, 2016 1:45 AM
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka consumer getting duplicate message

Achintya,

1.0.0.M2 is not an official release, so this version number is not particularly meaningful to people on this list. What platform/distribution are you using, and how does this map to actual Apache Kafka releases?

In general, it is not possible for any system to guarantee exactly-once semantics, because those semantics rely on the source and destination systems coordinating: the source provides some sort of retry semantics, and the destination system needs to do some sort of deduplication or similar to only "deliver" the data one time.

That said, duplicates should usually only be generated in the face of failures. If you're seeing a lot of duplicates, that probably means shutdown/failover is not being handled correctly. If you can provide more info about your setup, we might be able to suggest tweaks that will avoid these situations.

-Ewen

On Fri, Aug 5, 2016 at 8:15 AM, Ghosh, Achintya (Contractor) < achintya_gh...@comcast.com> wrote:

> Hi there,
>
> We are using Kafka 1.0.0.M2 with Spring, and we see a lot of duplicate
> messages being received by the listener's onMessage() method.
> We configured:
>
> enable.auto.commit=false
> session.timeout.ms=15000
> factory.getContainerProperties().setSyncCommits(true);
> factory.setConcurrency(5);
>
> So what could be the reason for the duplicate messages?
>
> Thanks
> Achintya
>

--
Thanks,
Ewen
RE: Kafka consumer getting duplicate message
Thank you, Ewen, for your response.

Actually we are using the 1.0.0.M2 Spring Kafka release, which uses the Kafka 0.9 release. Yes, we see a lot of duplicates, and here are our producer and consumer settings in the application. We don't see any duplication at the producer end; I mean, if we send 1000 messages to a particular topic, we receive exactly (sometimes fewer than) 1000 messages. But when we consume the messages at the consumer level, we see a lot of messages with the same offset value and the same partition, so please let us know what tweaking is needed to avoid the duplicates. We have three types of topics, and each topic has a replication factor of 3 and 10 partitions.

Producer configuration:

bootstrap.producer.servers=provisioningservices-aq-dev.g.comcast.net:80
acks=1
retries=3
batch.size=16384
linger.ms=5
buffer.memory=33554432
request.timeout.ms=6
timeout.ms=6
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.comcast.provisioning.provisioning_.kafka.CustomMessageSer

Consumer configuration:

bootstrap.consumer.servers=provisioningservices-aqr-dev.g.comcast.net:80
group.id=ps-consumer-group
enable.auto.commit=false
auto.commit.interval.ms=100
session.timeout.ms=15000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.comcast.provisioning.provisioning_.kafka.CustomMessageDeSer

factory.getContainerProperties().setSyncCommits(true);
factory.setConcurrency(5);

Thanks
Achintya

-Original Message-
From: Ewen Cheslack-Postava [mailto:e...@confluent.io]
Sent: Saturday, August 06, 2016 1:45 AM
To: us...@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka consumer getting duplicate message

Achintya,

1.0.0.M2 is not an official release, so this version number is not particularly meaningful to people on this list. What platform/distribution are you using, and how does this map to actual Apache Kafka releases?

In general, it is not possible for any system to guarantee exactly-once semantics, because those semantics rely on the source and destination systems coordinating: the source provides some sort of retry semantics, and the destination system needs to do some sort of deduplication or similar to only "deliver" the data one time.

That said, duplicates should usually only be generated in the face of failures. If you're seeing a lot of duplicates, that probably means shutdown/failover is not being handled correctly. If you can provide more info about your setup, we might be able to suggest tweaks that will avoid these situations.

-Ewen

On Fri, Aug 5, 2016 at 8:15 AM, Ghosh, Achintya (Contractor) < achintya_gh...@comcast.com> wrote:

> Hi there,
>
> We are using Kafka 1.0.0.M2 with Spring, and we see a lot of duplicate
> messages being received by the listener's onMessage() method.
> We configured:
>
> enable.auto.commit=false
> session.timeout.ms=15000
> factory.getContainerProperties().setSyncCommits(true);
> factory.setConcurrency(5);
>
> So what could be the reason for the duplicate messages?
>
> Thanks
> Achintya
>

--
Thanks,
Ewen
Kafka consumer getting duplicate message
Hi there,

We are using Kafka 1.0.0.M2 with Spring, and we see a lot of duplicate messages being received by the listener's onMessage() method.

We configured:

enable.auto.commit=false
session.timeout.ms=15000
factory.getContainerProperties().setSyncCommits(true);
factory.setConcurrency(5);

So what could be the reason for the duplicate messages?

Thanks
Achintya
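One frequent cause of repeated offsets (editorial, hedged): committing the wrong position. With the 0.9 Java consumer underneath Spring Kafka, the committed offset must be the offset of the NEXT record to read, i.e. last processed + 1; committing the last processed offset itself re-delivers that record after every rebalance or restart, which looks like duplication. A minimal sketch of a precise per-partition commit; the helper name is made up:

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class PreciseCommit {
    // Hypothetical helper: commit exactly up to one processed record.
    // Note the +1: the committed value is the position to resume from.
    static void commitProcessed(KafkaConsumer<?, ?> consumer, ConsumerRecord<?, ?> lastProcessed) {
        TopicPartition tp = new TopicPartition(lastProcessed.topic(), lastProcessed.partition());
        consumer.commitSync(Collections.singletonMap(
                tp, new OffsetAndMetadata(lastProcessed.offset() + 1)));
    }
}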