Re: Customers are getting same emails for roughly 30-40 times
Hi,

You can check the Consumer API javadoc: https://kafka.apache.org/10/javadoc/?org/apache/kafka/clients/consumer/KafkaConsumer.html
Refer to the section "Manual Offset Control".

--Senthil

On Sat, May 25, 2019, 9:53 AM ASHOK MACHERLA wrote:
> Dear Hans
>
> Thanks for your reply.
>
> As you said, we are hitting the same issue: our consumers sometimes go into
> rebalance, and during this time customers get duplicate emails.
>
> So, how do we set manual offset commits? Are there any parameters to add
> for that? Please reply to this email.
>
> Sent from Outlook
Re: Customers are getting same emails for roughly 30-40 times
Dear Hans

Thanks for your reply.

As you said, we are hitting the same issue: our consumers sometimes go into rebalance, and during this time customers get duplicate emails.

So, how do we set manual offset commits? Are there any parameters to add for that? Please reply to this email.

Sent from Outlook
Re: Customers are getting same emails for roughly 30-40 times
It's not just the config, you need to change your code. kafka.auto.commit.interval.ms=3000 means that consumers only commit offsets every 3 seconds, so if there is any failure or rebalance they will re-consume up to 3 seconds of data per partition. That could be many hundreds or thousands of messages.

I would recommend you not use auto-commit at all, and instead manually commit offsets immediately after sending each email or batch of emails.

-hans

> On May 24, 2019, at 4:35 AM, ASHOK MACHERLA wrote:
> [...]
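Hans's advice (disable auto-commit, commit right after each batch of emails is actually sent) looks roughly like the sketch below with the plain consumer API. The topic name, group id, bootstrap address, and the sendEmail call are all illustrative, not from the thread; also note that poll(Duration) is the signature on recent clients, while the 0.10.x client used in this thread takes poll(long).

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class EmailSender {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        props.put("group.id", "email-senders");           // illustrative
        props.put("enable.auto.commit", "false");         // the key change
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("emails")); // illustrative topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    sendEmail(record.value()); // your email-sending call goes here
                }
                // Commit only after the whole batch has actually been sent, so a
                // crash or rebalance can redeliver at most one poll() worth of records.
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }

    private static void sendEmail(String payload) { /* hypothetical placeholder */ }
}
```

With max.poll.records=20 as in this thread, the worst case after a crash is then 20 redelivered emails per partition instead of 3 seconds' worth.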
I have lots of WARN in log every day
I have lots of WARN messages in the log every day, like these. What do they mean?

[2019-05-24 11:05:06,784] WARN Attempting to send response via channel for which there is no open connection, connection id 2 (kafka.network.Processor)
[2019-05-24 11:05:10,884] WARN Attempting to send response via channel for which there is no open connection, connection id 3 (kafka.network.Processor)
[2019-05-24 11:10:13,635] WARN Attempting to send response via channel for which there is no open connection, connection id 2 (kafka.network.Processor)
[2019-05-24 11:10:28,156] WARN Attempting to send response via channel for which there is no open connection, connection id 6 (kafka.network.Processor)

kafka_2.11-0.10.2.1
zookeeper-3.4.10

igyu
Re: Stream application :: State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED continuously
Hello Nayanjyoti,

Regarding KIP-328: the on-disk buffer is indeed being implemented, but it has not been completed and unfortunately has to slip to the next release.

Now, about the "PARTITIONS_REVOKED to PARTITIONS_ASSIGNED" issue: it is possible that if you are restoring tons of data from the changelog, restoration takes a long time, and while doing it the stream thread does not call consumer.poll() in time, so it gets kicked out of the group again.

Guozhang

On Tue, May 21, 2019 at 5:50 AM Jonathan Santilli <jonathansanti...@gmail.com> wrote:
> Hello Nayanjyoti, about this part you mentioned:
>
> "Also, we had noticed that on restarts the downstream of the suppress
> operator is *flooded* with events, which in the ideal case wouldn't have
> come. I came across https://stackoverflow.com/a/54227156 where Matthias had
> responded about the behaviour of the suppress buffer being in memory (for
> version 2.1) and that it reads the changelog to *recreate* the buffer,
> which should actually *prevent* the behaviour (downstream being flooded)
> mentioned above. Am I missing something?"
>
> It was me who asked that question on SO
> (https://stackoverflow.com/a/54227156).
> Yes, in version 2.2.0 the bug is still there, but it has been solved in
> version 2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895), which is
> under voting right now as 2.2.1-RC1:
> https://home.apache.org/~vahid/kafka-2.2.1-rc1/RELEASE_NOTES.html
> I have tested the App that was suffering that problem and now it is
> solved. Of course, you need to test your own App.
>
> I hope that helps.
>
> Cheers!
> --
> Jonathan
>
> On Tue, May 21, 2019 at 11:29 AM Nayanjyoti Deka wrote:
> > Hi Guozhang,
> >
> > I had looked more into it. It seems that on restart the suppress
> > changelog topic was being recreated, and at that time there were no
> > heartbeats to the broker from the application, hence causing it to behave
> > this way. I could see the log of reading the suppress changelog topic
> > from offset 0 on the restart.
> >
> > I'm trying to understand why it needs to read the entire changelog
> > topic, since the window which has passed (past time) should have been
> > compacted (or maybe deleted) from the broker's topic data.
> >
> > Also, we had noticed that on restarts the downstream of the suppress
> > operator is *flooded* with events, which in the ideal case wouldn't have
> > come. I came across https://stackoverflow.com/a/54227156 where Matthias
> > had responded about the behaviour of the suppress buffer being in memory
> > (for version 2.1) and that it reads the changelog to *recreate* the
> > buffer, which should actually *prevent* the behaviour (downstream being
> > flooded) mentioned above. Am I missing something?
> >
> > We are using version 2.2, which probably has the same behaviour as well.
> >
> > Please correct me if I'm wrong on my analysis somewhere.
> >
> > Also, if possible, could you please provide me with an understanding of
> > why suppress was implemented with an in-memory buffer in mind first, and
> > not spilling to disk? I have read the KIP document,
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables,
> > but it doesn't mention any specifics as to why the in-memory
> > implementation was chosen, since across restarts on-disk spills would
> > have provided the exact semantics described by the operator.
> >
> > On Mon, May 20, 2019 at 9:24 PM Guozhang Wang wrote:
> > > Hello Nayanjyoti,
> > >
> > > Did you find anything else from the streams log entries (is it enabled
> > > on DEBUG or TRACE?), and what version of Kafka are you using?
> > >
> > > Guozhang
> > >
> > > On Sun, May 19, 2019 at 1:04 PM Nayanjyoti Deka wrote:
> > > > Forgot to add that there is no transition to RUNNING state.
> > > >
> > > > On Mon, May 20, 2019 at 1:10 AM Nayanjyoti Deka <nayanjy...@ixigo.com> wrote:
> > > > > Hey guys,
> > > > >
> > > > > We are running a stream application in our production environment.
> > > > > On our latest restart, the application is consistently moving
> > > > > between these two states.
> > > > >
> > > > > From our logs:
> > > > >
> > > > > grep "State transition from " application.log | jq -r '.message' | sort | uniq -c | sort -n -r
> > > > >
> > > > > 40 stream-thread [-StreamThread-9] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > 40 stream-thread [-StreamThread-8] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > 40 stream-thread [-StreamThread-7] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > 40 stream-thread [-StreamThread-6] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > 40 stream-thread [-StreamThread-5] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
Re: Streams configuration for a stream with long varied processing times
Hi Raman,

Since you are using `transform()` already, which is a lower-level API in the DSL, you can basically run arbitrary logic, such as a thread pool to process the records, within your `process()` or `transform()` function. Note that, as the consumer docs mention ("Typically, you must disable automatic commits and manually commit processed offsets for records..."), the offset-committing mechanism is trickier: you'd probably need to also turn off time-based commits and call `context.commit()` yourself when you are certain that all records going through this transform have completed processing.

There's an ongoing discussion about adding async processing as an out-of-the-box feature, also as a way to further improve elastic scalability. Feel free to share your thoughts:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams

Guozhang

On Wed, May 22, 2019 at 6:57 AM Raman Gupta wrote:
> I have a situation in which I have a stream that does a transformation.
> This transformation can take as little as 10-30s, or up to about 15
> minutes, to run. The stream works just fine until a rebalance happens in
> the middle of long processing. Rebalancing sometimes takes a long time,
> and sometimes a new rebalance is started by Kafka soon after the previous
> one completes, and this pattern usually continues for some time.
>
> Reading the docs for the Kafka consumer, I see this gem:
>
> > For use cases where message processing time varies unpredictably,
> > neither of these options may be sufficient. The recommended way to
> > handle these cases is to move message processing to another thread,
> > which allows the consumer to continue calling poll while the processor
> > is still working. Some care must be taken to ensure that committed
> > offsets do not get ahead of the actual position. Typically, you must
> > disable automatic commits and manually commit processed offsets for
> > records only after the thread has finished handling them (depending on
> > the delivery semantics you need). Note also that you will need to pause
> > the partition so that no new records are received from poll until after
> > the thread has finished handling those previously returned.
>
> OK awesome, that sounds easy enough and seems to be exactly what I need.
> Doing the processing in a separate thread would make the rebalance a snap.
>
> However, is there a way to do this, or something similar, with streams? I
> would like to use the streams abstraction rather than the
> consumer/producer APIs directly.
>
> Regards,
> Raman
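What Guozhang describes might look like the sketch below. It is simplified to synchronous processing inside `transform()`; a real thread-pool version must only call `context.commit()` after all in-flight records have finished. Topic names and the doSlowWork placeholder are illustrative, and you would also set `StreamsConfig.COMMIT_INTERVAL_MS_CONFIG` very high to disable time-based commits, while keeping `max.poll.interval.ms` above the worst-case processing time. Note that `context.commit()` only requests a commit; Streams performs it at the next opportunity.

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class SlowTransformExample {
    static class SlowTransformer implements Transformer<String, String, KeyValue<String, String>> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            String result = doSlowWork(value); // the 10s..15min processing step
            context.commit();                  // request a commit once this record is done
            return KeyValue.pair(key, result);
        }

        @Override
        public void close() {}

        private String doSlowWork(String value) { return value; /* placeholder */ }
    }

    static void buildTopology(StreamsBuilder builder) {
        KStream<String, String> input = builder.stream("input-topic"); // illustrative
        input.transform(SlowTransformer::new)
             .to("output-topic");                                      // illustrative
    }
}
```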
RE: Customers are getting same emails for roughly 30-40 times
Dear Team

First of all, thanks for the reply on this issue.

Right now we are using these configurations on the consumer side:

kafka.max.poll.records=20
max.push.batch.size=100
enable.auto.commit=true
auto.offset.reset=latest
kafka.auto.commit.interval.ms=3000
kafka.session.timeout.ms=1
kafka.request.timeout.ms=3000
kafka.heartbeat.interval.ms=3000
kafka.max.poll.interval.ms=30

Can you please suggest changes to the above config parameters?

We are using one Kafka topic with 10 partitions and 10 consumers, and we are sending lakhs of emails to customers. Are that many partitions and consumers enough, or do I have to increase the partitions and consumers?

Please suggest.

In the consumer logs, it shows:

consumer group is rebalancing before committed because already group is rebalancing

Sent from Outlook.

From: Vincent Maurin
Sent: Friday, May 24, 2019 3:51:23 PM
To: users@kafka.apache.org
Subject: Re: Customers are getting same emails for roughly 30-40 times

It also seems you are using an "at least once" strategy (maybe with auto-commit, or committing after sending the email). Maybe "at most once" could be a valid business strategy here?
[...]
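Following the advice in this thread, a consumer configuration along these lines would bound the duplicate window. The values below are illustrative examples, not tuned recommendations; also note the keys posted above carry a "kafka." prefix that looks like an application-level wrapper, whereas the raw Kafka client keys have no prefix.

```java
import java.util.Properties;

public class SuggestedConsumerConfig {
    // Illustrative values only: the key change is disabling auto-commit so that
    // offsets are committed manually after each email batch is actually sent.
    static Properties suggested() {
        Properties p = new Properties();
        p.setProperty("enable.auto.commit", "false");    // commit manually after sending
        p.setProperty("max.poll.records", "20");         // small batches bound redelivery
        p.setProperty("session.timeout.ms", "10000");    // example; must lie within the broker's allowed range
        p.setProperty("heartbeat.interval.ms", "3000");  // typically ~1/3 of session.timeout.ms
        p.setProperty("max.poll.interval.ms", "300000"); // must exceed worst-case time to send one batch
        p.setProperty("auto.offset.reset", "latest");
        return p;
    }

    public static void main(String[] args) {
        suggested().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

On the partition question: 10 partitions with 10 consumers is a reasonable pairing; adding more of either will not stop duplicates, because the duplicates come from uncommitted offsets at rebalance time, not from a lack of parallelism.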
Re: Customers are getting same emails for roughly 30-40 times
It also seems you are using an "at least once" strategy (maybe with auto-commit, or committing after sending the email). Maybe "at most once" could be a valid business strategy here?

- at least once (you will deliver all the emails, but you could deliver duplicates):
  consumeMessages
  sendEmails
  commitOffsets

- at most once (you will never deliver duplicates, but you might never deliver a given email):
  consumeMessages
  commitOffsets
  sendEmails

Ideally you would do "exactly once", but it is hard to achieve in the Kafka -> external system scenario. The usual strategy here is to make the operation idempotent, in combination with an "at least once" strategy.

Best,
Vincent

On Fri, May 24, 2019 at 10:39 AM Liam Clarke wrote:
> [...]
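Vincent's two orderings can be made concrete with a toy model (no Kafka involved): simulate a crash partway through a batch, before the remaining step runs, and see what each ordering ends up delivering after a restart. The class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeliverySemantics {
    // at-least-once: commit AFTER send. A crash before the commit means the
    // whole uncommitted batch is re-sent on restart -> duplicates.
    static List<String> atLeastOnce(List<String> msgs, int crashAfterSend) {
        List<String> delivered = new ArrayList<>();
        int committed = 0;
        // first run: sends `crashAfterSend` messages, then crashes before committing
        for (int i = committed; i < Math.min(crashAfterSend, msgs.size()); i++) {
            delivered.add(msgs.get(i));   // sendEmails
        }                                 // crash! commitOffsets never ran
        // restart: reconsume from the last committed offset (still 0)
        for (int i = committed; i < msgs.size(); i++) {
            delivered.add(msgs.get(i));
            committed = i + 1;            // commit after each send this time
        }
        return delivered;
    }

    // at-most-once: commit BEFORE send. A crash after the commit means the
    // committed-but-unsent batch is lost on restart -> missing emails.
    static List<String> atMostOnce(List<String> msgs, int crashAfterCommit) {
        List<String> delivered = new ArrayList<>();
        int committed = Math.min(crashAfterCommit, msgs.size()); // commitOffsets
        // crash! sendEmails for the committed batch never ran
        for (int i = committed; i < msgs.size(); i++) {
            delivered.add(msgs.get(i));   // restart resumes after the committed offset
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> emails = Arrays.asList("a", "b", "c");
        System.out.println(atLeastOnce(emails, 2)); // a and b arrive twice
        System.out.println(atMostOnce(emails, 2));  // a and b are never sent
    }
}
```

This is exactly the trade-off in this thread: the reported 30-40 duplicate emails are the at-least-once branch replayed across repeated rebalances, and an idempotent send (e.g. deduplicating on a message id before mailing) is what makes at-least-once safe.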
Re: Customers are getting same emails for roughly 30-40 times
Consumers will rebalance if you add partitions, add consumers to the group, or if a consumer leaves the group.

Consumers will leave the group after not communicating with the server for a period set by session.timeout.ms. This is usually due to an exception in the code polling with the consumer, or message-processing code taking too long.

If your consumers are reprocessing messages, thus causing emails to be sent again, it implies that they weren't able to commit their offsets before failing/timing out.

We had a similar issue in a database sink that consumed from Kafka and duplicated data: it took too long, hit the session timeout, and then wasn't able to commit its offsets.

So I'd look closely at your consuming code and log every possible source of exceptions.

Kind regards,

Liam Clarke

On Fri, 24 May 2019, 7:37 pm ASHOK MACHERLA, wrote:
> [...]
Re: Restart of kafka-streams with more than one partition on topic is reprocessing the data from beginning of the topic
Hello Kalyani,

Try testing the RC kafka-2.2.1-rc1: what you describe seems to be a problem that has been solved in version 2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895), which is under voting right now as 2.2.1-RC1:
https://home.apache.org/~vahid/kafka-2.2.1-rc1/RELEASE_NOTES.html
I have tested the App that was suffering that problem and now it is solved. Of course, you need to test your own App.

Hope that helps.

Cheers!
--
Jonathan

On Thu, May 23, 2019 at 5:37 PM kalyani yarlagadda <kalyani.yarlagad...@gmail.com> wrote:
> Hi,
>
> I need assistance with the below scenario. Please help me with this.
>
> I am using a hopping time window in Kafka Streams. I am facing an issue on
> restart of my Kafka application: the application processes the data from
> the beginning offset. However, it happens only when the topic has more
> than one partition. If the topic has only 1 partition, then on restart of
> the application the sliding window works fine.
>
> Kafka version: 2.1.0
>
> Eg:
>
> The time window is 4 hours, advanceBy 5 minutes. The application is
> started at time A and runs every 5 minutes with the stream data of 4
> hours; it is then stopped at a timestamp, say X, and restarted at
> timestamp Y.
>
> The behavior for a single partition: after the restart, the streams are
> processed from time X to time Y for every 5 minutes.
> The behavior for more than one partition: after the restart, the streams
> are processed from time A to time Y for every 5 minutes.
>
> I am adding the POC code below:
>
> // define the time window as a hopping time window
> TimeWindows timeWindow = TimeWindows.of(Duration.ofHours(4))
>     .advanceBy(Duration.ofMinutes(5))
>     .grace(Duration.ofMinutes(1));
>
> // MetricsTimeSeries is the aggregator class
> KTable<Windowed<String>, MetricsTimeSeries> windowedMetricsTimeSeriesStream =
>     builder.stream("metrics_ip", Consumed.with(Serdes.String(), new JSONSerde<>()))
>         .groupByKey()
>         .windowedBy(timeWindow)
>         .aggregate(() -> new MetricsTimeSeries(), /* initializer */
>             (aggKey, newValue, aggValue) -> {
>                 aggValue.addDataPoint(newValue);
>                 return aggValue;
>             }, /* adder */
>             Materialized.as("windowed_aggregated_metric_store_poc_partitions")) /* state store name */
>         .suppress(Suppressed.untilWindowCloses(BufferConfig.maxBytes(10 * 1024 * 1024).shutDownWhenFull()));
>
> windowedMetricsTimeSeriesStream
>     .toStream()
>     .map((key, value) -> /* mapping logic goes here */)
>     .to("metrics_op");
>
> Properties set for Kafka Streams:
>
> StreamsConfig.APPLICATION_ID_CONFIG - "streams_changedetection_poc_partitions"
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG - "kafka:9092"
> StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG - Serdes.String().getClass()
> StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - Serdes.String().getClass()
> StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG - JSONSerde.class
> StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS - JSONSerde.class
> ConsumerConfig.AUTO_OFFSET_RESET_CONFIG - "latest"
>
> Thanks in advance.
>
> Kalyani Y
> 9177982636

--
Santilli Jonathan
Customers are getting same emails for roughly 30-40 times
Dear Team Member

Currently we are using Kafka 0.10.1 and ZooKeeper 3.4.6. In our project we have to send bulk emails to customers, and for this purpose we are using a Kafka cluster setup.

But customers are getting the same emails roughly 30-40 times, which is very bad. While this happens, our consumer group shows as rebalancing. Might that be the reason?

Currently we are using one topic for this, with 10 partitions and 10 consumers. I hope we have enough partitions and consumers, but I don't know exactly how many partitions and consumers are required to overcome this issue.

Can you please suggest how to fix this issue? Are any changes required on the Kafka side as well as the consumer side? How do we stop the rebalancing issue?

Please advise. Thanks

Sent from Outlook.