James,

Your pictures do an excellent job of illustrating my point. 

My mention of the additional "10's of minutes to hours" refers to how far past 
the original target checkpoint (T1 in your diagram) one may need to go to reach 
a checkpoint where all partitions of all topics are in the uncompacted region 
of their respective logs. In terms of your diagram: the T3 transaction could 
have been written tens of minutes to hours after T1, because that is how long 
it took all readers to get to T1.

> You would not have to start over from the beginning in order to read to T3.

While I agree this is technically true, in practice it could be very onerous to 
actually do. For example, we use the Kafka consumer that is part of the Spark 
Streaming library to read table topics. It accepts a fixed range of offsets to 
read for each partition. Say we originally target ranges from offset 0 to the 
offset of T1 for each topic+partition. There is really no way to have the 
library arrive at T1 and then "keep going" to T3. What is worse, given Spark's 
design, if you lost a worker during your calculation you would be in a rather 
sticky position. Spark achieves resiliency not through data redundancy but by 
keeping track of how to reproduce the transformations leading to a state. In 
the face of a lost worker, Spark would try to re-read the lost worker's portion 
of the data from Kafka. However, in the interim compaction may have moved past 
the reproducible checkpoint (T3), rendering the data inconsistent. At best the 
entire calculation would need to start over, targeting some later transaction 
checkpoint.
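
For concreteness, here is a rough sketch of what that read looks like against 
the spark-streaming-kafka-0-10 API. The topic names, offsets, and broker 
address are made up (the T1 offsets would come from our checkpoint metadata), 
and sc is an existing SparkContext:

    import scala.collection.JavaConverters._
    import org.apache.spark.streaming.kafka010.{KafkaUtils,
      LocationStrategies, OffsetRange}

    // Consumer config; broker address is a placeholder.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker:9092",
      "key.deserializer" ->
        classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer],
      "value.deserializer" ->
        classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer],
      "group.id" -> "table-reader"
    ).asJava

    // One fixed range per topic+partition: offset 0 up to (exclusive)
    // the offset recorded at T1. Offsets here are illustrative.
    val ranges = Array(
      OffsetRange("table-a", 0, 0L, 1234L),
      OffsetRange("table-a", 1, 0L, 987L),
      OffsetRange("table-b", 0, 0L, 4321L)
    )

    // The resulting RDD is pinned to exactly these ranges. There is no
    // way to extend it to T3 after the fact, and a lost partition is
    // recomputed by re-reading the very same range from Kafka -- which
    // is what goes wrong if compaction has passed over it meanwhile.
    val rdd = KafkaUtils.createRDD[Array[Byte], Array[Byte]](
      sc, kafkaParams, ranges, LocationStrategies.PreferConsistent)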

Needless to say, with the proposed feature everything is quite simple. As long 
as we set the compaction lag large enough, we can be assured that T1 will 
remain in the uncompacted region and thereby be reproducible. Thus reading from 
0 to the offsets in T1 will be sufficient for the duration of the calculation.
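
For example, assuming the feature lands as a topic-level setting along the 
lines of min.compaction.lag.ms, it could be applied with something like the 
following (the topic name, ZooKeeper address, and 24-hour lag are just 
illustrative):

    bin/kafka-configs.sh --zookeeper zk:2181 --alter \
      --entity-type topics --entity-name table-a \
      --add-config min.compaction.lag.ms=86400000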

Eric

