Jan,

your scenario is quite complex and I am not sure I understood every
part of it. Let me try to break it down:

> In my scenario on startup, I want to read all data from a topic (or a subset 
> of its partitions),
> wait until all the old data has been cached and then start processing of a 
> different stream

That is hard to accomplish in general. Kafka Streams internally uses
KafkaConsumer (one instance per StreamThread) and thus relies on the
consumer's behavior with regard to poll(). Hence, Streams cannot control
in detail what data will be fetched from the brokers.

Furthermore, Streams follows its own internal strategy to pick a record
(from the available ones returned by poll()), and you cannot control in
your code (at least not directly) which record will be picked.

Basically, Streams tries to process records in "timestamp order", i.e.,
based on the timestamp returned by the TimestampExtractor. So you can
"influence" the processing order via record timestamps (as far as you
can influence them) and/or by providing a custom TimestampExtractor.
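
As a rough sketch of that second option: a custom extractor could shift
the timestamps of the "table" topic's records into the past so that
Streams tends to pick them first. Note this is only an illustration --
the topic name "table-topic" and the shift value are made up, and the
exact extract() signature varies across Kafka versions (newer versions
pass a previousTimestamp argument as shown here):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch: make records from the "table" topic appear older than
// stream records, so Streams' timestamp-based record picking tends
// to process them first. Topic name and offset are placeholders.
public class TablePriorityTimestampExtractor implements TimestampExtractor {

    private static final long SHIFT_MS = 1_000_000L; // arbitrary example value

    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long previousTimestamp) {
        final long ts = record.timestamp();
        if ("table-topic".equals(record.topic())) {
            // shift "table" records into the past
            return ts - SHIFT_MS;
        }
        return ts;
    }
}
```

Keep in mind this still only gives best-effort ordering, for the
reasons above.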

In your example, you might want the records you want to process first
(the KTable records) to have smaller timestamps (i.e., be earlier) than
the records from your KStream. But even this will only give you "best
effort" behavior, and it can happen that a KStream record is processed
before all KTable records are processed. It's a known issue, but hard
to resolve.

> when the specific partition doesn't get any message within the retention 
> period,
> then I end up stuck trying to prefetch data to the "KTable" - this is because 
> I get
> the offset of the last message (plus 1) from the broker, but I don't get any 
> data
> ever (until I send a message to the partition)

I cannot follow here: if there is no data, then of course you cannot
process any data -- so why do you end up stuck?

> The problem I see here is that kafka tells me what the last offset in a 
> partition is,
> but there is no upper bound on when a first message will arrive,

In general, the latency between a record being appended at the broker
and being received by a consumer is rather small. So even if there is
strictly no upper bound on when a message gets delivered, this should
not be an issue in practice. Or am I misunderstanding something?

> even though I reset the offset and start reading from the beginning of a 
> partition.

How does this relate? I cannot follow.

> My question is, is it a possibility not to clear the whole partition, but to 
> always keep at least the last message?

Not with the regular retention policy -- I am not sure whether log
compaction can help here.
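
If compaction is worth a try: a compacted topic retains at least the
latest record per key instead of deleting everything by time, which
sounds close to "always keep at least the last message". A sketch of
switching a topic to compaction (topic name is a placeholder, and
depending on the broker version this may need to go through
kafka-configs.sh instead):

```shell
# Sketch: keep the latest record per key rather than time-based deletion.
# "my-table-topic" is a made-up example name.
bin/kafka-topics.sh --zookeeper localhost:2181 --alter \
  --topic my-table-topic \
  --config cleanup.policy=compact
```

Note that compaction works per key, so records without meaningful keys
would not benefit from this.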

> That way, the client would always get at least the last message, can 
> therefore figure out
> it is at the end of the partition (reading the old data) and start processing.

Why is this required? If the client's offset is the same as "endOfLog"
for each partition, you can figure out that there is nothing to read. So
why would you need the last old message to figure this out?
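
For what it's worth, that check can be done with the plain consumer API.
A rough sketch (assumes a Kafka version where KafkaConsumer.endOffsets()
is available, i.e., 0.10.1+; class and method names here are made up):

```java
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch: compare the consumer's position with the log-end offset of
// every assigned partition. If they match everywhere, all old data has
// been read -- this also holds for an empty partition, where the
// position after seekToBeginning() already equals the end offset.
public final class CatchUpCheck {

    static boolean caughtUp(final KafkaConsumer<?, ?> consumer) {
        final Set<TopicPartition> assignment = consumer.assignment();
        final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
        for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            if (consumer.position(entry.getKey()) < entry.getValue()) {
                return false; // still behind on this partition
            }
        }
        return true;
    }
}
```

This way no "last old message" is needed to detect the end of the
partition.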


-Matthias



On 2/7/17 3:46 AM, Jan Lukavský wrote:
> Hi all,
> 
> I have a question how to do a correct caching in KTable-like structure
> on application startup. I'm not sure if this belongs to user or dev
> maillist, so sorry if I've chosen the bad one. What is my observation so
> far:
> 
>  - if I don't send any data to a kafka partition for a period longer
> than the data retention interval, then all data from the partition is
> wiped out
> 
>  - the index file is not cleared (which is obvious, it has to keep track
> of the next offset to assign to a new message)
> 
> In my scenario on startup, I want to read all data from a topic (or a
> subset of its partitions), wait until all the old data has been cached
> and then start processing of a different stream (basically I'm doing a
> join of KStream and KTable, but I have implemented it manually due to
> some special behavior). Now, what is the issue here - when the specific
> partition doesn't get any message within the retention period, then I
> end up stuck trying to prefetch data to the "KTable" - this is because I
> get the offset of the last message (plus 1) from the broker, but I don't
> get any data ever (until I send a message to the partition). The problem
> I see here is that kafka tells me what the last offset in a partition
> is, but there is no upper bound on when a first message will arrive,
> even though I reset the offset and start reading from the beginning of a
> partition. My question is, is it a possibility not to clear the whole
> partition, but to always keep at least the last message? That way, the
> client would always get at least the last message, can therefore figure
> out it is at the end of the partition (reading the old data) and start
> processing. I believe that KTable implementation could have a very
> similar issue. Or is there any other way around? I could add a timeout,
> but this seems a little fragile.
> 
> Thanks in advance for any suggestions and opinions,
> 
>  Jan
> 
