I see. That makes sense. Actually, GlobalKTables work exactly this way -- a GlobalKTable is not just a broadcasted table, it's also a non-synchronized table. Not sure if this would work for you -- the broadcasted property might be a deal breaker for your use case.
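For illustration, the non-synchronized, broadcasted variant is the regular KStream-GlobalKTable join in the DSL. A rough sketch only (topic names, types, and default serdes are placeholders): the table topic is consumed by every instance and applied as fast as it arrives, so a lookup always sees the latest loaded value regardless of the event's timestamp.

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.KStream;

    StreamsBuilder builder = new StreamsBuilder();

    // Fully replicated ("broadcasted") to every instance; lookups hit the locally
    // materialized state with no timestamp synchronization against the stream.
    GlobalKTable<String, String> items = builder.globalTable("item-metadata");
    KStream<String, String> events = builder.stream("events");

    events.join(items,
            (eventKey, eventValue) -> eventKey,                 // map each event to a table key
            (eventValue, itemValue) -> eventValue + "|" + itemValue)
          .to("events-joined");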
Personally, I believe there is a design space here: broadcasted vs. sharded, and time-synchronized vs. non-synchronized, giving four possible implementations. At the moment we only have two of them, and I would love to get all four variants.

-Matthias

On 9/17/18 11:18 AM, Thomas Becker wrote:
> Hi Matthias,
>
> I'm familiar with how the timestamp synchronization currently works. I also
> submit that it does not work for our use-case, which is the following: the
> table-backing topic contains records with the best available data we have for
> a given item. If a record in this topic is updated, we would always prefer to
> join using this data *regardless* of whether it is "newer" than the incoming
> event we are trying to join it with.
>
> Essentially, Streams assumes that we must want the table data that was
> current at the time the event was produced, and here we simply don't. If we
> have newer data, we want that. But my larger concern here is actually
> reprocessing; when doing that, the older table data will have been
> log-compacted away, and the current timestamp semantics will result in events
> that occurred prior to the latest table updates going unjoined. Does this make
> sense now?
>
> Thanks!
> Tommy
>
> On Mon, 2018-09-17 at 09:51 -0700, Matthias J. Sax wrote:
> > I am not sure if this feature would help with stream-table joins. Also
> > note that we recently merged a PR that improves the timestamp
> > synchronization of Kafka Streams -- this will vastly improve the guarantees.
> >
> > What I don't understand:
> >
> > > So table records that have been updated recently will not be read until
> > > the stream records reach or exceed that same timestamp.
> >
> > Yes, this is on purpose / by design.
> >
> > > and if they do it will be with old data
> >
> > What do you mean by "old data"? By definition, the stream record will
> > join with a table that contains data up to the stream record's timestamp.
> > It does not make sense semantically to advance the table beyond the stream
> > record's timestamp, because if you did, you would join with "future data",
> > which, from my point of view, is semantically incorrect.
> >
> > Shameless plug: you might want to read
> > https://www.confluent.io/blog/streams-tables-two-sides-same-coin
> >
> > -Matthias
> >
> > On 9/17/18 8:23 AM, Thomas Becker wrote:
> > > For my part, a major use-case for this feature is stream-table joins.
> > > Currently, Kafka Streams does the wrong thing in some cases because the
> > > only message-choosing strategy available is timestamp-based. So table
> > > records that have been updated recently will not be read until the stream
> > > records reach or exceed that same timestamp. So there is no guarantee
> > > these records get joined at all, and if they do it will be with old data.
> > > I realize we're talking about the consumer here and not Streams
> > > specifically, but as it stands I can't even write a non-Streams
> > > application that does a join but prioritizes table-topic records over
> > > stream records without using multiple consumers.
> > >
> > > On Wed, 2018-09-05 at 08:18 -0700, Colin McCabe wrote:
> > > > Hi all,
> > > >
> > > > I agree that DISCUSS is more appropriate than VOTE at this point, since
> > > > I don't remember the last discussion coming to a definite conclusion.
> > > >
> > > > I guess my concern is that this will add complexity and memory
> > > > consumption on the server side. In the case of incremental fetch
> > > > requests, we will have to track at least two extra bytes per partition,
> > > > to know what the priority of each partition is within each active fetch
> > > > session.
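For reference, the time-synchronized, sharded variant that the quoted join discussion is about is the plain KStream-KTable join. Again only a rough sketch (topic names and default serdes are placeholders): here a stream record with timestamp T is joined, on a best-effort basis, against the table state as of T rather than against later table updates.

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    StreamsBuilder builder = new StreamsBuilder();

    // Sharded like the stream (both topics must be co-partitioned on the key)
    // and time-synchronized: table updates newer than the stream record's
    // timestamp are not supposed to be applied before the join.
    KTable<String, String> itemTable = builder.table("item-metadata");
    KStream<String, String> events = builder.stream("events");

    events.join(itemTable,
            (eventValue, itemValue) -> eventValue + "|" + itemValue)
          .to("events-joined");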
> > > > It would be nice to hear more about the use-cases for this feature. I
> > > > think Gwen asked about this earlier, and I don't remember reading a
> > > > response. The fact that we're now talking about Samza interfaces is a
> > > > bit of a red flag. After all, Samza didn't need partition priorities to
> > > > do what it did. You can do a lot with muting partitions and using
> > > > appropriate threading in your code.
> > > >
> > > > For example, you can hand data from a partition off to a work queue
> > > > with a fixed size, which is handled by a separate service thread. If
> > > > the queue gets full, you can mute the partition until some of the
> > > > buffered data is processed. Kafka Streams uses a similar approach to
> > > > avoid reading partition data that isn't immediately needed.
> > > >
> > > > There might be some use-cases that need priorities eventually, but I'm
> > > > concerned that we're jumping the gun by trying to implement this before
> > > > we know what they are.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > On Wed, Sep 5, 2018, at 01:06, Jan Filipiak wrote:
> > > > > On 05.09.2018 02:38, n...@afshartous.com wrote:
> > > > > > On Sep 4, 2018, at 4:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> > > > > > > what I meant is literally this interface:
> > > > > > >
> > > > > > > https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html
> > > > > >
> > > > > > Hi Jan,
> > > > > >
> > > > > > Thanks for the reply, and I have a few questions. This Samza doc
> > > > > >
> > > > > > https://samza.apache.org/learn/documentation/0.14/container/streams.html
> > > > > >
> > > > > > indicates that the chooser is set via configuration. Are you
> > > > > > suggesting adding a new configuration for Kafka? Seems like we could
> > > > > > also have a method on KafkaConsumer
> > > > > >
> > > > > >     public void register(MessageChooser messageChooser)
> > > > > >
> > > > > > to make it more dynamic.
> > > > >
> > > > > I don't have strong opinions regarding this. I like configs; I also
> > > > > don't think it would be a problem to have both.
> > > > >
> > > > > > Also, the Samza MessageChooser interface has the method
> > > > > >
> > > > > >     /* Notify the chooser that a new envelope is available for a processing. */
> > > > > >     void update(IncomingMessageEnvelope envelope)
> > > > > >
> > > > > > and I'm wondering how this method would be translated to the Kafka
> > > > > > API. In particular, what corresponds to IncomingMessageEnvelope?
> > > > >
> > > > > I think Samza uses the envelope abstraction because they support other
> > > > > sources besides Kafka as well. They are more on the Spark end of
> > > > > things when it comes to different input types. I don't have strong
> > > > > opinions, but it feels like we wouldn't need such a thing in the Kafka
> > > > > consumer and could just use a regular ConsumerRecord or so.
> > > > >
> > > > > > Best,
> > > > > > --
> > > > > > Nick
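To make Jan's last point concrete: a purely hypothetical sketch of what a ConsumerRecord-based chooser could look like, modeled on Samza's MessageChooser. Nothing like this exists in the consumer API today, and whether it would be wired in via a config or via a register() method is exactly the open question above.

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Hypothetical interface, modeled on Samza's MessageChooser but expressed
    // in terms of plain ConsumerRecords instead of IncomingMessageEnvelope.
    // Not part of the Kafka consumer API.
    public interface MessageChooser<K, V> {

        /** Notify the chooser that a new record is available for processing. */
        void update(ConsumerRecord<K, V> record);

        /** Return the next record to process, or null if nothing should be chosen yet. */
        ConsumerRecord<K, V> choose();
    }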