Subtractor

2018-09-24 Thread Michael Eugene
Can someone explain to me the point of the Subtractor in an aggregator? I have to have one, because there is no concrete default implementation of it, but I am just trying to get a "normal" aggregation working and I don't see why I need a subtractor. Other than of course I need to make the progr

Re: Subtractor

2018-09-24 Thread Vasily Sulatskov
Hi, If I am not mistaken it works like this. Remember that Kafka is a streaming system, i.e. there's no way for Kafka Streams to look at all the current values for a given key and compute the aggregation by repeatedly calling your adder (starting with the zero value). Values arrive at different times
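
A minimal sketch of what this describes, assuming a summing aggregate over a KGroupedTable (the topic name and the default String/Integer serdes are hypothetical):

```
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

// Streams cannot re-read every value for a key on each update, so it
// maintains the aggregate incrementally: retract the old value with the
// subtractor, then apply the new value with the adder.
StreamsBuilder builder = new StreamsBuilder();
KTable<String, Integer> table = builder.table("numbers");  // hypothetical topic

KTable<String, Integer> sums = table
    .groupBy((key, value) -> KeyValue.pair(key, value))  // identity regrouping, just to get a KGroupedTable
    .aggregate(
        () -> 0,                            // initializer: the zero value
        (key, value, agg) -> agg + value,   // adder
        (key, value, agg) -> agg - value);  // subtractor

// Updating key "a" from 1 to 3 triggers:
//   subtractor("a", 1, 1) -> 0, then adder("a", 3, 0) -> 3
```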

Re: Subtractor

2018-09-24 Thread Michael Eugene
First off, thanks for taking the time out of your schedule to respond. You lost me at almost the beginning, specifically at mapping to a different key. If those records come in... key=1, value=1; key=2, value=2; key=1, value=3; key=2, value=4. Here is all that should happen in my application 1. You

Kafka consumer offset topic index not getting deleted

2018-09-24 Thread Kaushik Nambiar
Hello, I am using Kafka version 0.11.xx. When I check the logs I can see the index segments for user-defined topics are getting deleted, but I cannot find the indices for the __consumer_offsets topic getting deleted. That's causing GBs of data to accumulate on our persistent disk.

Re: Kafka consumer offset topic index not getting deleted

2018-09-24 Thread M. Manna
What are your settings for: 1) offsets.retention.check.interval.ms 2) offsets.retention.minutes (default is 7 days, not 24 hours). Also, did this occur even after you restarted any individual brokers? Please share your server.properties as-is for your case. Regards, On Mon, 24 Sep 2018 at 12:1
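
For reference, the broker settings in question live in server.properties; the values below are hypothetical examples, not recommendations:

```
# Hypothetical example values - compare against your own server.properties.
# How long committed offsets are retained (the default changed from
# 1440 minutes / 24 hours to 10080 minutes / 7 days in Kafka 2.0):
offsets.retention.minutes=10080
# How often the broker checks for expired offsets:
offsets.retention.check.interval.ms=600000
# __consumer_offsets is a compacted topic, so the log cleaner must be
# enabled for its old segments to be cleaned up:
log.cleaner.enable=true
```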

Re: Subtractor

2018-09-24 Thread Vasily Sulatskov
Hi, Given that you need a subtractor you are probably calling KGroupedTable.aggregate(). In order to get a KGroupedTable you called (in the general case) KTable.groupBy(). I.e. you have an original (pre-groupBy) table stream (changelog), where a message key is, say, pageId, and the value is, say, the number of
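
To make the pageId example concrete, here is a sketch of such a regrouped table aggregation (the topic name, the "site/page" key convention, and the builder are all hypothetical):

```
// pageId -> view count. Regroup by site, assuming for illustration that a
// pageId like "site42/home" belongs to site "site42". When a page's count
// is updated, the subtractor first removes the old count from the site
// total, then the adder applies the new one.
KTable<String, Long> pageViews = builder.table("page-views");  // builder is a StreamsBuilder

KTable<String, Long> siteViews = pageViews
    .groupBy((pageId, views) -> KeyValue.pair(pageId.split("/")[0], views))
    .aggregate(
        () -> 0L,
        (site, views, total) -> total + views,   // adder
        (site, views, total) -> total - views);  // subtractor
```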

Referential transparency issue with transform()

2018-09-24 Thread Stéphane D.
Hi, We just stumbled upon an issue with KStream.transform() where we had a runtime error with this code: ``` DeduplicationTransformer transformer = new DeduplicationTransformer<>(...); stream.transform(() -> transformer, ...) ``` The error was: Failed to process stream task 0_0 due to the follow

Re: Referential transparency issue with transform()

2018-09-24 Thread Bill Bejeck
Hi Stephane, Yes, you'll want your TransformerSupplier to return a new instance of your Transformer. Otherwise, the same Transformer instance is used across all tasks. Since the Transformer can potentially perform stateful operations with a state store, and/or you can schedule punctuations u
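
In code, the difference looks like this (the ellipses are carried over from the original snippet; the store name is hypothetical):

```
// Broken: a single Transformer instance is shared by every task/thread.
DeduplicationTransformer transformer = new DeduplicationTransformer<>(...);
stream.transform(() -> transformer, "dedup-store");

// Correct: the supplier hands out a fresh instance per task.
stream.transform(() -> new DeduplicationTransformer<>(...), "dedup-store");
```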

Re: Kafka stream issue : Deleting obsolete state directory

2018-09-24 Thread Bill Bejeck
Hi Bhavesh, I've taken a look at what you've posted, but I'll need the logs *before *the exception occurred. Can you attach the entire log? In the meantime, if this is causing issues, you can effectively disable it by setting *state.cleanup.delay.ms* to a large value (Long.MAX_VAL
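
A sketch of that workaround in the Streams configuration, showing only the relevant property:

```
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Pushing the cleanup delay out to Long.MAX_VALUE means the background
// state-directory cleaner effectively never runs.
Properties props = new Properties();
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.MAX_VALUE);
```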

What happens if the complete Kafka Cluster crashes

2018-09-24 Thread Toni Zehnder
Hi there, What happens with the messages if a complete Kafka Cluster dies? Are they lost? Or does Kafka write the messages to the hard disk, so that if the nodes get started again they still have the messages? Best regards Toni

Re: Referential transparency issue with transform()

2018-09-24 Thread Bill Bejeck
Hi Stéphane, Can you provide log files from the crash so we can see the exact failure? Thanks, Bill On Mon, Sep 24, 2018 at 8:12 AM Stéphane. D. wrote: > Hi, > > We just stumbled upon an issue with KStream.transform() where we had a > runtime error with this code: > > ``` > DeduplicationTransf

RE: [External] What happens if the complete Kafka Cluster crashes

2018-09-24 Thread Tauzell, Dave
It is possible that if all the nodes fail at about the same time, after the broker acknowledged the message, then some messages will be lost because they were in memory and not yet fully written to the disk. If you set acks=all then this requires all of your replicas to fail in this way to
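
A sketch of the producer side of that guarantee (the broker-side values in the trailing comment are hypothetical examples):

```
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

// acks=all: the leader only acknowledges once all in-sync replicas have
// the record, so a single node dying after the ack does not lose the
// message - every replica would have to fail as described above.
Properties props = new Properties();
props.put(ProducerConfig.ACKS_CONFIG, "all");
// Broker/topic side, e.g.: replication.factor=3, min.insync.replicas=2
```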

No referential transparency with transform() ?

2018-09-24 Thread Stéphane D.
Hi, We just stumbled upon an issue with KStream.transform() where we had a runtime error with this code: ``` DeduplicationTransformer transformer = new DeduplicationTransformer<>(...); stream.transform(() -> transformer, ...) ``` The error is: Failed to process stream task 0_0 due to the followi

Re: Kafka stream issue : Deleting obsolete state directory

2018-09-24 Thread Bhavesh Patel
Hi Bill, Please find the log as requested. 2018-09-24 15:53:59.594 INFO 1 --- [pool-2-thread-1] i.micrometer.influx.InfluxMeterRegistry : successfully sent 2793 metrics to influx 2018-09-24 15:54:59.608 INFO 1 --- [pool-2-thread-1] i.micrometer.influx.InfluxMeterRegistry : successfully sent

Re: Kafka consumer offset topic index not getting deleted

2018-09-24 Thread Kaushik Nambiar
Hello, Thank you for your reply. I have attached a text file containing the data within the server.properties file. Also, I could see that it was the .log files within the __consumer_offsets topic that were around 100 MB each. So due to many such log files, the disk is getting maxed out. Your

Re: No referential transparency with transform() ?

2018-09-24 Thread Damian Guy
The return value from the `TransformerSupplier` should always be a `new YourTransformer(..)`, as there will be one for each task and they are potentially processed on multiple threads. On Mon, 24 Sep 2018 at 16:07 Stéphane. D. wrote: > Hi, > > We just stumbled upon an issue with KStream.transform()

Re: No referential transparency with transform() ?

2018-09-24 Thread Guozhang Wang
Hello Stéphane, As the Javadoc of TransformerSupplier#get() states: Return a new {@link Transformer} instance. Streams enforces single-thread isolation per task by design, so if users ignore the instruction and reuse the same Transformer instance it will result in undefined behavior, most likely

Re: Subtractor

2018-09-24 Thread Michael Eugene
Ok thanks for taking the time again to respond. So you are saying that the subtractor actually handles when the key changes for the earlier groupBy? (Maybe not 100% but that is sort of what it is handling). Also my code is below (I took out some of it to avoid clutter) - I am grouping twice and

Re: Subtractor

2018-09-24 Thread Vasily Sulatskov
Hi, As far as I understand your code, it's not a very good example of table aggregation. The way I see it, your code does the following: 1. You take some stream 2. Do some map, changing keys and values 3. groupByKey 4. Reduce - keeping the last value for every key, here you get a KTable 5. GroupBy
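
Restated as a sketch, the topology read from that code looks roughly like this (all helper names are hypothetical):

```
// Steps 1-4: stream -> map -> groupByKey -> reduce. A reduce that keeps
// the latest value per key turns the stream into a KTable.
KTable<String, Long> latest = stream
    .map((k, v) -> KeyValue.pair(remap(k, v), transform(v)))  // hypothetical helpers
    .groupByKey()
    .reduce((oldValue, newValue) -> newValue);

// Step 5 onwards: regroup that table and aggregate it - this is where the
// adder/subtractor pair comes in.
KTable<String, Long> totals = latest
    .groupBy((k, v) -> KeyValue.pair(groupOf(k), v))  // hypothetical grouping
    .aggregate(() -> 0L,
        (k, v, agg) -> agg + v,    // adder
        (k, v, agg) -> agg - v);   // subtractor
```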

Re: Subtractor

2018-09-24 Thread Michael Eugene
Thanks for your response again. As far as your feedback on what I am trying to accomplish - I need to use the aggregate method because I need the aggregator to be very dynamic. Multiple, heterogeneous types need to be handled in one aggregation. So I can’t really use a reducer for that. I