My 2 cents: I agree with Colin. It's important that the metadata not grow unbounded on local disk; past some point it should be delegated to external storage. Indefinite long-term storage of entity data in Kafka can result in extremely large datasets where the vast majority of the data lives in the external tier. I would be very disappointed if metadata storage became the limiting factor on how much data I can store in Kafka. As an example, I think it's very reasonable that an AWS metadata store could be implemented with DynamoDB (a key-value store) paired with S3: faster random-access metadata lookup than plain S3, but without needing to rebuild rocksDB state locally.
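To make the DynamoDB-plus-S3 idea a bit more concrete, here is a minimal sketch of the lookup path, not a proposal for the actual RLMM interface. Everything in it is an assumption for illustration: `KeyValueIndex` is an in-memory stand-in for the role a key-value store would play, a plain dict stands in for the object store, and the names `SegmentMetadata`, `TieredMetadataStore`, and `hot_capacity` are hypothetical. The point is only that the broker keeps a bounded hot set locally and resolves everything else through the external index.

```python
import bisect
from collections import OrderedDict
from dataclasses import dataclass


@dataclass(frozen=True)
class SegmentMetadata:
    base_offset: int   # first offset in the segment
    end_offset: int    # last offset in the segment (inclusive)
    object_key: str    # where the segment bytes live in the object store


class KeyValueIndex:
    """In-memory stand-in for an external key-value index (hypothetical)."""

    def __init__(self):
        self._segments = {}  # partition -> segments sorted by base_offset

    def put(self, partition, meta):
        segments = self._segments.setdefault(partition, [])
        bases = [s.base_offset for s in segments]
        segments.insert(bisect.bisect_left(bases, meta.base_offset), meta)

    def find(self, partition, offset):
        # Segment with the largest base_offset <= offset, if it covers offset.
        segments = self._segments.get(partition, [])
        bases = [s.base_offset for s in segments]
        i = bisect.bisect_right(bases, offset) - 1
        if i >= 0 and segments[i].end_offset >= offset:
            return segments[i]
        return None


class TieredMetadataStore:
    """Bounded local hot set in front of an external index and object store."""

    def __init__(self, index, blobs, hot_capacity=1024):
        self.index = index
        self.blobs = blobs            # object_key -> bytes (object store stand-in)
        self.hot = OrderedDict()      # (partition, base_offset) -> SegmentMetadata
        self.hot_capacity = hot_capacity

    def add_segment(self, partition, base_offset, end_offset, data):
        key = f"{partition}/{base_offset:020d}.log"
        self.blobs[key] = data
        self.index.put(partition, SegmentMetadata(base_offset, end_offset, key))

    def lookup(self, partition, offset):
        # 1. Check the local hot set first.
        hit = next((k for k, m in self.hot.items()
                    if k[0] == partition and m.base_offset <= offset <= m.end_offset),
                   None)
        if hit is not None:
            self.hot.move_to_end(hit)
            return self.hot[hit]
        # 2. Fall back to the external index and remember the result.
        meta = self.index.find(partition, offset)
        if meta is not None:
            self.hot[(partition, meta.base_offset)] = meta
            if len(self.hot) > self.hot_capacity:
                self.hot.popitem(last=False)  # evict the least recently used entry
        return meta

    def read(self, partition, offset):
        meta = self.lookup(partition, offset)
        return None if meta is None else self.blobs[meta.object_key]


store = TieredMetadataStore(KeyValueIndex(), blobs={}, hot_capacity=1)
store.add_segment("topic-0", 0, 99, b"first segment")
store.add_segment("topic-0", 100, 199, b"second segment")
print(store.read("topic-0", 150))
```

With this shape, losing a broker's disk only loses the hot set, which refills on demand, so there is nothing to rebuild up front.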
On Fri, Jul 10, 2020 at 3:57 PM Colin McCabe <cmcc...@apache.org> wrote:

> Hi all,
>
> Thanks for the KIP.
>
> I took a look and one thing that stood out to me is that the more metadata
> we have, the more storage we will need on local disk for the rocksDB
> database. This seems like it contradicts some of the goals of the
> project. Ideally the space we need on local disk should be related only to
> the size of the hot set, not the size of the cold set. It also seems like
> it could lead to extremely long rocksdb rebuild times if we somehow lose a
> broker's local storage and have to rebuild it.
>
> Instead, I think it would be more reasonable to store cold metadata in the
> "remote" storage (HDFS, s3, etc.). Not only does this free up space on the
> local and avoid long rebuild times, but it also gives us more control over
> the management of our cache. With rocksDB we are delegating cache
> management to an external library that doesn't really understand our
> use-case.
>
> To give a concrete example of how this is bad, imagine that we have 10
> worker threads and we get 10 requests for something that requires us to
> fetch cold tiered storage metadata. Now every worker thread is blocked
> inside rocksDB and the broker can do nothing until it finishes fetching
> from disk. When accessing a remote service like HDFS or S3, in contrast,
> we would be able to check if the data was in our local cache first. If it
> wasn't, we could put the request in a purgatory and activate a background
> thread to fetch the needed data, and then release the worker thread to be
> used by some other request. Having control of our own caching strategy
> increases observability, maintainability, and performance.
>
> I can anticipate a possible counter-argument here: the size of the
> metadata should be small and usually fully resident in memory anyway.
> While this is true today, I don't think it will always be true. The
> current low limit of a few thousand partitions is not competitive in the
> long term and needs to be lifted. We'd like to get to at least a million
> partitions with KIP-500, and much more later. Also, when you give people
> the ability to have unlimited retention, they will want to make use of it.
> That means lots of historical log segments to track. This scenario is by
> no means hypothetical. Even with the current software, it's easy to think
> of cases where someone misconfigured the log segment roll settings and
> overwhelmed the system with segments. So overall, I like to understand why
> we want to store metadata on local disk rather than remote, and what the
> options are for the future.
>
> best,
> Colin
>
>
> On Thu, Jul 9, 2020, at 09:55, Harsha Chintalapani wrote:
> > Hi Jun,
> > Thanks for the replies and feedback on design and giving input.
> > We are coming close to finish the implementation.
> > We also did several perf tests as well at our peak production loads and
> > with tiered storage we didn't see any degradation on write throughputs and
> > latencies.
> > Ying already added some of the perf tests results in the KIP itself.
> > It will be great if we can get design and code reviews from you
> > and others in the community as we make progress.
> > Thanks,
> > Harsha
> >
> > On Tue, Jul 7, 2020 at 10:34 AM Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Ying,
> > >
> > > Thanks for the update. It's good to see the progress on this. Please let
> > > us know when you are done updating the KIP wiki.
> > >
> > > Jun
> > >
> > > On Tue, Jul 7, 2020 at 10:13 AM Ying Zheng <yi...@uber.com.invalid> wrote:
> > >
> > >> Hi Jun,
> > >>
> > >> Satish and I have added more design details in the KIP, including how to
> > >> keep consistency between replicas (especially when there is leadership
> > >> changes / log truncations) and new metrics. We also made some other minor
> > >> changes in the doc.
> > >> We will finish the KIP changes in the next couple of
> > >> days. We will let you know when we are done. Most of the changes are
> > >> already updated to the wiki KIP. You can take a look. But it's not the
> > >> final version yet.
> > >>
> > >> As for the implementation, the code is mostly done and we already had some
> > >> feature tests / system tests. I have added the performance test results in
> > >> the KIP. However the recent design changes (e.g. leader epoch info
> > >> management / log truncation / some of the new metrics) have not been
> > >> implemented yet. It will take about 2 weeks for us to implement after you
> > >> review and agree with those design changes.
> > >>
> > >> On Tue, Jul 7, 2020 at 9:23 AM Jun Rao <j...@confluent.io> wrote:
> > >>
> > >> > Hi, Satish, Harsha,
> > >> >
> > >> > Any new updates on the KIP? This feature is one of the most important and
> > >> > most requested features in Apache Kafka right now. It would be helpful if
> > >> > we can make sustained progress on this. Could you share how far along is
> > >> > the design/implementation right now? Is there anything that other people
> > >> > can help to get it across the line?
> > >> >
> > >> > As for "transactional support" and "follower requests/replication", no
> > >> > further comments from me as long as the producer state and leader epoch can
> > >> > be restored properly from the object store when needed.
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Jun
> > >> >
> > >> > On Tue, Jun 9, 2020 at 3:39 AM Satish Duggana <satish.dugg...@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > We did not want to add many implementation details in the KIP. But we
> > >> > > decided to add them in the KIP as appendix or sub-sections(including
> > >> > > follower fetch protocol) to describe the flow with the main cases.
> > >> > > That will answer most of the queries.
I will update on this mail > > >> > > thread when the respective sections are updated. > > >> > > > > >> > > Thanks, > > >> > > Satish. > > >> > > > > >> > > On Sat, Jun 6, 2020 at 7:49 PM Alexandre Dupriez > > >> > > <alexandre.dupr...@gmail.com> wrote: > > >> > > > > > >> > > > Hi Satish, > > >> > > > > > >> > > > A couple of questions specific to the section "Follower > > >> > > > Requests/Replication", pages 16:17 in the design document [1]. > > >> > > > > > >> > > > 900. It is mentioned that followers fetch auxiliary states from > the > > >> > > > remote storage. > > >> > > > > > >> > > > 900.a Does the consistency model of the external storage impacts > > >> reads > > >> > > > of leader epochs and other auxiliary data? > > >> > > > > > >> > > > 900.b What are the benefits of using a mechanism to store and > access > > >> > > > the leader epochs which is different from other metadata > associated > > >> to > > >> > > > tiered segments? What are the benefits of retrieving this > > >> information > > >> > > > on-demand from the follower rather than relying on propagation > via > > >> the > > >> > > > topic __remote_log_metadata? What are the advantages over using > a > > >> > > > dedicated control structure (e.g. a new record type) propagated > via > > >> > > > this topic? Since in the document, different control paths are > > >> > > > operating in the system, how are the metadata stored in > > >> > > > __remote_log_metadata [which also include the epoch of the > leader > > >> > > > which offloaded a segment] and the remote auxiliary states, > kept in > > >> > > > sync? > > >> > > > > > >> > > > 900.c A follower can encounter an > OFFSET_MOVED_TO_TIERED_STORAGE. Is > > >> > > > this in response to a Fetch or OffsetForLeaderEpoch request? 
> > >> > > > > > >> > > > 900.d What happens if, after a follower encountered an > > >> > > > OFFSET_MOVED_TO_TIERED_STORAGE response, its attempts to > retrieve > > >> > > > leader epochs fail (for instance, because the remote storage is > > >> > > > temporarily unavailable)? Does the follower fallbacks to a mode > > >> where > > >> > > > it ignores tiered segments, and applies truncation using only > > >> locally > > >> > > > available information? What happens when access to the remote > > >> storage > > >> > > > is restored? How is the replica lineage inferred by the remote > > >> leader > > >> > > > epochs reconciled with the follower's replica lineage, which has > > >> > > > evolved? Does the follower remember fetching auxiliary states > failed > > >> > > > in the past and attempt reconciliation? Is there a plan to offer > > >> > > > different strategies in this scenario, configurable via > > >> configuration? > > >> > > > > > >> > > > 900.e Is the leader epoch cache offloaded with every segment? Or > > >> when > > >> > > > a new checkpoint is detected? If that information is not always > > >> > > > offloaded to avoid duplicating data, how does the remote storage > > >> > > > satisfy the request to retrieve it? > > >> > > > > > >> > > > 900.f Since the leader epoch cache covers the entire replica > > >> lineage, > > >> > > > what happens if, after a leader epoch cache file is offloaded > with a > > >> > > > given segment, the local epoch cache is truncated [not > necessarily > > >> for > > >> > > > a range of offset included in tiered segments]? How are remote > and > > >> > > > local leader epoch caches kept consistent? > > >> > > > > > >> > > > 900.g Consumer can also use leader epochs (e.g. to enable > fencing to > > >> > > > protect against stale leaders). What differences would there be > > >> > > > between consumer and follower fetches? Especially, would > consumers > > >> > > > also fetch leader epoch information from the remote storage? 
> > >> > > > > > >> > > > 900.h Assume a newly elected leader of a topic-partition detects > > >> more > > >> > > > recent segments are available in the external storage, with > epochs > > > >> > > > its local epoch. Does it ignore these segments and their > associated > > >> > > > epoch-to-offset vectors? Or try to reconstruct its local replica > > >> > > > lineage based on the data remotely available? > > >> > > > > > >> > > > Thanks, > > >> > > > Alexandre > > >> > > > > > >> > > > [1] > > >> > > > > >> > > > >> > https://docs.google.com/document/d/18tnobSas3mKFZFr8oRguZoj_tkD_sGzivuLRlMloEMs/edit?usp=sharing > > >> > > > > > >> > > > Le jeu. 4 juin 2020 à 19:55, Satish Duggana < > > >> satish.dugg...@gmail.com> > > >> > > a écrit : > > >> > > > > > > >> > > > > Hi Jun, > > >> > > > > Please let us know if you have any comments on "transactional > > >> > support" > > >> > > > > and "follower requests/replication" mentioned in the wiki. > > >> > > > > > > >> > > > > Thanks, > > >> > > > > Satish. > > >> > > > > > > >> > > > > On Tue, Jun 2, 2020 at 9:25 PM Satish Duggana < > > >> > > satish.dugg...@gmail.com> wrote: > > >> > > > > > > > >> > > > > > Thanks Jun for your comments. > > >> > > > > > > > >> > > > > > >100. It would be useful to provide more details on how > those > > >> apis > > >> > > are used. Otherwise, it's kind of hard to really assess whether > the > > >> new > > >> > > apis are sufficient/redundant. A few examples below. > > >> > > > > > > > >> > > > > > We will update the wiki and let you know. > > >> > > > > > > > >> > > > > > >100.1 deleteRecords seems to only advance the > logStartOffset in > > >> > > Log. How does that trigger the deletion of remote log segments? 
> > >> > > > > > > > >> > > > > > RLMTask for leader partition periodically checks whether > there > > >> are > > >> > > > > > remote log segments earlier to logStartOffset and the > respective > > >> > > > > > remote log segment metadata and data are deleted by using > RLMM > > >> and > > >> > > > > > RSM. > > >> > > > > > > > >> > > > > > >100.2 stopReplica with deletion is used in 2 cases (a) > replica > > >> > > reassignment; (b) topic deletion. We only want to delete the > tiered > > >> > > metadata in the second case. Also, in the second case, who > initiates > > >> the > > >> > > deletion of the remote segment since the leader may not exist? > > >> > > > > > > > >> > > > > > Right, it is deleted only incase of topic deletion only. We > will > > >> > > cover > > >> > > > > > the details in the KIP. > > >> > > > > > > > >> > > > > > >100.3 "LogStartOffset of a topic can be either in local or > in > > >> > > remote storage." If LogStartOffset exists in both places, which > one is > > >> > the > > >> > > source of truth? > > >> > > > > > > > >> > > > > > I meant the logStartOffset can point to either of local > segment > > >> or > > >> > > > > > remote segment but it is initialised and maintained in the > Log > > >> > class > > >> > > > > > like now. > > >> > > > > > > > >> > > > > > >100.4 List<RemoteLogSegmentMetadata> > > >> > > listRemoteLogSegments(TopicPartition topicPartition, long > minOffset): > > >> How > > >> > > is minOffset supposed to be used? > > >> > > > > > > > >> > > > > > Returns list of remote segments, sorted by baseOffset in > > >> ascending > > >> > > > > > order that have baseOffset >= the given min Offset. > > >> > > > > > > > >> > > > > > >100.5 When copying a segment to remote storage, it seems > we are > > >> > > calling the same RLMM.putRemoteLogSegmentData() twice before and > after > > >> > > copyLogSegment(). Could you explain why? > > >> > > > > > > > >> > > > > > This is more about prepare/commit/rollback as you > suggested. 
We > > >> > will > > >> > > > > > update the wiki with the new APIs. > > >> > > > > > > > >> > > > > > >100.6 LogSegmentData includes leaderEpochCache, but there > is no > > >> > api > > >> > > in RemoteStorageManager to retrieve it. > > >> > > > > > > > >> > > > > > Nice catch, copy/paste issue. There is an API to retrieve > it. > > >> > > > > > > > >> > > > > > >101. If the __remote_log_metadata is for production usage, > > >> could > > >> > > you provide more details? For example, what is the schema of the > data > > >> > (both > > >> > > key and value)? How is the topic maintained,delete or compact? > > >> > > > > > > > >> > > > > > It is with delete config and it’s retention period is > suggested > > >> to > > >> > be > > >> > > > > > more than the remote retention period. > > >> > > > > > > > >> > > > > > >110. Is the cache implementation in > RemoteLogMetadataManager > > >> meant > > >> > > for production usage? If so, could you provide more details on the > > >> schema > > >> > > and how/where the data is stored? > > >> > > > > > > > >> > > > > > The proposal is to have a cache (with default implementation > > >> backed > > >> > > by > > >> > > > > > rocksdb) but it will be added in later versions. We will add > > >> this > > >> > to > > >> > > > > > future work items. > > >> > > > > > > > >> > > > > > >111. "Committed offsets can be stored in a local file". > Could > > >> you > > >> > > describe the format of the file and where it's stored? > > >> > > > > > > > >> > > > > > We will cover this in the KIP. > > >> > > > > > > > >> > > > > > >112. Truncation of remote segments under unclean leader > > >> election: > > >> > I > > >> > > am not sure who figures out the truncated remote segments and how > that > > >> > > information is propagated to all replicas? > > >> > > > > > > > >> > > > > > We will add this in detail in the KIP. > > >> > > > > > > > >> > > > > > >113. 
"If there are any failures in removing remote log > segments > > >> > > then those are stored in a specific topic (default as > > >> > > __remote_segments_to_be_deleted)". Is it necessary to add yet > another > > >> > > internal topic? Could we just keep retrying? > > >> > > > > > > > >> > > > > > This is not really an internal topic, it will be exposed as > a > > >> user > > >> > > > > > configurable topic. After a few retries, we want user to > know > > >> about > > >> > > > > > the failure so that they can take an action later by > consuming > > >> from > > >> > > > > > this topic. We want to keep this simple instead of retrying > > >> > > > > > continuously and maintaining the deletion state etc. > > >> > > > > > > > >> > > > > > >114. "We may not need to copy producer-id-snapshot as we > are > > >> > > copying only segments earlier to last-stable-offset." Hmm, not > sure > > >> about > > >> > > that. The producer snapshot includes things like the last > timestamp of > > >> > each > > >> > > open producer id and can affect when those producer ids are > expired. > > >> > > > > > > > >> > > > > > Sure, this will be added as part of the LogSegmentData. > > >> > > > > > > > >> > > > > > Thanks, > > >> > > > > > Satish. > > >> > > > > > > > >> > > > > > > > >> > > > > > On Fri, May 29, 2020 at 6:39 AM Jun Rao <j...@confluent.io> > > >> wrote: > > >> > > > > > > > > >> > > > > > > Hi, Satish, > > >> > > > > > > > > >> > > > > > > Made another pass on the wiki. A few more comments below. > > >> > > > > > > > > >> > > > > > > 100. It would be useful to provide more details on how > those > > >> apis > > >> > > are used. Otherwise, it's kind of hard to really assess whether > the > > >> new > > >> > > apis are sufficient/redundant. A few examples below. > > >> > > > > > > 100.1 deleteRecords seems to only advance the > logStartOffset > > >> in > > >> > > Log. How does that trigger the deletion of remote log segments? 
> > >> > > > > > > 100.2 stopReplica with deletion is used in 2 cases (a) > replica > > >> > > reassignment; (b) topic deletion. We only want to delete the > tiered > > >> > > metadata in the second case. Also, in the second case, who > initiates > > >> the > > >> > > deletion of the remote segment since the leader may not exist? > > >> > > > > > > 100.3 "LogStartOffset of a topic can be either in local > or in > > >> > > remote storage." If LogStartOffset exists in both places, which > one is > > >> > the > > >> > > source of truth? > > >> > > > > > > 100.4 List<RemoteLogSegmentMetadata> > > >> > > listRemoteLogSegments(TopicPartition topicPartition, long > minOffset): > > >> How > > >> > > is minOffset supposed to be used? > > >> > > > > > > 100.5 When copying a segment to remote storage, it seems > we > > >> are > > >> > > calling the same RLMM.putRemoteLogSegmentData() twice before and > after > > >> > > copyLogSegment(). Could you explain why? > > >> > > > > > > 100.6 LogSegmentData includes leaderEpochCache, but there > is > > >> no > > >> > > api in RemoteStorageManager to retrieve it. > > >> > > > > > > > > >> > > > > > > 101. If the __remote_log_metadata is for production usage, > > >> could > > >> > > you provide more details? For example, what is the schema of the > data > > >> > (both > > >> > > key and value)? How is the topic maintained,delete or compact? > > >> > > > > > > > > >> > > > > > > 110. Is the cache implementation in > RemoteLogMetadataManager > > >> > meant > > >> > > for production usage? If so, could you provide more details on the > > >> schema > > >> > > and how/where the data is stored? > > >> > > > > > > > > >> > > > > > > 111. "Committed offsets can be stored in a local file". > Could > > >> you > > >> > > describe the format of the file and where it's stored? > > >> > > > > > > > > >> > > > > > > 112. 
Truncation of remote segments under unclean leader > > >> election: > > >> > > I am not sure who figures out the truncated remote segments and > how > > >> that > > >> > > information is propagated to all replicas? > > >> > > > > > > > > >> > > > > > > 113. "If there are any failures in removing remote log > > >> segments > > >> > > then those are stored in a specific topic (default as > > >> > > __remote_segments_to_be_deleted)". Is it necessary to add yet > another > > >> > > internal topic? Could we just keep retrying? > > >> > > > > > > > > >> > > > > > > 114. "We may not need to copy producer-id-snapshot as we > are > > >> > > copying only segments earlier to last-stable-offset." Hmm, not > sure > > >> about > > >> > > that. The producer snapshot includes things like the last > timestamp of > > >> > each > > >> > > open producer id and can affect when those producer ids are > expired. > > >> > > > > > > > > >> > > > > > > Thanks, > > >> > > > > > > > > >> > > > > > > Jun > > >> > > > > > > > > >> > > > > > > On Thu, May 28, 2020 at 5:38 AM Satish Duggana < > > >> > > satish.dugg...@gmail.com> wrote: > > >> > > > > > >> > > >> > > > > > >> Hi Jun, > > >> > > > > > >> Gentle reminder. Please go through the updated wiki and > let > > >> us > > >> > > know your comments. > > >> > > > > > >> > > >> > > > > > >> Thanks, > > >> > > > > > >> Satish. > > >> > > > > > >> > > >> > > > > > >> On Tue, May 19, 2020 at 3:50 PM Satish Duggana < > > >> > > satish.dugg...@gmail.com> wrote: > > >> > > > > > >>> > > >> > > > > > >>> Hi Jun, > > >> > > > > > >>> Please go through the wiki which has the latest updates. > > >> Google > > >> > > doc is updated frequently to be in sync with wiki. > > >> > > > > > >>> > > >> > > > > > >>> Thanks, > > >> > > > > > >>> Satish. 
> > >> > > > > > >>> > > >> > > > > > >>> On Tue, May 19, 2020 at 12:30 AM Jun Rao < > j...@confluent.io> > > >> > > wrote: > > >> > > > > > >>>> > > >> > > > > > >>>> Hi, Satish, > > >> > > > > > >>>> > > >> > > > > > >>>> Thanks for the update. Just to clarify. Which doc has > the > > >> > > latest updates, the wiki or the google doc? > > >> > > > > > >>>> > > >> > > > > > >>>> Jun > > >> > > > > > >>>> > > >> > > > > > >>>> On Thu, May 14, 2020 at 10:38 AM Satish Duggana < > > >> > > satish.dugg...@gmail.com> wrote: > > >> > > > > > >>>>> > > >> > > > > > >>>>> Hi Jun, > > >> > > > > > >>>>> Thanks for your comments. We updated the KIP with > more > > >> > > details. > > >> > > > > > >>>>> > > >> > > > > > >>>>> >100. For each of the operations related to tiering, > it > > >> would > > >> > > be useful to provide a description on how it works with the new > API. > > >> > These > > >> > > include things like consumer fetch, replica fetch, > offsetForTimestamp, > > >> > > retention (remote and local) by size, time and logStartOffset, > topic > > >> > > deletion, etc. This will tell us if the proposed APIs are > sufficient. > > >> > > > > > >>>>> > > >> > > > > > >>>>> We addressed most of these APIs in the KIP. We can add > > >> more > > >> > > details if needed. > > >> > > > > > >>>>> > > >> > > > > > >>>>> >101. For the default implementation based on internal > > >> topic, > > >> > > is it meant as a proof of concept or for production usage? I > assume > > >> that > > >> > > it's the former. However, if it's the latter, then the KIP needs > to > > >> > > describe the design in more detail. > > >> > > > > > >>>>> > > >> > > > > > >>>>> It is production usage as was mentioned in an earlier > > >> mail. > > >> > We > > >> > > plan to update this section in the next few days. > > >> > > > > > >>>>> > > >> > > > > > >>>>> >102. 
When tiering a segment, the segment is first > > >> written to > > >> > > the object store and then its metadata is written to RLMM using > the > > >> api > > >> > > "void putRemoteLogSegmentData()". One potential issue with this > > >> approach > > >> > is > > >> > > that if the system fails after the first operation, it leaves a > > >> garbage > > >> > in > > >> > > the object store that's never reclaimed. One way to improve this > is to > > >> > have > > >> > > two separate APIs, sth like preparePutRemoteLogSegmentData() and > > >> > > commitPutRemoteLogSegmentData(). > > >> > > > > > >>>>> > > >> > > > > > >>>>> That is a good point. We currently have a different > way > > >> using > > >> > > markers in the segment but your suggestion is much better. > > >> > > > > > >>>>> > > >> > > > > > >>>>> >103. It seems that the transactional support and the > > >> ability > > >> > > to read from follower are missing. > > >> > > > > > >>>>> > > >> > > > > > >>>>> KIP is updated with transactional support, follower > fetch > > >> > > semantics, and reading from a follower. > > >> > > > > > >>>>> > > >> > > > > > >>>>> >104. It would be useful to provide a testing plan for > > >> this > > >> > > KIP. > > >> > > > > > >>>>> > > >> > > > > > >>>>> We added a few tests by introducing test util for > tiered > > >> > > storage in the PR. We will provide the testing plan in the next > few > > >> days. > > >> > > > > > >>>>> > > >> > > > > > >>>>> Thanks, > > >> > > > > > >>>>> Satish. 
> > >> > > > > > >>>>> > > >> > > > > > >>>>> > > >> > > > > > >>>>> On Wed, Feb 26, 2020 at 9:43 PM Harsha Chintalapani < > > >> > > ka...@harsha.io> wrote: > > >> > > > > > >>>>>> > > >> > > > > > >>>>>> > > >> > > > > > >>>>>> > > >> > > > > > >>>>>> > > >> > > > > > >>>>>> > > >> > > > > > >>>>>> On Tue, Feb 25, 2020 at 12:46 PM, Jun Rao < > > >> j...@confluent.io > > >> > > > > >> > > wrote: > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> Hi, Satish, > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> Thanks for the updated doc. The new API seems to be > an > > >> > > improvement overall. A few more comments below. > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> 100. For each of the operations related to tiering, > it > > >> > would > > >> > > be useful to provide a description on how it works with the new > API. > > >> > These > > >> > > include things like consumer fetch, replica fetch, > offsetForTimestamp, > > >> > > retention > > >> > > > > > >>>>>>> (remote and local) by size, time and logStartOffset, > > >> topic > > >> > > deletion, etc. This will tell us if the proposed APIs are > sufficient. > > >> > > > > > >>>>>> > > >> > > > > > >>>>>> > > >> > > > > > >>>>>> Thanks for the feedback Jun. We will add more details > > >> around > > >> > > this. > > >> > > > > > >>>>>> > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> 101. For the default implementation based on > internal > > >> > topic, > > >> > > is it meant as a proof of concept or for production usage? I > assume > > >> that > > >> > > it's the former. However, if it's the latter, then the KIP needs > to > > >> > > describe the design in more detail. > > >> > > > > > >>>>>> > > >> > > > > > >>>>>> > > >> > > > > > >>>>>> Yes it meant to be for production use. Ideally it > would > > >> be > > >> > > good to merge this in as the default implementation for metadata > > >> service. > > >> > > We can add more details around design and testing. > > >> > > > > > >>>>>> > > >> > > > > > >>>>>>> 102. 
When tiering a segment, the segment is first > > >> written > > >> > to > > >> > > the object store and then its metadata is written to RLMM using > the > > >> api > > >> > > "void putRemoteLogSegmentData()". > > >> > > > > > >>>>>>> One potential issue with this approach is that if > the > > >> > system > > >> > > fails after the first operation, it leaves a garbage in the object > > >> store > > >> > > that's never reclaimed. One way to improve this is to have two > > >> separate > > >> > > APIs, sth like preparePutRemoteLogSegmentData() and > > >> > > commitPutRemoteLogSegmentData(). > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> 103. It seems that the transactional support and the > > >> > ability > > >> > > to read from follower are missing. > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> 104. It would be useful to provide a testing plan > for > > >> this > > >> > > KIP. > > >> > > > > > >>>>>> > > >> > > > > > >>>>>> > > >> > > > > > >>>>>> We are working on adding more details around > > >> transactional > > >> > > support and coming up with test plan. > > >> > > > > > >>>>>> Add system tests and integration tests. > > >> > > > > > >>>>>> > > >> > > > > > >>>>>>> Thanks, > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> Jun > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> On Mon, Feb 24, 2020 at 8:10 AM Satish Duggana < > > >> > > satish.dugg...@gmail.com> wrote: > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> Hi Jun, > > >> > > > > > >>>>>>> Please look at the earlier reply and let us know > your > > >> > > comments. > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> Thanks, > > >> > > > > > >>>>>>> Satish. > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> On Wed, Feb 12, 2020 at 4:06 PM Satish Duggana < > > >> > > satish.dugg...@gmail.com> wrote: > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> Hi Jun, > > >> > > > > > >>>>>>> Thanks for your comments on the separation of > remote log > > >> > > metadata storage and remote log storage. 
> > >> > > > > > >>>>>>> We had a few discussions since early Jan on how to > > >> support > > >> > > eventually consistent stores like S3 by uncoupling remote log > segment > > >> > > metadata and remote log storage. It is written with details in > the doc > > >> > > here(1). Below is the brief summary of the discussion from that > doc. > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> The current approach consists of pulling the remote > log > > >> > > segment metadata from remote log storage APIs. It worked fine for > > >> > storages > > >> > > like HDFS. But one of the problems of relying on the remote > storage to > > >> > > maintain metadata is that tiered-storage needs to be strongly > > >> consistent, > > >> > > with an impact not only on the metadata(e.g. LIST in S3) but also > on > > >> the > > >> > > segment data(e.g. GET after a DELETE in S3). The cost of > maintaining > > >> > > metadata in remote storage needs to be factored in. This is true > in > > >> the > > >> > > case of S3, LIST APIs incur huge costs as you raised earlier. > > >> > > > > > >>>>>>> So, it is good to separate the remote storage from > the > > >> > > remote log metadata store. We refactored the existing > > >> > RemoteStorageManager > > >> > > and introduced RemoteLogMetadataManager. Remote log metadata store > > >> should > > >> > > give strong consistency semantics but remote log storage can be > > >> > eventually > > >> > > consistent. > > >> > > > > > >>>>>>> We can have a default implementation for > > >> > > RemoteLogMetadataManager which uses an internal topic(as > mentioned in > > >> one > > >> > > of our earlier emails) as storage. But users can always plugin > their > > >> own > > >> > > RemoteLogMetadataManager implementation based on their > environment. > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> Please go through the updated KIP and let us know > your > > >> > > comments. 
We have started refactoring for the changes mentioned > in the > > >> > KIP > > >> > > and there may be a few more updates to the APIs. > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> [1] > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> > > >> > > > > >> > > > >> > https://docs.google.com/document/d/1qfkBCWL1e7ZWkHU7brxKDBebq4ie9yK20XJnKbgAlew/edit?ts=5e208ec7# > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> On Fri, Dec 27, 2019 at 5:43 PM Ivan Yurchenko < > > >> > > ivan0yurche...@gmail.com> > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> wrote: > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> Hi all, > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> Jun: > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> (a) Cost: S3 list object requests cost $0.005 per > 1000 > > >> > > requests. If > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> you > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> have 100,000 partitions and want to pull the > metadata > > >> for > > >> > > each > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> partition > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> at > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> the rate of 1/sec. It can cost $0.5/sec, which is > > >> roughly > > >> > > $40K per > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> day. > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> I want to note here, that no reasonably durable > storage > > >> > will > > >> > > be cheap at 100k RPS. For example, DynamoDB might give the same > > >> ballpark > > >> > > > > > >>>>>>> > > >> > > > > > >>>>>>> figures. 
If we want to keep the pull-based approach, we can try to reduce this number in several ways: doing listings less frequently (as Satish mentioned, with the current defaults it's ~3.33k RPS for your example), or batching listing operations in some way (depending on the storage; it might require a change to RSM's interface).

> There are different ways for doing push based metadata propagation. Some object stores may support that already. For example, S3 supports events notification

This sounds interesting. However, I see a couple of issues using it:
1. As I understand the documentation, notification delivery is not guaranteed and it's recommended to periodically do LIST to fill the gaps, which brings us back to the same LIST consistency guarantees issue.
2. The same goes for the broker start: to get the current state, we need to LIST.
3. The dynamic set of multiple consumers (RSMs): AFAIK SQS and SNS aren't designed for such a case.
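To make the LIST cost figures above easier to check, here is a rough back-of-the-envelope sketch in Python. The price, the partition count and the two polling intervals (1 second and 30 seconds) are the ones quoted in this thread; the helper names are made up for illustration and are not part of any Kafka or AWS API.

```python
# Back-of-the-envelope LIST cost, using the figures quoted in the thread.
# Both helpers are hypothetical; they are not part of any Kafka or AWS API.

PRICE_PER_1000_LIST_USD = 0.005   # S3 LIST price as quoted in the thread
SECONDS_PER_DAY = 24 * 60 * 60


def list_requests_per_sec(partitions: int, poll_interval_sec: float) -> float:
    """Assumes one LIST request per partition per polling interval."""
    return partitions / poll_interval_sec


def list_cost_per_day(partitions: int, poll_interval_sec: float) -> float:
    """Daily cost in USD of polling every partition at the given interval."""
    rps = list_requests_per_sec(partitions, poll_interval_sec)
    return rps / 1000 * PRICE_PER_1000_LIST_USD * SECONDS_PER_DAY


if __name__ == "__main__":
    for interval in (1, 30):
        rps = list_requests_per_sec(100_000, interval)
        cost = list_cost_per_day(100_000, interval)
        print(f"interval={interval}s  rps={rps:,.0f}  cost/day=${cost:,.0f}")
```

Under these assumptions, polling every second comes to about $43K per day, in line with the "roughly $40K" estimate, and the 30-second interval brings it down to about $1.4K per day at ~3.33k RPS.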
Alexandre:

> A.1 As commented on PR 7561, S3 consistency model [1][2] implies RSM cannot rely solely on S3 APIs to guarantee the expected strong consistency. The proposed implementation [3] would need to be updated to take this into account. Let’s talk more about this.

Thank you for the feedback. I clearly see the need for changing the S3 implementation to provide stronger consistency guarantees. As I see from this thread, there are several possible approaches to this. Let's discuss RemoteLogManager's contract and behavior (like pull vs push model) further before picking one (or several?) of them.
I'm going to do some evaluation of DynamoDB for the pull-based approach, to see if it's possible to apply it while paying a reasonable bill. I will also evaluate the push-based approach with a Kafka topic as the medium.
> A.2.3 Atomicity – what does an implementation of RSM need to provide with respect to atomicity of the APIs copyLogSegment, cleanupLogUntil and deleteTopicPartition? If a partial failure happens in any of those (e.g. in the S3 implementation, if one of the multiple uploads fails [4]),

The S3 implementation is going to change, but it's worth clarifying anyway.
The segment log file is uploaded only after S3 has acked the upload of all other files associated with the segment, and only after this does the whole segment file set become visible remotely for operations like listRemoteSegments [1].
In case of upload failure, the files that have been successfully uploaded stay as invisible garbage that is collected by cleanupLogUntil (or overwritten successfully later).
And the opposite happens during the deletion: log files are deleted first.
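The ordering described above can be sketched roughly as follows. This is a minimal in-memory model and not the actual S3 RSM code: FakeObjectStore, the key layout, the file suffixes and the function names are all assumptions made for illustration.

```python
# Minimal in-memory model of the upload/delete ordering described above.
# FakeObjectStore and every name below are hypothetical, for illustration only.

AUX_SUFFIXES = (".index", ".timeindex")  # assumed auxiliary files of a segment
LOG_SUFFIX = ".log"


class FakeObjectStore:
    def __init__(self, fail_on=None):
        self.objects = {}
        self.fail_on = fail_on  # key suffix whose upload should fail, if any

    def put(self, key, data):
        if self.fail_on and key.endswith(self.fail_on):
            raise IOError(f"upload failed: {key}")
        self.objects[key] = data

    def delete(self, key):
        self.objects.pop(key, None)


def copy_segment(store, base, files):
    """Upload auxiliary files first and the log file last."""
    for suffix in AUX_SUFFIXES:
        store.put(base + suffix, files[suffix])
    # The segment only becomes visible once this final upload succeeds.
    store.put(base + LOG_SUFFIX, files[LOG_SUFFIX])


def delete_segment(store, base):
    """Reverse order: remove the log file first so the segment vanishes at once."""
    store.delete(base + LOG_SUFFIX)
    for suffix in AUX_SUFFIXES:
        store.delete(base + suffix)


def visible_segments(store):
    """A segment counts as visible only if its log file exists."""
    return sorted(
        key[: -len(LOG_SUFFIX)] for key in store.objects if key.endswith(LOG_SUFFIX)
    )
```

In this model a failure while uploading the log file leaves the auxiliary files behind as garbage, but visible_segments never reports the half-uploaded segment.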
This approach should generally work when we solve consistency issues by adding a strongly consistent storage: a segment's uploaded files remain invisible garbage until some metadata about them is written.

> A.3 Caching – storing locally the segments retrieved from the remote storage is excluded as it does not align with the original intent and even defeats some of its purposes (save disk space etc.). That said, could there be other types of use cases where the pattern of access to the remotely stored segments would benefit from local caching (and potentially read-ahead)? Consider the use case of a large pool of consumers which start a backfill at the same time for one day's worth of data from one year ago stored remotely. Caching the segments locally would allow uncoupling the load on the remote storage from the load on the Kafka cluster.
> Maybe the RLM could expose a configuration parameter to switch that feature on/off?

I tend to agree here, caching remote segments locally and making this configurable sounds pretty practical to me. We should implement this, maybe not in the first iteration.

Br,
Ivan

[1] https://github.com/harshach/kafka/pull/18/files#diff-4d73d01c16caed6f2548fc3063550ef0R152

On Thu, 19 Dec 2019 at 19:49, Alexandre Dupriez <alexandre.dupr...@gmail.com> wrote:

Hi Jun,

Thank you for the feedback. I am trying to understand how a push-based approach would work.
In order for the metadata to be propagated (under the assumption you stated), would you plan to add a new API in Kafka to allow the metadata store to send it directly to the brokers?

Thanks,
Alexandre

On Wed, 18 Dec 2019 at 20:14, Jun Rao <j...@confluent.io> wrote:

Hi, Satish and Ivan,

There are different ways for doing push based metadata propagation. Some object stores may support that already. For example, S3 supports events notification (https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html).
Otherwise one could use a separate metadata store that supports push-based change propagation. Other people have mentioned using a Kafka topic. The best approach may depend on the object store and the operational environment (e.g. whether an external metadata store is already available).

The above discussion is based on the assumption that we need to cache the object metadata locally in every broker. I mentioned earlier that an alternative is to just store/retrieve that metadata in an external metadata store. That may simplify the implementation in some cases.
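As a rough illustration of the push-based option with a local cache in every broker, here is a toy Python model. It is only a sketch under stated assumptions: MetadataLog stands in for a Kafka topic or any store with ordered change propagation, and MetadataLog, BrokerCache, SegmentEvent and their fields are hypothetical names, not anything proposed in the KIP.

```python
# Toy model of push-based metadata propagation: a writer appends segment
# metadata events to an ordered log and each broker applies them to a local
# cache. All class and field names here are hypothetical.

from dataclasses import dataclass, field


@dataclass(frozen=True)
class SegmentEvent:
    partition: str
    base_offset: int
    end_offset: int
    deleted: bool = False


@dataclass
class BrokerCache:
    segments: dict = field(default_factory=dict)

    def apply(self, event):
        key = (event.partition, event.base_offset)
        if event.deleted:
            self.segments.pop(key, None)
        else:
            self.segments[key] = event.end_offset

    def find_segment(self, partition, offset):
        """Return the base offset of the cached segment containing offset, or None."""
        for (part, base), end in self.segments.items():
            if part == partition and base <= offset <= end:
                return base
        return None


@dataclass
class MetadataLog:
    events: list = field(default_factory=list)
    subscribers: list = field(default_factory=list)

    def subscribe(self, cache):
        # Replay history so a newly started broker catches up without listing
        # the object store.
        for event in self.events:
            cache.apply(event)
        self.subscribers.append(cache)

    def publish(self, event):
        self.events.append(event)
        for cache in self.subscribers:
            cache.apply(event)
```

A broker that subscribes late replays the log and ends up with the same view as one that was subscribed from the start, which is the property the pull-based LIST approach has trouble providing on an eventually consistent store.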
Thanks,

Jun

On Thu, Dec 5, 2019 at 7:01 AM Satish Duggana <satish.dugg...@gmail.com> wrote:

Hi Jun,
Thanks for your reply.

Currently, `listRemoteSegments` is called at the configured interval (not every second; it defaults to 30 secs). Storing remote log metadata in a strongly consistent store for S3 RSM was raised in a PR comment [1].
RLM invokes RSM at regular intervals and RSM can give remote segment metadata if it is available. RSM is responsible for maintaining and fetching those entries. It should be based on whatever mechanism is consistent and efficient with the respective remote storage.

Can you give more details about the push based mechanism from RSM?

[1] https://github.com/apache/kafka/pull/7561#discussion_r344576223

Thanks,
Satish.
On Thu, Dec 5, 2019 at 4:23 AM Jun Rao <j...@confluent.io> wrote:

Hi, Harsha,

Thanks for the reply.

40/41. I am curious which block storages you have tested. S3 seems to be one of the popular block stores. The concerns that I have with the pull based approach are the following.
(a) Cost: S3 list object requests cost $0.005 per 1000 requests. If you have 100,000 partitions and want to pull the metadata for each partition at the rate of 1/sec, it can cost $0.5/sec, which is roughly $40K per day.
(b) Semantics: S3 list objects are eventually consistent.
So, when you do a list object request, there is no guarantee that you can see all uploaded objects. This could impact the correctness of subsequent logic.
(c) Efficiency: Blindly pulling metadata when there is no change adds unnecessary overhead in the broker as well as in the block store.

So, have you guys tested S3? If so, could you share your experience in terms of cost, semantics and efficiency?

Jun

On Tue, Dec 3, 2019 at 10:11 PM Harsha Chintalapani <ka...@harsha.io> wrote:

Hi Jun,
Thanks for the reply.

On Tue, Nov 26, 2019 at 3:46 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Satish and Ying,
>
> Thanks for the reply.
>
> 40/41.
> There are two different ways that we can approach this. One is what you said. We can have an opinionated way of storing and populating the tier metadata that we think is good enough for everyone. I am not sure if this is the case based on what's currently proposed in the KIP. For example, I am not sure that (1) everyone always needs local metadata; (2) the current local storage format is general enough and (3) everyone wants to use the pull based approach to propagate the metadata.
> Another approach is to make this pluggable and let the implementor implement the best approach for a particular block storage. I haven't seen any comments from Slack/AirBnb in the mailing list on this topic. It would be great if they can provide feedback directly here.

The current interfaces are designed with the most popular block storages available today in mind. We did 2 implementations with these interfaces, and both are yielding good results as we go through testing.
If there is ever a need for pull based approach we can definitely evolve the interface.
In the past we did mark interfaces as evolving to make room for unknowns in the future.
If you have any suggestions around the current interfaces, please propose them; we are happy to see if we can work them in.

> 43. To offer tier storage as a general feature, ideally all existing capabilities should still be supported. It's fine if the Uber implementation doesn't support all capabilities for internal usage. However, the framework should be general enough.

We agree on that as a principle. But most of these major features are arriving right now, and having a big new feature such as tiered storage support all of them will be a big ask.
We can document how we plan to approach solving these in future iterations.
Our goal is to make this tiered storage feature work for everyone.

> 43.3 This is more than just serving the tiered data from block storage. With KIP-392, the consumer now can resolve the conflicts with the replica based on leader epoch. So, we need to make sure that leader epoch can be recovered properly from tier storage.

We are working on testing our approach and we will update the KIP with design details.
> 43.4 For JBOD, if tier storage stores the tier metadata locally, we need to support moving such metadata across disk directories since JBOD supports moving data across disks.

The KIP is updated with JBOD details. Having said that, JBOD tooling needs to evolve to support production loads. Most of the users will be interested in using tiered storage without JBOD support on day 1.

Thanks,
Harsha

> As for meeting, we could have a KIP e-meeting on this if needed, but it will be open to everyone and will be recorded and shared. Often, the details are still resolved through the mailing list.
> Jun

On Tue, Nov 19, 2019 at 6:48 PM Ying Zheng <yi...@uber.com.invalid> wrote:

Please ignore my previous email
I didn't know Apache requires all the discussions to be "open"

On Tue, Nov 19, 2019, 5:40 PM Ying Zheng <yi...@uber.com> wrote:

Hi Jun,

Thank you very much for your feedback!

Can we schedule a meeting in your Palo Alto office in December? I think a face to face discussion is much more efficient than emails. Both Harsha and I can visit you. Satish may be able to join us remotely.

On Fri, Nov 15, 2019 at 11:04 AM Jun Rao <j...@confluent.io> wrote:

Hi, Satish and Harsha,

The following is a more detailed high level feedback for the KIP. Overall, the KIP seems useful.
The challenge is how to design it such that it’s general enough to support different ways of implementing this feature and support existing features.

40. Local segment metadata storage: The KIP makes the assumption that the metadata for the archived log segments are cached locally in every broker and provides a specific implementation for the local storage in the framework. We probably should discuss this more. For example, some tier storage providers may not want to cache the metadata locally and just rely upon a remote key/value store if such a store is already present.
If a local store is used, there could be different ways of implementing it (e.g., based on customized local files, an embedded local store like RocksDB, etc). An alternative design is to just provide an interface for retrieving the tier segment metadata and leave the details of how to get the metadata outside of the framework.

41. RemoteStorageManager interface and the usage of the interface in the framework: I am not sure if the interface is general enough. For example, it seems that RemoteLogIndexEntry is tied to a specific way of storing the metadata in remote storage.
The framework uses the listRemoteSegments() API in a pull based approach. However, in some other implementations, a push based approach may be preferred. I don’t have a concrete proposal yet. But, it would be useful to give this area some more thought and see if we can make the interface more general.

42. In the diagram, the RemoteLogManager is side by side with LogManager. This KIP only discussed how the fetch request is handled between the two layers. However, we should also consider how other requests that touch the log can be handled.
e.g., list offsets by timestamp, delete records, etc. Also, in this model, it's not clear which component is responsible for managing the log start offset. It seems that the log start offset could be changed by both RemoteLogManager and LogManager.

43. There are quite a few existing features not covered by the KIP. It would be useful to discuss each of those.
43.1 I won’t say that compacted topics are rarely used and always small. For example, KStreams uses compacted topics for storing the states and sometimes the size of the topic could be large.
While it might be ok to not support compacted topics initially, it would be useful to have a high level idea on how this might be supported down the road so that we don’t have to make incompatible API changes in the future.
43.2 We need to discuss how EOS is supported. In particular, how is the producer state integrated with the remote storage.
43.3 Now that KIP-392 (allow consumers to fetch from closest replica) is implemented, we need to discuss how reading from a follower replica is supported with tier storage.
43.4 We need to discuss how JBOD is supported with tier storage.

Thanks,

Jun

On Fri, Nov 8, 2019 at 12:06 AM Tom Bentley <tbent...@redhat.com> wrote:

Thanks for those insights Ying.

On Thu, Nov 7, 2019 at 9:26 PM Ying Zheng <yi...@uber.com.invalid> wrote:

> Thanks, I missed that point. However, there's still a point at which the consumer fetches start getting served from remote storage (even if that point isn't as soon as the local log retention time/size).
> This represents a kind of performance cliff edge and what I'm really interested in is how easy it is for a consumer which falls off that cliff to catch up and so its fetches again come from local storage. Obviously this can depend on all sorts of factors (like production rate, consumption rate), so it's not guaranteed (just like it's not guaranteed for Kafka today), but this would represent a new failure mode.

As I have explained in the last mail, it's a very rare case that a consumer needs to read remote data.
With our experience at Uber, this only happens when the consumer service had an outage for several hours.

There is not a "performance cliff" as you assume. The remote storage is even faster than local disks in terms of bandwidth. Reading from remote storage is going to have higher latency than local disk. But since the consumer is catching up on several hours of data, it's not sensitive to sub-second latency, and each remote read request will read a large amount of data to make the overall performance better than reading from local disks.
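The latency argument above can be made concrete with a small calculation. This is only a sketch: the latency and bandwidth figures are hypothetical values picked for illustration, not measurements from Uber or from any particular storage system, and the function name is made up for this example. It shows how a fixed per-request latency is amortized as the read size grows; it does not by itself show that remote reads beat local disks.

```python
def effective_throughput_mb_s(read_size_mb, latency_s, bandwidth_mb_s):
    """Average throughput of one read: size / (fixed latency + transfer time)."""
    transfer_s = read_size_mb / bandwidth_mb_s
    return read_size_mb / (latency_s + transfer_s)


# Hypothetical figures, chosen only for illustration.
LATENCY_S = 0.1          # assumed per-request latency to remote storage
BANDWIDTH_MB_S = 100.0   # assumed sustained remote read bandwidth

# A small read is dominated by latency; a large read is dominated by transfer.
small = effective_throughput_mb_s(1.0, LATENCY_S, BANDWIDTH_MB_S)
large = effective_throughput_mb_s(100.0, LATENCY_S, BANDWIDTH_MB_S)

print(f"1 MB read:   {small:.1f} MB/s")
print(f"100 MB read: {large:.1f} MB/s")
```

Under these assumed numbers, a 1 MB read achieves roughly 9 MB/s while a 100 MB read achieves roughly 91 MB/s, which is why a consumer catching up on hours of data is much less sensitive to per-request latency than one reading at the tail.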
> Another aspect I'd like to understand better is the effect that serving fetch requests from remote storage has on the broker's network utilization. If we're just trimming the amount of data held locally (without increasing the overall local+remote retention), then we're effectively trading disk bandwidth for network bandwidth when serving fetch requests from remote storage (which I understand to be a good thing, since brokers are often/usually disk bound). But if we're increasing the overall local+remote retention then it's more likely that network itself becomes the bottleneck.
> I appreciate this is all rather hand wavy, I'm just trying to understand how this would affect broker performance, so I'd be grateful for any insights you can offer.

Network bandwidth is a function of produce speed, it has nothing to do with remote retention. As long as the data is shipped to remote storage, you can keep the data there for 1 day or 1 year or 100 years, it doesn't consume any network resources.
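The point about shipping data can be illustrated with a back-of-the-envelope model. This is a sketch under stated assumptions: each byte is uploaded to remote storage exactly once, replication and re-uploads are ignored, and fetches served from remote storage are not modeled at all. The function names and the 50 MB/s produce rate are hypothetical.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def upload_bandwidth_mb_s(produce_rate_mb_s):
    """Upload bandwidth to remote storage, assuming each byte is shipped once.

    Retention is deliberately not a parameter: under this assumption it has
    no effect on the upload rate.
    """
    return produce_rate_mb_s


def stored_volume_tb(produce_rate_mb_s, retention_days):
    """Steady-state volume held in remote storage, in decimal terabytes."""
    total_mb = produce_rate_mb_s * retention_days * SECONDS_PER_DAY
    return total_mb / 1_000_000


PRODUCE_RATE_MB_S = 50.0  # hypothetical produce rate for one broker

for days in (1, 365):
    print(
        f"retention={days:>3} days: "
        f"upload={upload_bandwidth_mb_s(PRODUCE_RATE_MB_S):.0f} MB/s, "
        f"stored={stored_volume_tb(PRODUCE_RATE_MB_S, days):.2f} TB"
    )
```

In this model, lengthening retention from 1 day to 365 days grows the stored volume from about 4.32 TB to about 1576.8 TB while the upload bandwidth stays at 50 MB/s. Whether fetch traffic from remote storage changes the picture is a separate question that this sketch does not address.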