Hi Matthias,
I'm familiar with how the timestamp synchronization currently works. I also 
submit that it does not work for our use-case, which is the following: the 
table-backing topic contains records with the best available data we have for a 
given item. If a record in this topic is updated, we would always prefer to 
join using that data, *regardless* of whether it is "newer" than the incoming 
event we are trying to join it with.

Essentially, Streams assumes that we must want the table data that was current 
at the time the event was produced, and here we simply don't. If we have newer 
data, we want that. But my larger concern here is actually reprocessing: when 
reprocessing, the older table data will have been log-compacted away, and the 
current timestamp semantics will result in events that occurred prior to the 
latest table updates going unjoined. Does this make sense now?
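
To make the scenario concrete, here is a minimal sketch of the join in 
question (topic names are hypothetical, and serde/config setup is omitted):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    Topology buildJoinTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // "events" is the stream-side topic; "items" is the compacted,
        // table-backing topic described above.
        KStream<String, String> events = builder.stream("events");
        KTable<String, String> items = builder.table("items");

        // With timestamp-based synchronization, an event whose timestamp
        // precedes the latest item update joins against the older item
        // version; after log compaction that older version may be gone,
        // so on reprocessing the event is not joined at all.
        events.join(items, (event, item) -> event + "|" + item)
              .to("joined-events");

        return builder.build();
    }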

Thanks!
Tommy

On Mon, 2018-09-17 at 09:51 -0700, Matthias J. Sax wrote:

I am not sure if this feature would help with stream-table joins. Also note 
that we recently merged a PR that improves the timestamp synchronization of 
Kafka Streams; this will vastly improve the guarantees.


What I don't understand:


So table records that have been updated recently will not be read until the 
stream records reach or exceed that same timestamp.


Yes, this is on purpose / by design.


and if they do it will be with old data


What do you mean by "old data"? By definition, the stream record will

join with a table that contains data up-to the stream record's

timestamp. It does semantically not make sense to advance the table

beyond the stream record's timestamp, because if you do this, you would

semantically join with "future data" what---from my point of view---is

semantically incorrect.
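
For example (hypothetical timestamps):

    table updates for key A:   v1 @ ts=50, v2 @ ts=120
    stream record for key A:   e1 @ ts=100

    e1 joins with v1, the table state as of ts <= 100; v2 only becomes
    visible to stream records with ts >= 120.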


Shameless plug: you might want to read

https://www.confluent.io/blog/streams-tables-two-sides-same-coin




-Matthias


On 9/17/18 8:23 AM, Thomas Becker wrote:

For my part, a major use-case for this feature is stream-table joins. 
Currently, Kafka Streams does the wrong thing in some cases because the only 
message-choosing strategy available is timestamp-based. So table records that 
have been updated recently will not be read until the stream records reach or 
exceed that same timestamp. So there is no guarantee these records get joined 
at all, and if they do it will be with old data. I realize we're talking about 
the consumer here and not Streams specifically, but as it stands I can't even 
write a non-streams application that does a join but prioritizes table-topic 
records over stream records without using multiple consumers.


On Wed, 2018-09-05 at 08:18 -0700, Colin McCabe wrote:


Hi all,



I agree that DISCUSS is more appropriate than VOTE at this point, since I don't 
remember the last discussion coming to a definite conclusion.



I guess my concern is that this will add complexity and memory consumption on 
the server side.  In the case of incremental fetch requests, we will have to 
track at least two extra bytes per partition, to know what the priority of each 
partition is within each active fetch session.



It would be nice to hear more about the use-cases for this feature.  I think 
Gwen asked about this earlier, and I don't remember reading a response.  The 
fact that we're now talking about Samza interfaces is a bit of a red flag.  
After all, Samza didn't need partition priorities to do what it did.  You can 
do a lot with muting partitions and using appropriate threading in your code.



For example, you can hand data from a partition off to a work queue with a 
fixed size, which is handled by a separate service thread.  If the queue gets 
full, you can mute the partition until some of the buffered data is processed.  
Kafka Streams uses a similar approach to avoid reading partition data that 
isn't immediately needed.
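
A rough sketch of that pattern, assuming a hypothetical bounded queue that is 
drained by a separate worker thread (the worker itself and error handling are 
omitted):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.concurrent.BlockingQueue;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    void pollLoop(KafkaConsumer<String, String> consumer,
                  BlockingQueue<ConsumerRecord<String, String>> queue)
            throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                if (!queue.offer(record)) {
                    // Queue is full: mute this partition so poll() stops
                    // returning data for it, then wait for the worker to
                    // make room for the current record.
                    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    consumer.pause(Collections.singleton(tp));
                    queue.put(record);
                }
            }
            // Once the worker has drained some of the buffered records,
            // un-mute any paused partitions.
            if (queue.remainingCapacity() > 0) {
                consumer.resume(consumer.paused());
            }
        }
    }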



There might be some use-cases that need priorities eventually, but I'm 
concerned that we're jumping the gun by trying to implement this before we know 
what they are.



best,


Colin




On Wed, Sep 5, 2018, at 01:06, Jan Filipiak wrote:



On 05.09.2018 02:38, n...@afshartous.com wrote:



On Sep 4, 2018, at 4:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:



What I meant is literally this interface:



https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html


Hi Jan,



Thanks for the reply and I have a few questions.  This Samza doc



   https://samza.apache.org/learn/documentation/0.14/container/streams.html



indicates that the chooser is set via configuration.  Are you suggesting adding 
a new configuration for Kafka?  Seems like we could also have a method on 
KafkaConsumer



     public void register(MessageChooser messageChooser)


I don't have strong opinions regarding this. I like configs; I also don't 
think it would be a problem to have both.




to make it more dynamic.



Also, the Samza MessageChooser interface has method



   /* Notify the chooser that a new envelope is available for processing. */
   void update(IncomingMessageEnvelope envelope)



and I’m wondering how this method would be translated to the Kafka API.  In 
particular, what corresponds to IncomingMessageEnvelope?


I think Samza uses the envelope abstraction because they support other sources 
besides Kafka as well. They are more on the Spark end of things when it comes 
to different input types. I don't have strong opinions, but it feels like we 
wouldn't need such a thing in the Kafka consumer; we could just use a regular 
ConsumerRecord or so.
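
For illustration, a chooser on the Kafka side might look roughly like the 
following, using ConsumerRecord in place of IncomingMessageEnvelope. This 
interface is purely hypothetical; nothing like it exists in the consumer API 
today:

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public interface MessageChooser<K, V> {
        /* Notify the chooser that a new record has been fetched and is
           available for processing. */
        void update(ConsumerRecord<K, V> record);

        /* Return the record that should be processed next, or null if the
           chooser has nothing to offer right now. */
        ConsumerRecord<K, V> choose();
    }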



Best,


--


       Nick










