My 2 cents -

I agree with Colin. I think it's important that the metadata not grow
unbounded without being delegated to external storage. Indefinite long-term
storage of entity data in Kafka can result in extremely large datasets
where the vast majority of the data is stored in the external tier. I would
be very disappointed to have metadata storage be the limiting factor on
how much data I can store in Kafka. As an example, I think it's very
reasonable that an AWS metadata store could be implemented with DynamoDB
(a key-value store) paired with S3: faster random-access metadata lookup
than plain S3, but without needing to rebuild RocksDB state locally.
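
To make the DynamoDB-plus-S3 idea a bit more concrete, here is a minimal
sketch in Python. Everything in it is an assumption for illustration only:
the two store classes are in-memory stand-ins (not the AWS SDK), and the
class and method names are hypothetical, not anything from the KIP. The
point is just that the key-value tier holds a small pointer per segment
while the bulky metadata lives in the object tier, so nothing has to be
rebuilt on the broker's local disk.

```python
import json


class KeyValueIndex:
    """In-memory stand-in for a key-value store such as DynamoDB (hypothetical)."""

    def __init__(self):
        self._items = {}

    def put(self, key, value):
        self._items[key] = value

    def get(self, key):
        return self._items.get(key)


class ObjectStore:
    """In-memory stand-in for an object store such as S3 (hypothetical)."""

    def __init__(self):
        self._objects = {}

    def put_object(self, name, body):
        self._objects[name] = body

    def get_object(self, name):
        return self._objects[name]


class TwoTierMetadataStore:
    """Small pointers in the key-value tier, full metadata in the object tier."""

    def __init__(self, index, objects):
        self.index = index
        self.objects = objects

    def put_segment_metadata(self, topic, partition, base_offset, metadata):
        name = f"{topic}/{partition}/{base_offset:020d}.json"
        # Write the bulky metadata first, then publish the pointer to it.
        self.objects.put_object(name, json.dumps(metadata).encode("utf-8"))
        self.index.put((topic, partition, base_offset), name)

    def get_segment_metadata(self, topic, partition, base_offset):
        # Random-access lookup hits the key-value tier; no object listing needed.
        name = self.index.get((topic, partition, base_offset))
        if name is None:
            return None
        return json.loads(self.objects.get_object(name).decode("utf-8"))
```

A real implementation would also need range queries by offset and some
story for failures between the two writes, which this sketch ignores.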



On Fri, Jul 10, 2020 at 3:57 PM Colin McCabe <cmcc...@apache.org> wrote:

> Hi all,
>
> Thanks for the KIP.
>
> I took a look and one thing that stood out to me is that the more metadata
> we have, the more storage we will need on local disk for the RocksDB
> database.  This seems like it contradicts some of the goals of the
> project.  Ideally the space we need on local disk should be related only to
> the size of the hot set, not the size of the cold set.  It also seems like
> it could lead to extremely long RocksDB rebuild times if we somehow lose a
> broker's local storage and have to rebuild it.
>
> Instead, I think it would be more reasonable to store cold metadata in the
> "remote" storage (HDFS, S3, etc.).  Not only does this free up space on the
> local disk and avoid long rebuild times, but it also gives us more control
> over the management of our cache.  With RocksDB we are delegating cache
> management to an external library that doesn't really understand our
> use case.
>
> To give a concrete example of how this is bad, imagine that we have 10
> worker threads and we get 10 requests for something that requires us to
> fetch cold tiered storage metadata.  Now every worker thread is blocked
> inside RocksDB and the broker can do nothing until it finishes fetching
> from disk.  When accessing a remote service like HDFS or S3, in contrast,
> we would be able to check if the data was in our local cache first.  If it
> wasn't, we could put the request in a purgatory and activate a background
> thread to fetch the needed data, and then release the worker thread to be
> used by some other request.  Having control of our own caching strategy
> increases observability, maintainability, and performance.
>
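
The cache-then-purgatory flow described above could look roughly like the
following Python sketch. This is only an illustration under my own
assumptions: the class, the callback style, and the use of a thread pool as
the "background thread" are all hypothetical and are not taken from the KIP
or from the broker code.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class NonBlockingMetadataCache:
    """Cache in front of a slow remote metadata lookup (all names hypothetical)."""

    def __init__(self, fetch_remote, fetcher_threads=2):
        self._fetch_remote = fetch_remote  # blocking call to HDFS/S3/etc.
        self._cache = {}
        self._purgatory = {}  # key -> callbacks parked until the fetch completes
        self._lock = threading.Lock()
        self._fetchers = ThreadPoolExecutor(max_workers=fetcher_threads)

    def get(self, key, on_ready):
        """Return True on a cache hit; otherwise park on_ready and return False."""
        value = None
        start_fetch = False
        with self._lock:
            hit = key in self._cache
            if hit:
                value = self._cache[key]
            else:
                waiters = self._purgatory.setdefault(key, [])
                start_fetch = not waiters  # only the first miss triggers a fetch
                waiters.append(on_ready)
        if hit:
            on_ready(value)
            return True
        if start_fetch:
            self._fetchers.submit(self._fill, key)
        return False  # the worker thread is free to serve another request

    def _fill(self, key):
        # Runs on a background fetcher thread. Error handling is omitted here.
        value = self._fetch_remote(key)
        with self._lock:
            self._cache[key] = value
            waiters = self._purgatory.pop(key, [])
        for callback in waiters:
            callback(value)

    def close(self):
        self._fetchers.shutdown(wait=True)
```

The worker thread calling `get` never blocks on the remote fetch: on a miss
it returns immediately, and the parked callback runs later on a fetcher
thread once the data arrives.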
> I can anticipate a possible counter-argument here: the size of the
> metadata should be small and usually fully resident in memory anyway.
> While this is true today, I don't think it will always be true.  The
> current low limit of a few thousand partitions is not competitive in the
> long term and needs to be lifted.  We'd like to get to at least a million
> partitions with KIP-500, and much more later.  Also, when you give people
> the ability to have unlimited retention, they will want to make use of it.
> That means lots of historical log segments to track.  This scenario is by
> no means hypothetical.  Even with the current software, it's easy to think
> of cases where someone misconfigured the log segment roll settings and
> overwhelmed the system with segments.  So overall, I'd like to understand why
> we want to store metadata on local disk rather than remote, and what the
> options are for the future.
>
> best,
> Colin
>
>
> On Thu, Jul 9, 2020, at 09:55, Harsha Chintalapani wrote:
> > Hi Jun,
> > Thanks for the replies and the feedback on the design.
> > We are close to finishing the implementation.
> > We also ran several perf tests at our peak production loads, and
> > with tiered storage we didn't see any degradation in write throughput
> > or latency.
> > Ying has already added some of the perf test results to the KIP itself.
> > It would be great if we could get design and code reviews from you
> > and others in the community as we make progress.
> > Thanks,
> > Harsha
> >
> > On Tue, Jul 7, 2020 at 10:34 AM Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Ying,
> > >
> > > Thanks for the update. It's good to see the progress on this. Please let
> > > us know when you are done updating the KIP wiki.
> > >
> > > Jun
> > >
> > > On Tue, Jul 7, 2020 at 10:13 AM Ying Zheng <yi...@uber.com.invalid> wrote:
> > >
> > >> Hi Jun,
> > >>
> > >> Satish and I have added more design details in the KIP, including how to
> > >> keep consistency between replicas (especially when there are leadership
> > >> changes / log truncations) and new metrics. We also made some other minor
> > >> changes in the doc. We will finish the KIP changes in the next couple of
> > >> days. We will let you know when we are done. Most of the changes are
> > >> already updated to the wiki KIP. You can take a look. But it's not the
> > >> final version yet.
> > >>
> > >> As for the implementation, the code is mostly done and we already had some
> > >> feature tests / system tests. I have added the performance test results in
> > >> the KIP. However the recent design changes (e.g. leader epoch info
> > >> management / log truncation / some of the new metrics) have not been
> > >> implemented yet. It will take about 2 weeks for us to implement after you
> > >> review and agree with those design changes.
> > >>
> > >>
> > >>
> > >> On Tue, Jul 7, 2020 at 9:23 AM Jun Rao <j...@confluent.io> wrote:
> > >>
> > >> > Hi, Satish, Harsha,
> > >> >
> > >> > Any new updates on the KIP? This feature is one of the most important and
> > >> > most requested features in Apache Kafka right now. It would be helpful if
> > >> > we can make sustained progress on this. Could you share how far along is
> > >> > the design/implementation right now? Is there anything that other people
> > >> > can help to get it across the line?
> > >> >
> > >> > As for "transactional support" and "follower requests/replication", no
> > >> > further comments from me as long as the producer state and leader epoch
> > >> > can be restored properly from the object store when needed.
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Jun
> > >> >
> > >> > On Tue, Jun 9, 2020 at 3:39 AM Satish Duggana <satish.dugg...@gmail.com> wrote:
> > >> >
> > >> > > We did not want to add many implementation details in the KIP. But we
> > >> > > decided to add them in the KIP as appendix or sub-sections (including
> > >> > > follower fetch protocol) to describe the flow with the main cases.
> > >> > > That will answer most of the queries. I will update on this mail
> > >> > > thread when the respective sections are updated.
> > >> > >
> > >> > > Thanks,
> > >> > > Satish.
> > >> > >
> > >> > > On Sat, Jun 6, 2020 at 7:49 PM Alexandre Dupriez
> > >> > > <alexandre.dupr...@gmail.com> wrote:
> > >> > > >
> > >> > > > Hi Satish,
> > >> > > >
> > >> > > > A couple of questions specific to the section "Follower
> > >> > > > Requests/Replication", pages 16:17 in the design document [1].
> > >> > > >
> > >> > > > 900. It is mentioned that followers fetch auxiliary states from the
> > >> > > > remote storage.
> > >> > > >
> > >> > > > 900.a Does the consistency model of the external storage impact reads
> > >> > > > of leader epochs and other auxiliary data?
> > >> > > >
> > >> > > > 900.b What are the benefits of using a mechanism to store and access
> > >> > > > the leader epochs which is different from other metadata associated with
> > >> > > > tiered segments? What are the benefits of retrieving this information
> > >> > > > on-demand from the follower rather than relying on propagation via the
> > >> > > > topic __remote_log_metadata? What are the advantages over using a
> > >> > > > dedicated control structure (e.g. a new record type) propagated via
> > >> > > > this topic? Since in the document, different control paths are
> > >> > > > operating in the system, how are the metadata stored in
> > >> > > > __remote_log_metadata [which also include the epoch of the leader
> > >> > > > which offloaded a segment] and the remote auxiliary states kept in
> > >> > > > sync?
> > >> > > >
> > >> > > > 900.c A follower can encounter an OFFSET_MOVED_TO_TIERED_STORAGE. Is
> > >> > > > this in response to a Fetch or OffsetForLeaderEpoch request?
> > >> > > >
> > >> > > > 900.d What happens if, after a follower encountered an
> > >> > > > OFFSET_MOVED_TO_TIERED_STORAGE response, its attempts to retrieve
> > >> > > > leader epochs fail (for instance, because the remote storage is
> > >> > > > temporarily unavailable)? Does the follower fall back to a mode where
> > >> > > > it ignores tiered segments, and applies truncation using only locally
> > >> > > > available information? What happens when access to the remote storage
> > >> > > > is restored? How is the replica lineage inferred by the remote leader
> > >> > > > epochs reconciled with the follower's replica lineage, which has
> > >> > > > evolved? Does the follower remember fetching auxiliary states failed
> > >> > > > in the past and attempt reconciliation? Is there a plan to offer
> > >> > > > different strategies in this scenario, selectable via configuration?
> > >> > > >
> > >> > > > 900.e Is the leader epoch cache offloaded with every segment? Or when
> > >> > > > a new checkpoint is detected? If that information is not always
> > >> > > > offloaded to avoid duplicating data, how does the remote storage
> > >> > > > satisfy the request to retrieve it?
> > >> > > >
> > >> > > > 900.f Since the leader epoch cache covers the entire replica lineage,
> > >> > > > what happens if, after a leader epoch cache file is offloaded with a
> > >> > > > given segment, the local epoch cache is truncated [not necessarily for
> > >> > > > a range of offset included in tiered segments]? How are remote and
> > >> > > > local leader epoch caches kept consistent?
> > >> > > >
> > >> > > > 900.g Consumer can also use leader epochs (e.g. to enable fencing to
> > >> > > > protect against stale leaders). What differences would there be
> > >> > > > between consumer and follower fetches? Especially, would consumers
> > >> > > > also fetch leader epoch information from the remote storage?
> > >> > > >
> > >> > > > 900.h Assume a newly elected leader of a topic-partition detects more
> > >> > > > recent segments are available in the external storage, with epochs >
> > >> > > > its local epoch. Does it ignore these segments and their associated
> > >> > > > epoch-to-offset vectors? Or try to reconstruct its local replica
> > >> > > > lineage based on the data remotely available?
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Alexandre
> > >> > > >
> > >> > > > [1] https://docs.google.com/document/d/18tnobSas3mKFZFr8oRguZoj_tkD_sGzivuLRlMloEMs/edit?usp=sharing
> > >> > > >
> > >> > > > On Thu, Jun 4, 2020 at 19:55, Satish Duggana <satish.dugg...@gmail.com> wrote:
> > >> > > > >
> > >> > > > > Hi Jun,
> > >> > > > > Please let us know if you have any comments on "transactional support"
> > >> > > > > and "follower requests/replication" mentioned in the wiki.
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > > Satish.
> > >> > > > >
> > >> > > > > On Tue, Jun 2, 2020 at 9:25 PM Satish Duggana <satish.dugg...@gmail.com> wrote:
> > >> > > > > >
> > >> > > > > > Thanks Jun for your comments.
> > >> > > > > >
> > >> > > > > > >100. It would be useful to provide more details on how those apis are used. Otherwise, it's kind of hard to really assess whether the new apis are sufficient/redundant. A few examples below.
> > >> > > > > >
> > >> > > > > > We will update the wiki and let you know.
> > >> > > > > >
> > >> > > > > > >100.1 deleteRecords seems to only advance the logStartOffset in Log. How does that trigger the deletion of remote log segments?
> > >> > > > > >
> > >> > > > > > The RLMTask for a leader partition periodically checks whether there
> > >> > > > > > are remote log segments earlier than logStartOffset, and the respective
> > >> > > > > > remote log segment metadata and data are deleted using RLMM and RSM.
> > >> > > > > >
> > >> > > > > > >100.2 stopReplica with deletion is used in 2 cases (a) replica reassignment; (b) topic deletion. We only want to delete the tiered metadata in the second case. Also, in the second case, who initiates the deletion of the remote segment since the leader may not exist?
> > >> > > > > >
> > >> > > > > > Right, it is deleted only in the case of topic deletion. We will
> > >> > > > > > cover the details in the KIP.
> > >> > > > > >
> > >> > > > > > >100.3 "LogStartOffset of a topic can be either in local or in remote storage." If LogStartOffset exists in both places, which one is the source of truth?
> > >> > > > > >
> > >> > > > > > I meant that the logStartOffset can point to either a local segment
> > >> > > > > > or a remote segment, but it is initialised and maintained in the Log
> > >> > > > > > class as it is now.
> > >> > > > > >
> > >> > > > > > >100.4 List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition, long minOffset): How is minOffset supposed to be used?
> > >> > > > > >
> > >> > > > > > It returns the list of remote segments that have baseOffset >= the
> > >> > > > > > given minOffset, sorted by baseOffset in ascending order.
> > >> > > > > >
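
For reference, the minOffset semantics stated above can be sketched in a
few lines of Python. This is only an illustration: the dataclass and the
function are hypothetical stand-ins that mirror the names in the question,
not the actual Java API from the KIP.

```python
from bisect import bisect_left
from dataclasses import dataclass


@dataclass(frozen=True)
class RemoteLogSegmentMetadata:
    """Hypothetical stand-in; the real class carries more fields."""
    base_offset: int
    end_offset: int


def list_remote_log_segments(segments, min_offset):
    """Return segments with base_offset >= min_offset, ascending by base_offset."""
    ordered = sorted(segments, key=lambda s: s.base_offset)
    base_offsets = [s.base_offset for s in ordered]
    # bisect_left finds the first position whose base offset is >= min_offset.
    return ordered[bisect_left(base_offsets, min_offset):]
```

One consequence worth noting: with segments starting at 0, 100 and 200, a
minOffset of 150 returns only the segment starting at 200, even though
offset 150 itself lives in the segment starting at 100. A caller that needs
the segment containing an offset would have to ask differently.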
> > >> > > > > > >100.5 When copying a segment to remote storage, it seems we are calling the same RLMM.putRemoteLogSegmentData() twice before and after copyLogSegment(). Could you explain why?
> > >> > > > > >
> > >> > > > > > This is more about prepare/commit/rollback as you suggested.  We will
> > >> > > > > > update the wiki with the new APIs.
> > >> > > > > >
> > >> > > > > > >100.6 LogSegmentData includes leaderEpochCache, but there is no api in RemoteStorageManager to retrieve it.
> > >> > > > > >
> > >> > > > > > Nice catch, copy/paste issue. There is an API to retrieve it.
> > >> > > > > >
> > >> > > > > > >101. If the __remote_log_metadata is for production usage, could you provide more details? For example, what is the schema of the data (both key and value)? How is the topic maintained, delete or compact?
> > >> > > > > >
> > >> > > > > > It is maintained with the delete policy, and its retention period is
> > >> > > > > > suggested to be longer than the remote retention period.
> > >> > > > > >
> > >> > > > > > >110. Is the cache implementation in RemoteLogMetadataManager meant for production usage? If so, could you provide more details on the schema and how/where the data is stored?
> > >> > > > > >
> > >> > > > > > The proposal is to have a cache (with a default implementation backed
> > >> > > > > > by RocksDB), but it will be added in later versions. We will add this
> > >> > > > > > to future work items.
> > >> > > > > >
> > >> > > > > > >111. "Committed offsets can be stored in a local file". Could you describe the format of the file and where it's stored?
> > >> > > > > >
> > >> > > > > > We will cover this in the KIP.
> > >> > > > > >
> > >> > > > > > >112. Truncation of remote segments under unclean leader election: I am not sure who figures out the truncated remote segments and how that information is propagated to all replicas?
> > >> > > > > >
> > >> > > > > > We will add this in detail in the KIP.
> > >> > > > > >
> > >> > > > > > >113. "If there are any failures in removing remote log segments then those are stored in a specific topic (default as __remote_segments_to_be_deleted)". Is it necessary to add yet another internal topic? Could we just keep retrying?
> > >> > > > > >
> > >> > > > > > This is not really an internal topic; it will be exposed as a
> > >> > > > > > user-configurable topic. After a few retries, we want the user to know
> > >> > > > > > about the failure so that they can take action later by consuming from
> > >> > > > > > this topic. We want to keep this simple instead of retrying
> > >> > > > > > continuously and maintaining the deletion state etc.
> > >> > > > > >
> > >> > > > > > >114. "We may not need to copy producer-id-snapshot as we are copying only segments earlier to last-stable-offset." Hmm, not sure about that. The producer snapshot includes things like the last timestamp of each open producer id and can affect when those producer ids are expired.
> > >> > > > > >
> > >> > > > > > Sure, this will be added as part of the LogSegmentData.
> > >> > > > > >
> > >> > > > > > Thanks,
> > >> > > > > > Satish.
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Fri, May 29, 2020 at 6:39 AM Jun Rao <j...@confluent.io> wrote:
> > >> > > > > > >
> > >> > > > > > > Hi, Satish,
> > >> > > > > > >
> > >> > > > > > > Made another pass on the wiki. A few more comments below.
> > >> > > > > > >
> > >> > > > > > > 100. It would be useful to provide more details on how those apis are used. Otherwise, it's kind of hard to really assess whether the new apis are sufficient/redundant. A few examples below.
> > >> > > > > > > 100.1 deleteRecords seems to only advance the logStartOffset in Log. How does that trigger the deletion of remote log segments?
> > >> > > > > > > 100.2 stopReplica with deletion is used in 2 cases (a) replica reassignment; (b) topic deletion. We only want to delete the tiered metadata in the second case. Also, in the second case, who initiates the deletion of the remote segment since the leader may not exist?
> > >> > > > > > > 100.3 "LogStartOffset of a topic can be either in local or in remote storage." If LogStartOffset exists in both places, which one is the source of truth?
> > >> > > > > > > 100.4 List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition, long minOffset): How is minOffset supposed to be used?
> > >> > > > > > > 100.5 When copying a segment to remote storage, it seems we are calling the same RLMM.putRemoteLogSegmentData() twice before and after copyLogSegment(). Could you explain why?
> > >> > > > > > > 100.6 LogSegmentData includes leaderEpochCache, but there is no api in RemoteStorageManager to retrieve it.
> > >> > > > > > >
> > >> > > > > > > 101. If the __remote_log_metadata is for production usage, could you provide more details? For example, what is the schema of the data (both key and value)? How is the topic maintained, delete or compact?
> > >> > > > > > >
> > >> > > > > > > 110. Is the cache implementation in RemoteLogMetadataManager meant for production usage? If so, could you provide more details on the schema and how/where the data is stored?
> > >> > > > > > >
> > >> > > > > > > 111. "Committed offsets can be stored in a local file". Could you describe the format of the file and where it's stored?
> > >> > > > > > >
> > >> > > > > > > 112. Truncation of remote segments under unclean leader election: I am not sure who figures out the truncated remote segments and how that information is propagated to all replicas?
> > >> > > > > > >
> > >> > > > > > > 113. "If there are any failures in removing remote log segments then those are stored in a specific topic (default as __remote_segments_to_be_deleted)". Is it necessary to add yet another internal topic? Could we just keep retrying?
> > >> > > > > > >
> > >> > > > > > > 114. "We may not need to copy producer-id-snapshot as we are copying only segments earlier to last-stable-offset." Hmm, not sure about that. The producer snapshot includes things like the last timestamp of each open producer id and can affect when those producer ids are expired.
> > >> > > > > > >
> > >> > > > > > > Thanks,
> > >> > > > > > >
> > >> > > > > > > Jun
> > >> > > > > > >
> > >> > > > > > > On Thu, May 28, 2020 at 5:38 AM Satish Duggana <satish.dugg...@gmail.com> wrote:
> > >> > > > > > >>
> > >> > > > > > >> Hi Jun,
> > >> > > > > > >> Gentle reminder. Please go through the updated wiki and let us know your comments.
> > >> > > > > > >>
> > >> > > > > > >> Thanks,
> > >> > > > > > >> Satish.
> > >> > > > > > >>
> > >> > > > > > >> On Tue, May 19, 2020 at 3:50 PM Satish Duggana <satish.dugg...@gmail.com> wrote:
> > >> > > > > > >>>
> > >> > > > > > >>> Hi Jun,
> > >> > > > > > >>> Please go through the wiki, which has the latest updates. The Google doc is updated frequently to be in sync with the wiki.
> > >> > > > > > >>>
> > >> > > > > > >>> Thanks,
> > >> > > > > > >>> Satish.
> > >> > > > > > >>>
> > >> > > > > > >>> On Tue, May 19, 2020 at 12:30 AM Jun Rao <j...@confluent.io> wrote:
> > >> > > > > > >>>>
> > >> > > > > > >>>> Hi, Satish,
> > >> > > > > > >>>>
> > >> > > > > > >>>> Thanks for the update. Just to clarify. Which doc has the latest updates, the wiki or the google doc?
> > >> > > > > > >>>>
> > >> > > > > > >>>> Jun
> > >> > > > > > >>>>
> > >> > > > > > >>>> On Thu, May 14, 2020 at 10:38 AM Satish Duggana <satish.dugg...@gmail.com> wrote:
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> Hi Jun,
> > >> > > > > > >>>>> Thanks for your comments.  We updated the KIP with more details.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> >100. For each of the operations related to tiering, it would be useful to provide a description on how it works with the new API. These include things like consumer fetch, replica fetch, offsetForTimestamp, retention (remote and local) by size, time and logStartOffset, topic deletion, etc. This will tell us if the proposed APIs are sufficient.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> We addressed most of these APIs in the KIP. We can add more details if needed.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> >101. For the default implementation based on internal topic, is it meant as a proof of concept or for production usage? I assume that it's the former. However, if it's the latter, then the KIP needs to describe the design in more detail.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> It is production usage as was mentioned in an earlier mail. We plan to update this section in the next few days.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> >102. When tiering a segment, the segment is first written to the object store and then its metadata is written to RLMM using the api "void putRemoteLogSegmentData()". One potential issue with this approach is that if the system fails after the first operation, it leaves a garbage in the object store that's never reclaimed. One way to improve this is to have two separate APIs, sth like preparePutRemoteLogSegmentData() and commitPutRemoteLogSegmentData().
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> That is a good point. We currently have a different way using markers in the segment but your suggestion is much better.
> > >> > > > > > >>>>>
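
To illustrate why the prepare/commit split suggested in 102 helps with
garbage left in the object store, here is a small Python sketch. It is a
guess at the shape of the idea only: the class, the state names, and the
reclaim helper are hypothetical and are not the APIs that ended up in the
KIP. A plain dict stands in for the object store.

```python
class InMemoryMetadataManager:
    """Hypothetical RLMM stand-in that tracks a state per segment id."""

    def __init__(self):
        self.state = {}

    def prepare_put(self, segment_id):
        self.state[segment_id] = "PREPARED"

    def commit_put(self, segment_id):
        if self.state.get(segment_id) != "PREPARED":
            raise KeyError(f"segment {segment_id} was not prepared")
        self.state[segment_id] = "COMMITTED"

    def uncommitted(self):
        return [s for s, st in self.state.items() if st == "PREPARED"]


def tier_segment(rlmm, object_store, segment_id, data):
    rlmm.prepare_put(segment_id)     # 1. record the intent first
    object_store[segment_id] = data  # 2. upload; a crash here leaves PREPARED
    rlmm.commit_put(segment_id)      # 3. make the segment visible


def reclaim_garbage(rlmm, object_store):
    """Delete uploads whose metadata never reached COMMITTED."""
    for segment_id in rlmm.uncommitted():
        object_store.pop(segment_id, None)
        del rlmm.state[segment_id]
```

Because the intent is recorded before the upload, any object left behind by
a crash has a PREPARED entry pointing at it, so a later cleanup pass can
find it instead of leaking it.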
> > >> > > > > > >>>>> >103. It seems that the transactional support and the ability to read from follower are missing.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> KIP is updated with transactional support, follower fetch semantics, and reading from a follower.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> >104. It would be useful to provide a testing plan for this KIP.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> We added a few tests by introducing test util for tiered storage in the PR. We will provide the testing plan in the next few days.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> Thanks,
> > >> > > > > > >>>>> Satish.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> On Wed, Feb 26, 2020 at 9:43 PM Harsha Chintalapani <ka...@harsha.io> wrote:
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>> On Tue, Feb 25, 2020 at 12:46 PM, Jun Rao <j...@confluent.io> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi, Satish,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks for the updated doc. The new API seems to be an improvement overall. A few more comments below.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 100. For each of the operations related to tiering, it would be useful to provide a description on how it works with the new API. These include things like consumer fetch, replica fetch, offsetForTimestamp, retention (remote and local) by size, time and logStartOffset, topic deletion, etc. This will tell us if the proposed APIs are sufficient.
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>> Thanks for the feedback Jun. We will add more details around this.
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 101. For the default implementation based on internal topic, is it meant as a proof of concept or for production usage? I assume that it's the former. However, if it's the latter, then the KIP needs to describe the design in more detail.
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>> Yes, it is meant to be for production use.  Ideally it would be good to merge this in as the default implementation for the metadata service. We can add more details around design and testing.
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>>> 102. When tiering a segment, the segment is first written to the object store and then its metadata is written to RLMM using the api "void putRemoteLogSegmentData()". One potential issue with this approach is that if the system fails after the first operation, it leaves a garbage in the object store that's never reclaimed. One way to improve this is to have two separate APIs, sth like preparePutRemoteLogSegmentData() and commitPutRemoteLogSegmentData().
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 103. It seems that the transactional support and the ability to read from follower are missing.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 104. It would be useful to provide a testing plan for this KIP.
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>> We are working on adding more details around transactional support and coming up with a test plan.
> > >> > > > > > >>>>>> We will add system tests and integration tests.
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>>> Thanks,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Jun
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Mon, Feb 24, 2020 at 8:10 AM Satish Duggana <satish.dugg...@gmail.com> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi Jun,
> > >> > > > > > >>>>>>> Please look at the earlier reply and let us know your comments.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks,
> > >> > > > > > >>>>>>> Satish.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Wed, Feb 12, 2020 at 4:06 PM Satish Duggana <satish.dugg...@gmail.com> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi Jun,
> > >> > > > > > >>>>>>> Thanks for your comments on the separation of remote log metadata storage and remote log storage.
> > >> > > > > > >>>>>>> We had a few discussions since early Jan on how to support eventually consistent stores like S3 by uncoupling remote log segment metadata and remote log storage. It is written up in detail in the doc here [1]. Below is a brief summary of the discussion from that doc.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> The current approach consists of pulling the remote log segment metadata from remote log storage APIs. It worked fine for storages like HDFS. But one of the problems of relying on the remote storage to maintain metadata is that tiered storage needs to be strongly consistent, with an impact not only on the metadata (e.g. LIST in S3) but also on the segment data (e.g. GET after a DELETE in S3). The cost of maintaining metadata in remote storage also needs to be factored in. This is true in the case of S3, where LIST APIs incur huge costs, as you raised earlier.
> > >> > > > > > >>>>>>> So, it is good to separate the remote storage from the remote log metadata store. We refactored the existing RemoteStorageManager and introduced RemoteLogMetadataManager. The remote log metadata store should give strong consistency semantics, but remote log storage can be eventually consistent.
> > >> > > > > > >>>>>>> We can have a default implementation for RemoteLogMetadataManager which uses an internal topic (as mentioned in one of our earlier emails) as storage. But users can always plug in their own RemoteLogMetadataManager implementation based on their environment.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Please go through the updated KIP and let us know your comments. We have started refactoring for the changes mentioned in the KIP and there may be a few more updates to the APIs.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> [1] https://docs.google.com/document/d/1qfkBCWL1e7ZWkHU7brxKDBebq4ie9yK20XJnKbgAlew/edit?ts=5e208ec7#
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Fri, Dec 27, 2019 at 5:43 PM Ivan Yurchenko <ivan0yurche...@gmail.com> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi all,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Jun:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> (a) Cost: S3 list object requests cost $0.005 per 1000 requests. If you have 100,000 partitions and want to pull the metadata for each partition at the rate of 1/sec, it can cost $0.5/sec, which is roughly $40K per day.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> I want to note here that no reasonably durable
> storage
> > >> > will
> > >> > > be cheap at 100k RPS. For example, DynamoDB might give the same
> > >> ballpark
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> figures.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> If we want to keep the pull-based approach, we can
> try
> > >> to
> > >> > > reduce this
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> number
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> in several ways: doing listings less frequently (as
> > >> Satish
> > >> > > mentioned, with the current defaults it's ~3.33k RPS for your
> > >> example),
> > >> > > batching listing operations in some way (depending on the
> storage; it
> > >> > might
> > >> > > require the change of RSM's interface).
> > >> > > > > > >>>>>>>
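The cost figures quoted above can be checked with a short calculation. This is a sketch using only the numbers from the thread ($0.005 per 1000 LIST requests, 100,000 partitions); the function name is made up for illustration.

```python
def list_request_cost(partitions, interval_secs, usd_per_1000_requests=0.005):
    """Return (requests/sec, USD/sec, USD/day) for one LIST per partition per interval."""
    requests_per_sec = partitions / interval_secs
    usd_per_sec = requests_per_sec * usd_per_1000_requests / 1000
    usd_per_day = usd_per_sec * 86_400  # seconds in a day
    return requests_per_sec, usd_per_sec, usd_per_day


# One listing per partition per second, as in Jun's example.
every_second = list_request_cost(100_000, 1)
# One listing per partition every 30 seconds, the default interval mentioned by Satish.
every_30_secs = list_request_cost(100_000, 30)
```

At 1/sec this gives 100k RPS and about $43,200 per day (the "roughly $40K" above); at the 30-second default it gives about 3.33k RPS and roughly $1,440 per day.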
> > >> > > > > > >>>>>>> There are different ways for doing push based
> metadata
> > >> > > propagation.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Some
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> object stores may support that already. For
> example, S3
> > >> > > supports
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> event
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> notification
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> This sounds interesting. However, I see a couple of
> > >> issues
> > >> > > using it:
> > >> > > > > > >>>>>>> 1. As I understand the documentation, notification
> > >> delivery
> > >> > > is not guaranteed
> > >> > > > > > >>>>>>> and it's recommended to periodically do LIST to
> fill the
> > >> > > gaps. Which brings us back to the same LIST consistency guarantees
> > >> issue.
> > >> > > > > > >>>>>>> 2. The same goes for the broker start: to get the
> > >> current
> > >> > > state, we
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> need
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> to LIST.
> > >> > > > > > >>>>>>> 3. The dynamic set of multiple consumers (RSMs):
> AFAIK
> > >> SQS
> > >> > > and SNS
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> aren't
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> designed for such a case.
> > >> > > > > > >>>>>>>
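To illustrate points 1 and 2 above, here is a rough sketch of a broker-side view that applies pushed notifications and still needs a periodic (or startup) listing to fill gaps. All names are hypothetical; note that the listing passed to `reconcile` would itself inherit whatever consistency the store's LIST provides.

```python
class SegmentView:
    """Hypothetical local view of remote segment keys."""

    def __init__(self):
        self.known = set()

    def on_event(self, key):
        """Apply a pushed 'object created' notification (delivery is not guaranteed)."""
        self.known.add(key)

    def reconcile(self, listed_keys):
        """Merge a full listing and return the keys whose notifications were missed."""
        missed = set(listed_keys) - self.known
        self.known |= missed
        return missed


view = SegmentView()
view.on_event("seg-0")  # notification arrived
missed = view.reconcile(["seg-0", "seg-1"])  # "seg-1" was never notified
```

A freshly started broker is the same case with an empty `known` set: everything comes from the listing.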
> > >> > > > > > >>>>>>> Alexandre:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> A.1 As commented on PR 7561, S3 consistency model
> [1][2]
> > >> > > implies RSM
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> cannot
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> rely solely on S3 APIs to guarantee the expected
> > >> strong
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> consistency. The
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> proposed implementation [3] would need to be
> updated to
> > >> > take
> > >> > > this
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> into
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> account. Let’s talk more about this.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thank you for the feedback. I clearly see the need
> for
> > >> > > changing the S3 implementation
> > >> > > > > > >>>>>>> to provide stronger consistency guarantees. As it
> seems
> > >> from
> > >> > > this thread, there are
> > >> > > > > > >>>>>>> several possible approaches to this. Let's discuss
> > >> > > RemoteLogManager's contract and
> > >> > > > > > >>>>>>> behavior (like pull vs push model) further before
> > >> picking
> > >> > > one (or
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> several -
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> ?) of them.
> > >> > > > > > >>>>>>> I'm going to do some evaluation of DynamoDB for the
> > >> > > pull-based
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> approach,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> if it's possible to apply it paying a reasonable
> bill.
> > >> > I will
> > >> > > also evaluate the push-based approach
> > >> > > > > > >>>>>>> with a Kafka topic as the medium.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> A.2.3 Atomicity – what does an implementation of RSM
> > >> need
> > >> > to
> > >> > > provide
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> with
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> respect to atomicity of the APIs copyLogSegment,
> > >> > > cleanupLogUntil and deleteTopicPartition? If a partial failure
> > >> happens in
> > >> > > any of those
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> (e.g.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> in
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the S3 implementation, if one of the multiple
> uploads
> > >> fails
> > >> > > [4]),
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> The S3 implementation is going to change, but it's
> worth
> > >> > > clarifying
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> anyway.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> The segment log file is being uploaded after S3 has
> > >> acked
> > >> > > uploading of all other files associated with the segment and only
> > >> after
> > >> > > this the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> whole
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> segment file set becomes visible remotely for
> operations
> > >> > > like listRemoteSegments [1].
> > >> > > > > > >>>>>>> In case of upload failure, the files that have been
> > >> > > successfully
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> uploaded
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> stay
> > >> > > > > > >>>>>>> as invisible garbage that is collected by
> > >> cleanupLogUntil
> > >> > (or
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> overwritten
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> successfully later).
> > >> > > > > > >>>>>>> And the opposite happens during the deletion: log
> files
> > >> are
> > >> > > deleted
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> first.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> This approach should generally work when we solve
> > >> > > consistency issues by adding a strongly consistent storage: a
> > >> segment's
> > >> > > uploaded files
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> remain
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> invisible garbage until some metadata about them is
> > >> > written.
> > >> > > > > > >>>>>>>
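The ordering described above can be sketched against an in-memory stand-in for the object store. This is an illustration under assumptions, not the S3 RSM code: the store class, the function names, and the `.log` suffix convention are hypothetical.

```python
class FakeObjectStore:
    """In-memory stand-in for a remote object store; can fail on one key."""

    def __init__(self, fail_on=None):
        self.objects = {}
        self.fail_on = fail_on

    def put(self, key, data):
        if key == self.fail_on:
            raise IOError("simulated upload failure: " + key)
        self.objects[key] = data

    def delete(self, key):
        self.objects.pop(key, None)


def copy_segment(store, base, files):
    """Upload auxiliary files first and the .log file last."""
    for suffix in sorted(s for s in files if s != ".log"):
        store.put(base + suffix, files[suffix])
    # Only once this put succeeds does the segment become visible.
    store.put(base + ".log", files[".log"])


def list_visible_segments(store):
    """A segment is visible only if its .log file exists."""
    return sorted(k[: -len(".log")] for k in store.objects if k.endswith(".log"))


def delete_segment(store, base, suffixes):
    """Reverse order: remove the .log file first so the segment disappears at once."""
    store.delete(base + ".log")
    for suffix in suffixes:
        if suffix != ".log":
            store.delete(base + suffix)
```

If the final `.log` upload fails, the index files already uploaded remain in the store but the segment is never listed, which matches the "invisible garbage" behaviour described above.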
> > >> > > > > > >>>>>>> A.3 Caching – storing locally the segments retrieved
> > >> from
> > >> > > the remote storage is excluded as it does not align with the
> original
> > >> > intent
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> and even
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> defeats some of its purposes (save disk space etc.).
> That
> > >> > > said, could
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> there
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> be other types of use cases where the pattern of
> access
> > >> to
> > >> > > the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> remotely
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> stored segments would benefit from local caching
> (and
> > >> > > potentially read-ahead)? Consider the use case of a large pool of
> > >> > consumers
> > >> > > which
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> start
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> a backfill at the same time for one day's worth of
> data
> > >> from
> > >> > > one year
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> ago
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> stored remotely. Caching the segments locally would
> > >> allow
> > >> > us to
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> uncouple the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> load on the remote storage from the load on the
> Kafka
> > >> > > cluster. Maybe
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> RLM could expose a configuration parameter to switch
> > >> that
> > >> > > feature
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> on/off?
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> I tend to agree here, caching remote segments
> locally
> > >> and
> > >> > > making this configurable sounds pretty practical to me. We should
> > >> > implement
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> this,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> maybe not in the first iteration.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Br,
> > >> > > > > > >>>>>>> Ivan
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> [1]
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > >
> > >> >
> > >>
> https://github.com/harshach/kafka/pull/18/files#diff-4d73d01c16caed6f2548fc3063550ef0R152
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Thu, 19 Dec 2019 at 19:49, Alexandre Dupriez <
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> alexandre.dupr...@gmail.com>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi Jun,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thank you for the feedback. I am trying to
> understand
> > >> how a
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> push-based
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> approach would work.
> > >> > > > > > >>>>>>> In order for the metadata to be propagated (under
> the
> > >> > > assumption you stated), would you plan to add a new API in Kafka
> to
> > >> allow
> > >> > > the metadata store to send them directly to the brokers?
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks,
> > >> > > > > > >>>>>>> Alexandre
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Le mer. 18 déc. 2019 à 20:14, Jun Rao <
> j...@confluent.io>
> > >> a
> > >> > > écrit :
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi, Satish and Ivan,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> There are different ways for doing push based
> metadata
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> propagation. Some
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> object stores may support that already. For
> example, S3
> > >> > > supports
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> event
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> notification (
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > >
> > >>
> https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html).
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Otherwise one could use a separate metadata store
> that
> > >> > > supports
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> push-based
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> change propagation. Other people have mentioned
> using a
> > >> > Kafka
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> topic. The
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> best approach may depend on the object store and the
> > >> > > operational environment (e.g. whether an external metadata store
> is
> > >> > already
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> available).
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> The above discussion is based on the assumption
> that we
> > >> > need
> > >> > > to
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> cache the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> object metadata locally in every broker. I mentioned
> > >> > earlier
> > >> > > that
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> an
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> alternative is to just store/retrieve those
> metadata in
> > >> an
> > >> > > external metadata store. That may simplify the implementation in
> some
> > >> > cases.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Jun
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Thu, Dec 5, 2019 at 7:01 AM Satish Duggana <
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> satish.dugg...@gmail.com>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi Jun,
> > >> > > > > > >>>>>>> Thanks for your reply.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Currently, `listRemoteSegments` is called at the
> > >> configured
> > >> > > interval (not every second; defaults to 30 secs). Storing remote
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> log
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> metadata in a strongly consistent store for S3 RSM
> is
> > >> > raised
> > >> > > in PR-comment[1].
> > >> > > > > > >>>>>>> RLM invokes RSM at regular intervals and RSM can
> give
> > >> > remote
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> segment
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> metadata if it is available. RSM is responsible for
> > >> > > maintaining
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> and
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> fetching those entries. It should be based on
> whatever
> > >> > > mechanism
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> is
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> consistent and efficient with the respective remote
> > >> > storage.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Can you give more details about the push-based mechanism
> > >> from
> > >> > > RSM?
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 1.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > https://github.com/apache/kafka/pull/7561#discussion_r344576223
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks,
> > >> > > > > > >>>>>>> Satish.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Thu, Dec 5, 2019 at 4:23 AM Jun Rao <
> > >> j...@confluent.io>
> > >> > > wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi, Harsha,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks for the reply.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 40/41. I am curious which block storages you have
> > >> tested.
> > >> > S3
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> seems
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> to be
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> one of the popular block stores. The concerns that I
> > >> have
> > >> > > with
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> pull
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> based
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> approach are the following.
> > >> > > > > > >>>>>>> (a) Cost: S3 list object requests cost $0.005 per
> 1000
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> requests. If
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> you
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> have 100,000 partitions and want to pull the
> metadata
> > >> for
> > >> > > each
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> partition
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> at
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the rate of 1/sec, it can cost $0.5/sec, which is
> > >> roughly
> > >> > > $40K
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> per
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> day.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> (b) Semantics: S3 list objects are eventually
> > >> consistent.
> > >> > So,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> when
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> you
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> do a
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> list object request, there is no guarantee that you
> can
> > >> see
> > >> > > all
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> uploaded
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> objects. This could impact the correctness of
> subsequent
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> logic.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> (c) Efficiency: Blindly pulling metadata when there
> is
> > >> no
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> change adds
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> unnecessary overhead in the broker as well as in the
> > >> block
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> store.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> So, have you guys tested S3? If so, could you share
> your
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> experience
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> in
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> terms of cost, semantics and efficiency?
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Jun
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Tue, Dec 3, 2019 at 10:11 PM Harsha Chintalapani
> <
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> ka...@harsha.io
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi Jun,
> > >> > > > > > >>>>>>> Thanks for the reply.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Tue, Nov 26, 2019 at 3:46 PM, Jun Rao <
> > >> j...@confluent.io
> > >> > >
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi, Satish and Ying,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks for the reply.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 40/41. There are two different ways that we can
> approach
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> this.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> One is
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> what
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> you said. We can have an opinionated way of storing
> and
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> populating
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> tier
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> metadata that we think is good enough for everyone.
> I am
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> not
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> sure if
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> this
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> is the case based on what's currently proposed in
> the
> > >> KIP.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> For
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> example, I
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> am not sure that (1) everyone always needs local
> > >> metadata;
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> (2)
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> current
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> local storage format is general enough and (3)
> everyone
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> wants to
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> use
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> pull based approach to propagate the metadata.
> Another
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> approach
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> is to
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> make
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> this pluggable and let the implementor implement
> the
> > >> best
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> approach
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> for a
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> particular block storage. I haven't seen any
> comments
> > >> from
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Slack/AirBnb
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> in
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the mailing list on this topic. It would be great if
> > >> they
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> can
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> provide
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> feedback directly here.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> The current interfaces are designed with most
> popular
> > >> block
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> storages
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> available today and we did 2 implementations with
> these
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> interfaces and
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> they both are yielding good results as we are going
> through
> > >> the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> testing of
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> it.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> If there is ever a need for pull based approach we
> can
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> definitely
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> evolve
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the interface.
> > >> > > > > > >>>>>>> In the past we did mark interfaces to be evolving to
> > >> make
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> room for
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> unknowns
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> in the future.
> > >> > > > > > >>>>>>> If you have any suggestions around the current
> > >> interfaces
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> please
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> propose them; we
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> are happy to see if we can work them into it.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 43. To offer tier storage as a general feature,
> ideally
> > >> all
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> existing
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> capabilities should still be supported. It's fine
> if the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> uber
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> implementation doesn't support all capabilities for
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> internal
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> usage.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> However, the framework should be general enough.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> We agree on that as a principle. But all of these
> major
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> features
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> are mostly
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> coming right now and to have a new big feature such
> as
> > >> > tiered
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> storage
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> to
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> support all the new features will be a big ask. We
> can
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> document
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> how
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> we will approach solving these in future iterations.
> > >> > > > > > >>>>>>> Our goal is to make this tiered storage feature
> work for
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> everyone.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 43.3 This is more than just serving the tier-ed data
> > >> from
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> block
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> storage.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> With KIP-392, the consumer now can resolve the
> conflicts
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> with the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> replica
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> based on leader epoch. So, we need to make sure that
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> leader epoch
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> can be
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> recovered properly from tier storage.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> We are working on testing our approach and we will
> > >> update
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the KIP
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> with
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> design details.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 43.4 For JBOD, if tier storage stores the tier
> metadata
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> locally, we
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> need to
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> support moving such metadata across disk directories
> > >> since
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> JBOD
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> supports
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> moving data across disks.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> KIP is updated with JBOD details. Having said that, JBOD
> JBOD
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> tooling
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> needs
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> to
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> evolve to support production loads. Most of the
> users
> > >> will
> > >> > be
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> interested in
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> using tiered storage without JBOD support on
> > >> day 1.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks,
> > >> > > > > > >>>>>>> Harsha
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> As for meeting, we could have a KIP e-meeting on
> this if
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> needed,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> but it
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> will be open to everyone and will be recorded and
> > >> shared.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Often,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> details are still resolved through the mailing list.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Jun
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Tue, Nov 19, 2019 at 6:48 PM Ying Zheng
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> <yi...@uber.com.invalid>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Please ignore my previous email
> > >> > > > > > >>>>>>> I didn't know Apache requires all the discussions
> to be
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> "open"
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Tue, Nov 19, 2019, 5:40 PM Ying Zheng <
> > >> yi...@uber.com>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi Jun,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thank you very much for your feedback!
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Can we schedule a meeting in your Palo Alto office
> in
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> December? I
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> think a
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> face to face discussion is much more efficient than
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> emails. Both
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Harsha
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> and
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> I can visit you. Satish may be able to join us
> remotely.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Fri, Nov 15, 2019 at 11:04 AM Jun Rao <
> > >> j...@confluent.io
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Hi, Satish and Harsha,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> The following is a more detailed high level
> feedback for
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the KIP.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Overall,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the KIP seems useful. The challenge is how to
> design it
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> such that
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> it’s
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> general enough to support different ways of
> implementing
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> this
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> feature
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> and
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> support existing features.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 40. Local segment metadata storage: The KIP makes
> the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> assumption
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> that
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> metadata for the archived log segments are cached
> > >> locally
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> in
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> every
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> broker
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> and provides a specific implementation for the local
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> storage in
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> framework. We probably should discuss this more. For
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> example,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> some
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> tier
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> storage providers may not want to cache the metadata
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> locally and
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> just
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> rely
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> upon a remote key/value store if such a store is
> already
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> present. If
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> a
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> local store is used, there could be different ways
> of
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> implementing it
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> (e.g., based on customized local files, an embedded
> > >> local
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> store
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> like
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> RocksDB, etc.). An alternative way of designing this is
> to
> > >> just
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> provide an
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> interface for retrieving the tier segment metadata
> and
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> leave the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> details
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> of
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> how to get the metadata outside of the framework.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 41. RemoteStorageManager interface and the usage of
> the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> interface in
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> framework: I am not sure if the interface is general
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> enough. For
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> example,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> it seems that RemoteLogIndexEntry is tied to a
> specific
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> way of
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> storing
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> the
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> metadata in remote storage. The framework uses
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> listRemoteSegments()
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> api
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> in
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> a pull based approach. However, in some other
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> implementations, a
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> push
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> based
> > >> > > > > > >>>>>>> approach may be preferred. I don’t have a
> concrete
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> proposal
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> yet.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> But,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> it would be useful to give this area some more
> thoughts
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> and see
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> if we
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> can
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> make the interface more general.
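To make the pull versus push distinction in point 41 concrete, here is a minimal sketch in Python. It is not the KIP's API: SegmentMetadata, PullMetadataSource, PushMetadataSource, InMemoryMetadataStore and all of their methods are hypothetical names invented for illustration, and list_remote_segments only loosely mirrors the listRemoteSegments() call mentioned above.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class SegmentMetadata:
    """Hypothetical, storage-agnostic description of one tiered segment."""
    topic_partition: str
    base_offset: int
    end_offset: int


class PullMetadataSource(ABC):
    """Pull style: the framework asks for segments when it needs them."""

    @abstractmethod
    def list_remote_segments(self, topic_partition: str,
                             min_base_offset: int = 0) -> List[SegmentMetadata]:
        ...


class PushMetadataSource(ABC):
    """Push style: the store notifies the framework about new segments."""

    @abstractmethod
    def subscribe(self, topic_partition: str,
                  listener: Callable[[SegmentMetadata], None]) -> None:
        ...


class InMemoryMetadataStore(PullMetadataSource, PushMetadataSource):
    """Toy store supporting both styles, for illustration only."""

    def __init__(self) -> None:
        self._segments: Dict[str, List[SegmentMetadata]] = {}
        self._listeners: Dict[str, List[Callable[[SegmentMetadata], None]]] = {}

    def add_segment(self, segment: SegmentMetadata) -> None:
        # Record the segment, then push it to every subscribed listener.
        self._segments.setdefault(segment.topic_partition, []).append(segment)
        for listener in self._listeners.get(segment.topic_partition, []):
            listener(segment)

    def list_remote_segments(self, topic_partition, min_base_offset=0):
        # Pull path: return matching segments ordered by base offset.
        matching = [s for s in self._segments.get(topic_partition, [])
                    if s.base_offset >= min_base_offset]
        return sorted(matching, key=lambda s: s.base_offset)

    def subscribe(self, topic_partition, listener):
        self._listeners.setdefault(topic_partition, []).append(listener)
```

A framework written only against a small metadata type like this could, in principle, work with either style, which is one possible reading of "more general" in the comment above.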
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 42. In the diagram, the RemoteLogManager is side by
> > >> > > > > > >>>>>>> side with LogManager. This KIP only discussed how the
> > >> > > > > > >>>>>>> fetch request is handled between the two layers.
> > >> > > > > > >>>>>>> However, we should also consider how other requests
> > >> > > > > > >>>>>>> that touch the log can be handled, e.g., list offsets
> > >> > > > > > >>>>>>> by timestamp, delete records, etc. Also, in this model,
> > >> > > > > > >>>>>>> it's not clear which component is responsible for
> > >> > > > > > >>>>>>> managing the log start offset. It seems that the log
> > >> > > > > > >>>>>>> start offset could be changed by both RemoteLogManager
> > >> > > > > > >>>>>>> and LogManager.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 43. There are quite a few existing features not covered
> > >> > > > > > >>>>>>> by the KIP. It would be useful to discuss each of
> > >> > > > > > >>>>>>> those.
> > >> > > > > > >>>>>>> 43.1 I won’t say that compacted topics are rarely used
> > >> > > > > > >>>>>>> and always small. For example, KStreams uses compacted
> > >> > > > > > >>>>>>> topics for storing the states and sometimes the size of
> > >> > > > > > >>>>>>> the topic could be large. While it might be ok to not
> > >> > > > > > >>>>>>> support compacted topics initially, it would be useful
> > >> > > > > > >>>>>>> to have a high level idea on how this might be
> > >> > > > > > >>>>>>> supported down the road so that we don’t have to make
> > >> > > > > > >>>>>>> incompatible API changes in the future.
> > >> > > > > > >>>>>>> 43.2 We need to discuss how EOS is supported. In
> > >> > > > > > >>>>>>> particular, how is the producer state integrated with
> > >> > > > > > >>>>>>> the remote storage?
> > >> > > > > > >>>>>>> 43.3 Now that KIP-392 (allow consumers to fetch from
> > >> > > > > > >>>>>>> closest replica) is implemented, we need to discuss how
> > >> > > > > > >>>>>>> reading from a follower replica is supported with tier
> > >> > > > > > >>>>>>> storage.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> 43.4 We need to discuss how JBOD is supported with tier
> > >> > > > > > >>>>>>> storage.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Jun
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Fri, Nov 8, 2019 at 12:06 AM Tom Bentley
> > >> > > > > > >>>>>>> <tbent...@redhat.com> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks for those insights Ying.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Thu, Nov 7, 2019 at 9:26 PM Ying Zheng
> > >> > > > > > >>>>>>> <yi...@uber.com.invalid> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks, I missed that point. However, there's still a
> > >> > > > > > >>>>>>> point at which the consumer fetches start getting
> > >> > > > > > >>>>>>> served from remote storage (even if that point isn't as
> > >> > > > > > >>>>>>> soon as the local log retention time/size). This
> > >> > > > > > >>>>>>> represents a kind of performance cliff edge and what
> > >> > > > > > >>>>>>> I'm really interested in is how easy it is for a
> > >> > > > > > >>>>>>> consumer which falls off that cliff to catch up and so
> > >> > > > > > >>>>>>> its fetches again come from local storage. Obviously
> > >> > > > > > >>>>>>> this can depend on all sorts of factors (like
> > >> > > > > > >>>>>>> production rate, consumption rate), so it's not
> > >> > > > > > >>>>>>> guaranteed (just like it's not guaranteed for Kafka
> > >> > > > > > >>>>>>> today), but this would represent a new failure mode.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> As I have explained in the last mail, it's a very rare
> > >> > > > > > >>>>>>> case that a consumer needs to read remote data. In our
> > >> > > > > > >>>>>>> experience at Uber, this only happens when the consumer
> > >> > > > > > >>>>>>> service has had an outage for several hours.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> There is not a "performance cliff" as you assume. The
> > >> > > > > > >>>>>>> remote storage is even faster than local disks in terms
> > >> > > > > > >>>>>>> of bandwidth. Reading from remote storage is going to
> > >> > > > > > >>>>>>> have higher latency than local disk. But since the
> > >> > > > > > >>>>>>> consumer is catching up on several hours of data, it's
> > >> > > > > > >>>>>>> not sensitive to sub-second latency, and each remote
> > >> > > > > > >>>>>>> read request will read a large amount of data to make
> > >> > > > > > >>>>>>> the overall performance better than reading from local
> > >> > > > > > >>>>>>> disks.
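The latency versus bandwidth argument above, and the earlier question about how easily a lagging consumer catches up, can be sketched as back-of-the-envelope arithmetic. The model below is an assumption made for illustration, not something measured or taken from the KIP: each read pays a fixed latency plus a transfer time, reads are issued one after another, and the function names are hypothetical.

```python
from typing import Optional


def effective_throughput(request_bytes: float, latency_s: float,
                         bandwidth_bytes_per_s: float) -> float:
    """Bytes per second achieved by sequential reads that each pay a
    fixed latency before transferring request_bytes at full bandwidth."""
    transfer_s = request_bytes / bandwidth_bytes_per_s
    return request_bytes / (latency_s + transfer_s)


def catch_up_seconds(backlog_bytes: float, read_bytes_per_s: float,
                     produce_bytes_per_s: float) -> Optional[float]:
    """Time for a lagging consumer to reach the log end, or None if it
    reads no faster than producers write and so never catches up."""
    if read_bytes_per_s <= produce_bytes_per_s:
        return None
    return backlog_bytes / (read_bytes_per_s - produce_bytes_per_s)


# Illustrative numbers only: with a fixed per-request latency, a larger
# request amortizes that latency and gets closer to the raw bandwidth.
small = effective_throughput(64 * 1024, 0.1, 100e6)
large = effective_throughput(64 * 1024 * 1024, 0.1, 100e6)
print(f"small requests: {small / 1e6:.1f} MB/s, large requests: {large / 1e6:.1f} MB/s")
```

Under this model the per-request latency matters less as requests grow, which is the amortization effect described above. Whether remote reads actually beat local disk depends on the real bandwidth figures, which the thread does not give.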
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Another aspect I'd like to understand better is the
> > >> > > > > > >>>>>>> effect that serving fetch requests from remote storage
> > >> > > > > > >>>>>>> has on the broker's network utilization. If we're just
> > >> > > > > > >>>>>>> trimming the amount of data held locally (without
> > >> > > > > > >>>>>>> increasing the overall local+remote retention), then
> > >> > > > > > >>>>>>> we're effectively trading disk bandwidth for network
> > >> > > > > > >>>>>>> bandwidth when serving fetch requests from remote
> > >> > > > > > >>>>>>> storage (which I understand to be a good thing, since
> > >> > > > > > >>>>>>> brokers are often/usually disk bound). But if we're
> > >> > > > > > >>>>>>> increasing the overall local+remote retention then it's
> > >> > > > > > >>>>>>> more likely that the network itself becomes the
> > >> > > > > > >>>>>>> bottleneck.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> I appreciate this is all rather hand-wavy. I'm just
> > >> > > > > > >>>>>>> trying to understand how this would affect broker
> > >> > > > > > >>>>>>> performance, so I'd be grateful for any insights you
> > >> > > > > > >>>>>>> can offer.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Network bandwidth is a function of produce speed; it
> > >> > > > > > >>>>>>> has nothing to do with remote retention. As long as the
> > >> > > > > > >>>>>>> data is shipped to remote storage, you can keep the
> > >> > > > > > >>>>>>> data there for 1 day or 1 year or 100 years; it doesn't
> > >> > > > > > >>>>>>> consume any network resources.
