I see. That makes sense.

Actually, GlobalKTables work exactly this way -- a GlobalKTable is not just a
broadcasted table, it's also a non-synchronized table. Not sure if this would
work for you -- the broadcast property might be a deal breaker for your use
case.

Personally, I believe there is a design space here: broadcasted vs. sharded,
and time-synchronized vs. non-synchronized, giving four possible
implementations. At the moment we only have two of them, and I would love to
see all four variants.
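
For reference, here is a minimal sketch of a stream/GlobalKTable join in the
Kafka Streams DSL; the topic names, serdes, and key mapping are made up for
illustration:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    public class GlobalTableJoinExample {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            // Event stream to enrich.
            KStream<String, String> events =
                builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

            // A GlobalKTable is fully replicated to every instance and is not
            // timestamp-synchronized: each event joins against whatever table
            // state has been loaded so far.
            GlobalKTable<String, String> items =
                builder.globalTable("items", Consumed.with(Serdes.String(), Serdes.String()));

            events.join(items,
                    (eventKey, eventValue) -> eventKey,            // map event to table key
                    (eventValue, itemValue) -> itemValue + ":" + eventValue)
                  .to("enriched-events", Produced.with(Serdes.String(), Serdes.String()));
        }
    }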


-Matthias


On 9/17/18 11:18 AM, Thomas Becker wrote:
> Hi Matthias,
> I'm familiar with how the timestamp synchronization currently works. I also
> submit that it does not work for our use case, which is the following: the
> table-backing topic contains records with the best available data we have for
> a given item. If a record in this topic is updated, we would always prefer to
> join using this data *regardless* of whether it is "newer" than the incoming
> event we are trying to join it with.
> 
> Essentially, Streams assumes that we must want the table data that was
> current at the time the event was produced, and here we simply don't. If we
> have newer data, we want that. But my larger concern here is actually
> reprocessing; in that case the older table data will have been log-compacted
> away, and the current timestamp semantics will result in events that occurred
> prior to the latest table updates going unjoined. Does this make sense now?
> 
> Thanks!
> Tommy
> 
> On Mon, 2018-09-17 at 09:51 -0700, Matthias J. Sax wrote:
> 
> I am not sure if this feature would help with stream-table joins. Also
> note that we recently merged a PR that improves the timestamp
> synchronization of Kafka Streams -- this will vastly improve the guarantees.
> 
> What I don't understand:
> 
> "So table records that have been updated recently will not be read until the
> stream records reach or exceed that same timestamp."
> 
> Yes, this is on purpose / by design.
> 
> "and if they do it will be with old data"
> 
> What do you mean by "old data"? By definition, the stream record will join
> with a table that contains data up to the stream record's timestamp. It does
> not make sense, semantically, to advance the table beyond the stream record's
> timestamp, because then you would be joining with "future data" (for example,
> a stream record with timestamp 10 joining against a table update from
> timestamp 15), which, from my point of view, is semantically incorrect.
> 
> Shameless plug: you might want to read
> https://www.confluent.io/blog/streams-tables-two-sides-same-coin
> 
> -Matthias
> 
> On 9/17/18 8:23 AM, Thomas Becker wrote:
> 
> For my part, a major use case for this feature is stream-table joins.
> Currently, Kafka Streams does the wrong thing in some cases because the only
> message-choosing strategy available is timestamp-based. So table records that
> have been updated recently will not be read until the stream records reach or
> exceed that same timestamp. So there is no guarantee these records get joined
> at all, and if they do it will be with old data. I realize we're talking
> about the consumer here and not Streams specifically, but as it stands I
> can't even write a non-Streams application that does a join but prioritizes
> table-topic records over stream records without using multiple consumers.
> 
> 
> On Wed, 2018-09-05 at 08:18 -0700, Colin McCabe wrote:
> 
> Hi all,
> 
> I agree that DISCUSS is more appropriate than VOTE at this point, since I
> don't remember the last discussion coming to a definite conclusion.
> 
> I guess my concern is that this will add complexity and memory consumption on
> the server side.  In the case of incremental fetch requests, we will have to
> track at least two extra bytes per partition, to know what the priority of
> each partition is within each active fetch session.
> 
> It would be nice to hear more about the use-cases for this feature.  I think
> Gwen asked about this earlier, and I don't remember reading a response.  The
> fact that we're now talking about Samza interfaces is a bit of a red flag.
> After all, Samza didn't need partition priorities to do what it did.  You can
> do a lot with muting partitions and using appropriate threading in your code.
> 
> For example, you can hand data from a partition off to a work queue with a
> fixed size, which is handled by a separate service thread.  If the queue gets
> full, you can mute the partition until some of the buffered data is
> processed.  Kafka Streams uses a similar approach to avoid reading partition
> data that isn't immediately needed.
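> 
> As a rough illustration of that pattern, here is a minimal sketch using the
> consumer's pause()/resume() APIs with a bounded hand-off queue; the
> watermarks and the separate worker thread that drains the queue are
> hypothetical and not shown:
> 
>     import java.time.Duration;
>     import java.util.Collections;
>     import java.util.concurrent.ArrayBlockingQueue;
>     import java.util.concurrent.BlockingQueue;
>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>     import org.apache.kafka.common.TopicPartition;
> 
>     public class BoundedQueueConsumer {
>         // Hypothetical thresholds; tune to the worker's processing rate.
>         private static final int HIGH_WATERMARK = 800;
>         private static final int LOW_WATERMARK = 200;
> 
>         public static void run(KafkaConsumer<String, String> consumer,
>                                TopicPartition partition) throws InterruptedException {
>             // Bounded hand-off queue, drained by a separate service thread.
>             BlockingQueue<ConsumerRecord<String, String>> queue =
>                 new ArrayBlockingQueue<>(1000);
> 
>             while (true) {
>                 // Mute the partition while the worker is behind; poll() keeps
>                 // the consumer alive but returns no records for paused partitions.
>                 if (queue.size() >= HIGH_WATERMARK) {
>                     consumer.pause(Collections.singleton(partition));
>                 } else if (queue.size() <= LOW_WATERMARK) {
>                     consumer.resume(Collections.singleton(partition));
>                 }
>                 for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
>                     queue.put(record);
>                 }
>             }
>         }
>     }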
> 
> There might be some use-cases that need priorities eventually, but I'm
> concerned that we're jumping the gun by trying to implement this before we
> know what they are.
> 
> best,
> Colin
> 
> On Wed, Sep 5, 2018, at 01:06, Jan Filipiak wrote:
> 
> On 05.09.2018 02:38, n...@afshartous.com wrote:
> 
> On Sep 4, 2018, at 4:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> 
> What I meant is literally this interface:
> 
> https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html
> 
> Hi Jan,
> 
> Thanks for the reply and I have a few questions.  This Samza doc
> 
>    https://samza.apache.org/learn/documentation/0.14/container/streams.html
> 
> indicates that the chooser is set via configuration.  Are you suggesting
> adding a new configuration for Kafka?  It seems like we could also have a
> method on KafkaConsumer
> 
>      public void register(MessageChooser messageChooser)
> 
> to make it more dynamic.
> 
> I don't have strong opinions regarding this. I like configs; I also don't
> think it would be a problem to have both.
> 
> Also, the Samza MessageChooser interface has the method
> 
>    /* Notify the chooser that a new envelope is available for a processing. */
>    void update(IncomingMessageEnvelope envelope)
> 
> and I'm wondering how this method would be translated to the Kafka API.  In
> particular, what corresponds to IncomingMessageEnvelope?
> 
> I think Samza uses the envelope abstraction because they support other
> sources besides Kafka as well; they are more on the Spark end of things when
> it comes to different input types. I don't have strong opinions, but it feels
> like we wouldn't need such a thing in the Kafka consumer and could just use a
> regular ConsumerRecord or so.
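> 
> To make that concrete, here is a purely hypothetical sketch of what a
> Kafka-side chooser could look like, modeled on the Samza interface but using
> ConsumerRecord; none of this exists in the consumer API today:
> 
>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>     import org.apache.kafka.common.TopicPartition;
> 
>     // Hypothetical sketch only -- not part of the Kafka consumer API.
>     public interface MessageChooser {
>         // Tell the chooser which partitions it will see records from.
>         void register(TopicPartition partition);
> 
>         // Notify the chooser that a fetched record is available for processing.
>         void update(ConsumerRecord<byte[], byte[]> record);
> 
>         // Return the next record the application should process, or null if
>         // no buffered record should be processed right now.
>         ConsumerRecord<byte[], byte[]> choose();
>     }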
> 
> Best,
> --
>        Nick
