Hi Kamal,

Thanks for the bump. I have been thinking about this passively for the past
few days.

The simplest solution is to store state in the segment-level metadata. The
state would specify whether the trx index is empty or not, and it would be
populated during segment archival. We would then iterate over the metadata
for future segments without having to make a remote call to download the trx
index itself. The other option, storing state at the partition level,
wouldn't work, as you mentioned, because we would have to change that state
on every mutation of the log, i.e. on segment expiration and on append.
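To make that concrete, here is a rough sketch of what the read path could
look like with such a flag. Everything below is illustrative only: the
`SegmentMetadata` shape and `fetchTxnIndex()` are made-up names, not the
actual RemoteLogMetadataManager API; the only real piece is the per-segment
`isTxnIdxEmpty` state populated at archival time.

    // Illustrative sketch only: SegmentMetadata and fetchTxnIndex() are
    // made-up names, not the actual RemoteLogMetadataManager API.
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    class TxnIndexScanSketch {

        /** Hypothetical per-segment view of metadata the broker already has locally. */
        interface SegmentMetadata {
            boolean isTxnIdxEmpty();          // populated in segment-level metadata at upload time
            List<AbortedTxn> fetchTxnIndex(); // remote call; only issued when the flag is false
        }

        record AbortedTxn(long producerId, long firstOffset, long lastOffset) {}

        /**
         * Walk the (local) segment metadata covering the fetch range and download
         * a txn index only for segments whose flag says the index is non-empty.
         */
        List<AbortedTxn> collectAbortedTxns(Iterator<SegmentMetadata> segmentsInFetchRange) {
            List<AbortedTxn> aborted = new ArrayList<>();
            while (segmentsInFetchRange.hasNext()) {
                SegmentMetadata segment = segmentsInFetchRange.next();
                if (segment.isTxnIdxEmpty()) {
                    continue; // skip the remote index download entirely for this segment
                }
                aborted.addAll(segment.fetchTxnIndex());
            }
            return aborted;
        }
    }

The point being that the only remote calls left are for segments that
actually carry aborted transactions; everything else is answered from
metadata the broker already has.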
I have been wondering whether we can do something better than this simple
solution, hence the delay in replying. Let me share my half-baked train of
thought; perhaps you can explore this as well. I have been thinking about
using the LSO (last stable offset) to handle the case where the partition
never had any transactions. For a partition which never had any transaction,
I would assume that the LSO is never initialized (or is equal to the log
start offset)? Or is it equal to the HW in that case? This is something that
I am yet to verify. If this idea works, then we would not have to iterate
through the metadata for the dominant case where the partition had no
transactions at all.
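In code, the check I have in mind would reduce to something like the sketch
below. Again, this is only illustrative: `PartitionView` is made up, and the
comparison encodes precisely the assumption about an uninitialized LSO that I
still need to verify.

    // Purely a sketch of the LSO idea; PartitionView is hypothetical, and the
    // key assumption (what an "uninitialized" LSO looks like) is unverified.
    import java.util.Collections;
    import java.util.List;
    import java.util.function.Supplier;

    class LsoShortCircuitSketch {

        /** Hypothetical read-only view of partition state available on the fetch path. */
        interface PartitionView {
            long logStartOffset();
            long lastStableOffset(); // LSO
        }

        record AbortedTxn(long producerId, long firstOffset, long lastOffset) {}

        /**
         * ASSUMPTION (to be verified): for a partition that never saw a transactional
         * producer, the LSO was never advanced beyond the log start offset.
         */
        boolean partitionNeverHadTransactions(PartitionView partition) {
            return partition.lastStableOffset() <= partition.logStartOffset();
        }

        /** Skip the metadata iteration entirely in the dominant no-transactions case. */
        List<AbortedTxn> abortedTxns(PartitionView partition,
                                     Supplier<List<AbortedTxn>> metadataScan) {
            if (partitionNeverHadTransactions(partition)) {
                return Collections.emptyList(); // no remote calls, no metadata iteration
            }
            return metadataScan.get(); // fall back to the per-segment flag scan sketched earlier
        }
    }

If the assumption does not hold (e.g. the LSO of a transaction-free partition
turns out to equal the HW instead), the short-circuit would need a different
signal, so please treat this only as a direction to explore.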
--
Divij Vaidya

On Tue, Jun 25, 2024 at 11:42 AM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Bump. Please review this proposal.
>
> On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > Divij,
> >
> > Thanks for the review! Updated the KIP with review comments 1, 2, 3,
> > and 4.
> >
> > > 4. Potential alternative - Instead of having an algorithm where we
> > > traverse across segment metadata looking for the isTxnIdxEmpty flag,
> > > should we directly introduce a nextSegmentWithTrxInx() function? This
> > > would allow implementers to optimize the otherwise linear scan across
> > > metadata for all segments by using techniques such as skip lists etc.
> >
> > This is a good point to optimize the scan. We would need to maintain
> > the skip-list for each leader epoch. With unclean leader election, some
> > brokers may not have the complete lineage, so this would expand the
> > scope of the work.
> >
> > In this version, we plan to optimize only for the cases below:
> >
> > 1. A partition does not have a transaction index for any of the
> >    uploaded segments. The individual log segments' `isTxnIdxEmpty`
> >    flags can be reduced to a single flag in RLMM (using the AND
> >    operator) that can serve the query "Are all the transaction indexes
> >    empty for a partition?". If yes, then we can directly scan the
> >    local log for aborted transactions.
> > 2. A partition is produced using the transactional producer. The
> >    assumption made is that a transaction will either commit or roll
> >    back within 15 minutes (default transaction.max.timeout.ms = 15
> >    mins), so we likely have to search only a few consecutive remote
> >    log segments to collect the aborted transactions.
> > 3. A partition is produced with both normal and transactional
> >    producers. In this case, we will be doing a linear traversal.
> >    Maintaining a skip-list might improve the performance, but we
> >    delegate the RLMM implementation to users. If implemented
> >    incorrectly, it can lead to delivery of aborted transaction records
> >    to the consumer.
> >
> > I notice two drawbacks with the reduction method as proposed in the
> > KIP:
> >
> > 1. Even if only one segment has a transaction index, we have to
> >    iterate over all the metadata events.
> > 2. Assume that there are 10 segments and segment-5 has a txn index.
> >    Once the first 6 segments are deleted due to a time/size/start-offset
> >    breach, we should return `true` for the "Are all the transaction
> >    indexes empty for a partition?" query, but it will return `false`
> >    until the broker is restarted, and we have to resort to iterating
> >    over all the metadata events.
> >
> > > 5. Potential alternative#2 - We know that we may want the indexes of
> > > multiple higher segments. Instead of fetching them sequentially, we
> > > could implement a parallel fetch or a pre-fetch for the indexes. This
> > > would help hide the latency of sequentially fetching the trx indexes.
> >
> > We can implement parallel-fetch/prefetch once tiered storage is GAed,
> > since that feature will also be useful to prefetch the next remote log
> > segment, and it expands the scope of this work.
> >
> > > 6. Should the proposed API take "segmentId" as a parameter instead of
> > > "topicIdPartition"? Suggesting because isTxnIdEmpty is not a property
> > > of a partition, instead it's a property of a specific segment.
> >
> > We propose to use the `topicIdPartition` in RemoteLogMetadataManager.
> > The implementation can fold/reduce the values of the individual log
> > segment `isTxnIdEmpty` flags. This is added to avoid scanning all the
> > metadata events when the partition does not have a transaction index
> > in any of the segments.
> >
> > On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <divijvaidy...@gmail.com>
> > wrote:
> >
> >> Hi Kamal,
> >>
> >> Thanks for bringing this up. This is a problem worth solving. We have
> >> faced this in situations where some Kafka clients default to
> >> read_committed mode and end up having high latencies for remote
> >> fetches due to this traversal across all segments.
> >>
> >> First, some nits to clarify the KIP:
> >> 1. The motivation should make it clear that traversal of all segments
> >>    happens only in the worst case. If I am not mistaken (please
> >>    correct me if wrong), the traversal stops once it has found a
> >>    segment containing the LSO.
> >> 2. There is nothing like a non-txn topic. A transaction may be started
> >>    on any topic. Perhaps rephrase the statement in the KIP so that it
> >>    is clear to the reader.
> >> 3. The hyperlink in "the broker has to traverse all the..." seems
> >>    incorrect. Did you want to point to
> >>    https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444
> >>    ?
> >> 4. In the testing section, could we add a test plan? For example, I
> >>    would add a test which verifies the number of calls made to RLMM.
> >>    That test would show a higher number of calls before this KIP vs.
> >>    after it.
> >>
> >> Other thoughts:
> >> 4. Potential alternative - Instead of having an algorithm where we
> >>    traverse across segment metadata looking for the isTxnIdxEmpty
> >>    flag, should we directly introduce a nextSegmentWithTrxInx()
> >>    function? This would allow implementers to optimize the otherwise
> >>    linear scan across metadata for all segments by using techniques
> >>    such as skip lists etc.
> >> 5. Potential alternative#2 - We know that we may want the indexes of
> >>    multiple higher segments. Instead of fetching them sequentially, we
> >>    could implement a parallel fetch or a pre-fetch for the indexes.
> >>    This would help hide the latency of sequentially fetching the trx
> >>    indexes.
> >> 6. Should the proposed API take "segmentId" as a parameter instead of
> >>    "topicIdPartition"? Suggesting because isTxnIdEmpty is not a
> >>    property of a partition, instead it's a property of a specific
> >>    segment.
> >>
> >> Looking forward to hearing your thoughts about the alternatives. Let's
> >> get this fixed.
> >>
> >> --
> >> Divij Vaidya
> >>
> >> On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <
> >> kamal.chandraprak...@gmail.com> wrote:
> >>
> >> > Hi all,
> >> >
> >> > I have opened KIP-1058
> >> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic>
> >> > to reduce the pressure on remote storage when transactional
> >> > consumers are reading non-txn topics from remote storage.
> >> >
> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
> >> >
> >> > Feedback and suggestions are welcome.
> >> >
> >> > Thanks,
> >> > Kamal