Hi, Satish,

Made another pass on the wiki. A few more comments below.

100. It would be useful to provide more details on how those APIs are used.
Otherwise, it's hard to assess whether the new APIs are sufficient or
redundant. A few examples below; a rough sketch of how I read the relevant
API surface follows item 100.6.
100.1 deleteRecords seems to only advance the logStartOffset in Log. How
does that trigger the deletion of remote log segments?
100.2 stopReplica with deletion is used in two cases: (a) replica
reassignment and (b) topic deletion. We only want to delete the tiered
metadata in the second case. Also, in the second case, who initiates the
deletion of the remote segments, since the leader may no longer exist?
100.3 "LogStartOffset of a topic can be either in local or in remote
storage." If LogStartOffset exists in both places, which one is the source
of truth?
100.4 List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition
topicPartition, long minOffset): How is minOffset supposed to be used?
100.5 When copying a segment to remote storage, it seems we are calling the
same RLMM.putRemoteLogSegmentData() twice, once before and once after
copyLogSegment(). Could you explain why?
100.6 LogSegmentData includes leaderEpochCache, but there is no API
in RemoteStorageManager to retrieve it.
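
To make the questions above concrete, here is roughly how I read the relevant
plugin APIs from the wiki. This is only my paraphrase for discussion; types
such as RemoteLogSegmentId/RemoteLogSegmentMetadata and the exact signatures
may not match the KIP, so please correct anything I got wrong.

    // My reading of the API surface in question, not the authoritative definition.
    public interface RemoteLogMetadataManager {
        // 100.5: invoked twice, before and after copyLogSegment() -- why?
        void putRemoteLogSegmentData(RemoteLogSegmentMetadata metadata);
        // 100.4: how is minOffset intended to be used by callers?
        List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition topicPartition,
                                                             long minOffset);
    }

    public interface RemoteStorageManager {
        // 100.6: LogSegmentData carries the leaderEpochCache, but there is no
        // corresponding API to read it back from remote storage.
        void copyLogSegment(RemoteLogSegmentId id, LogSegmentData logSegmentData);
    }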

101. If the __remote_log_metadata topic is for production usage, could you
provide more details? For example, what is the schema of the data (both
key and value)? How is the topic maintained: with delete or compact retention?
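
To be clear about the level of detail I'm hoping for, something like the
following made-up illustration of a key/value layout would help (this is not a
proposal, just an example of the kind of schema description I mean):

    key   : {topicPartition, remoteLogSegmentId}
    value : {startOffset, endOffset, leaderEpoch, segmentSizeInBytes,
             eventType (COPY/DELETE), timestamp}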

110. Is the cache implementation in RemoteLogMetadataManager meant for
production usage? If so, could you provide more details on the schema and
how/where the data is stored?

111. "Committed offsets can be stored in a local file". Could you describe
the format of the file and where it's stored?
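
For example, is it similar to the existing checkpoint files (a version line, an
entry count, then one "topic partition offset" line per partition), or
something else? A made-up illustration of what I mean:

    0
    1
    __remote_log_metadata 0 123456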

112. Truncation of remote segments under unclean leader election: who
determines which remote segments need to be truncated, and how is that
information propagated to all replicas?

113. "If there are any failures in removing remote log segments then those
are stored in a specific topic (default as
__remote_segments_to_be_deleted)". Is it necessary to add yet another
internal topic? Could we just keep retrying?
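
A rough sketch of the retry alternative I have in mind (purely illustrative;
deleteRemoteLogSegment() and RemoteStorageException are stand-ins for whatever
the final RSM delete API looks like):

    // Retry failed remote deletions with backoff instead of parking them in a
    // new internal topic. Deletes are idempotent on most object stores, so
    // retrying the same segment id should be safe.
    private void deleteWithRetry(RemoteStorageManager rsm, RemoteLogSegmentId segmentId)
            throws InterruptedException {
        long backoffMs = 1_000L;
        while (true) {
            try {
                rsm.deleteRemoteLogSegment(segmentId);
                return;
            } catch (RemoteStorageException e) {
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, 60_000L);
            }
        }
    }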

114. "We may not need to copy producer-id-snapshot as we are copying only
segments earlier to last-stable-offset." Hmm, not sure about that. The
producer snapshot includes things like the last timestamp of each open
producer id and can affect when those producer ids are expired.
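
A conceptual sketch of why the snapshot matters (not the actual broker code):
producer ids are expired based on the last timestamp recorded for them, and
that timestamp lives in the producer state snapshot.

    // If the snapshot is not tiered, a broker rebuilding state from remote
    // segments loses lastTimestamp and may expire producer ids incorrectly.
    boolean isProducerIdExpired(long nowMs, long lastTimestampMs, long expirationMs) {
        return nowMs - lastTimestampMs > expirationMs;
    }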

Thanks,

Jun

On Thu, May 28, 2020 at 5:38 AM Satish Duggana <satish.dugg...@gmail.com>
wrote:

> Hi Jun,
> Gentle reminder. Please go through the updated wiki and let us know your
> comments.
>
> Thanks,
> Satish.
>
> On Tue, May 19, 2020 at 3:50 PM Satish Duggana <satish.dugg...@gmail.com>
> wrote:
>
>> Hi Jun,
>> Please go through the wiki which has the latest updates. Google doc is
>> updated frequently to be in sync with wiki.
>>
>> Thanks,
>> Satish.
>>
>> On Tue, May 19, 2020 at 12:30 AM Jun Rao <j...@confluent.io> wrote:
>>
>>> Hi, Satish,
>>>
>>> Thanks for the update. Just to clarify. Which doc has the latest
>>> updates, the wiki or the google doc?
>>>
>>> Jun
>>>
>>> On Thu, May 14, 2020 at 10:38 AM Satish Duggana <
>>> satish.dugg...@gmail.com> wrote:
>>>
>>>> Hi Jun,
>>>> Thanks for your comments.  We updated the KIP with more details.
>>>>
>>>> >100. For each of the operations related to tiering, it would be useful
>>>> to provide a description on how it works with the new API. These include
>>>> things like consumer fetch, replica fetch, offsetForTimestamp, retention
>>>> (remote and local) by size, time and logStartOffset, topic deletion, etc.
>>>> This will tell us if the proposed APIs are sufficient.
>>>>
>>>> We addressed most of these APIs in the KIP. We can add more details if
>>>> needed.
>>>>
>>>> >101. For the default implementation based on internal topic, is it
>>>> meant as a proof of concept or for production usage? I assume that it's the
>>>> former. However, if it's the latter, then the KIP needs to describe the
>>>> design in more detail.
>>>>
>>>> It is production usage as was mentioned in an earlier mail. We plan to
>>>> update this section in the next few days.
>>>>
>>>> >102. When tiering a segment, the segment is first written to the
>>>> object store and then its metadata is written to RLMM using the api "void
>>>> putRemoteLogSegmentData()". One potential issue with this approach is
>>>> that if the system fails after the first operation, it leaves a garbage in
>>>> the object store that's never reclaimed. One way to improve this is to have
>>>> two separate APIs, sth like preparePutRemoteLogSegmentData() and
>>>> commitPutRemoteLogSegmentData().
>>>>
>>>> That is a good point. We currently have a different way using markers
>>>> in the segment but your suggestion is much better.
>>>>
>>>> >103. It seems that the transactional support and the ability to read
>>>> from follower are missing.
>>>>
>>>> KIP is updated with transactional support, follower fetch semantics,
>>>> and reading from a follower.
>>>>
>>>> >104. It would be useful to provide a testing plan for this KIP.
>>>>
>>>> We added a few tests by introducing test util for tiered storage in the
>>>> PR. We will provide the testing plan in the next few days.
>>>>
>>>> Thanks,
>>>> Satish.
>>>>
>>>>
>>>> On Wed, Feb 26, 2020 at 9:43 PM Harsha Chintalapani <ka...@harsha.io>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 25, 2020 at 12:46 PM, Jun Rao <j...@confluent.io> wrote:
>>>>>
>>>>>> Hi, Satish,
>>>>>>
>>>>>> Thanks for the updated doc. The new API seems to be an improvement
>>>>>> overall. A few more comments below.
>>>>>>
>>>>>> 100. For each of the operations related to tiering, it would be
>>>>>> useful to provide a description on how it works with the new API. These
>>>>>> include things like consumer fetch, replica fetch, offsetForTimestamp,
>>>>>> retention
>>>>>> (remote and local) by size, time and logStartOffset, topic deletion,
>>>>>> etc. This will tell us if the proposed APIs are sufficient.
>>>>>>
>>>>>
>>>>> Thanks for the feedback Jun. We will add more details around this.
>>>>>
>>>>>
>>>>>> 101. For the default implementation based on internal topic, is it
>>>>>> meant as a proof of concept or for production usage? I assume that it's 
>>>>>> the
>>>>>> former. However, if it's the latter, then the KIP needs to describe the
>>>>>> design in more detail.
>>>>>>
>>>>>
>>>>> Yes it meant to be for production use.  Ideally it would be good to
>>>>> merge this in as the default implementation for metadata service. We can
>>>>> add more details around design and testing.
>>>>>
>>>>> 102. When tiering a segment, the segment is first written to the
>>>>>> object store and then its metadata is written to RLMM using the api "void
>>>>>> putRemoteLogSegmentData()".
>>>>>> One potential issue with this approach is that if the system fails
>>>>>> after the first operation, it leaves a garbage in the object store that's
>>>>>> never reclaimed. One way to improve this is to have two separate APIs, 
>>>>>> sth
>>>>>> like preparePutRemoteLogSegmentData() and 
>>>>>> commitPutRemoteLogSegmentData().
>>>>>>
>>>>>> 103. It seems that the transactional support and the ability to read
>>>>>> from follower are missing.
>>>>>>
>>>>>> 104. It would be useful to provide a testing plan for this KIP.
>>>>>>
>>>>>
>>>>> We are working on adding more details around transactional support and
>>>>> coming up with test plan.
>>>>> Add system tests and integration tests.
>>>>>
>>>>> Thanks,
>>>>>>
>>>>>> Jun
>>>>>>
>>>>>> On Mon, Feb 24, 2020 at 8:10 AM Satish Duggana <
>>>>>> satish.dugg...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Jun,
>>>>>> Please look at the earlier reply and let us know your comments.
>>>>>>
>>>>>> Thanks,
>>>>>> Satish.
>>>>>>
>>>>>> On Wed, Feb 12, 2020 at 4:06 PM Satish Duggana <
>>>>>> satish.dugg...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Jun,
>>>>>> Thanks for your comments on the separation of remote log metadata
>>>>>> storage and remote log storage.
>>>>>> We had a few discussions since early Jan on how to support eventually
>>>>>> consistent stores like S3 by uncoupling remote log segment metadata and
>>>>>> remote log storage. It is written with details in the doc here(1). Below 
>>>>>> is
>>>>>> the brief summary of the discussion from that doc.
>>>>>>
>>>>>> The current approach consists of pulling the remote log segment
>>>>>> metadata from remote log storage APIs. It worked fine for storages like
>>>>>> HDFS. But one of the problems of relying on the remote storage to 
>>>>>> maintain
>>>>>> metadata is that tiered-storage needs to be strongly consistent, with an
>>>>>> impact not only on the metadata(e.g. LIST in S3) but also on the segment
>>>>>> data(e.g. GET after a DELETE in S3). The cost of maintaining metadata in
>>>>>> remote storage needs to be factored in. This is true in the case of S3,
>>>>>> LIST APIs incur huge costs as you raised earlier.
>>>>>> So, it is good to separate the remote storage from the remote log
>>>>>> metadata store. We refactored the existing RemoteStorageManager and
>>>>>> introduced RemoteLogMetadataManager. Remote log metadata store should 
>>>>>> give
>>>>>> strong consistency semantics but remote log storage can be eventually
>>>>>> consistent.
>>>>>> We can have a default implementation for RemoteLogMetadataManager
>>>>>> which uses an internal topic(as mentioned in one of our earlier emails) 
>>>>>> as
>>>>>> storage. But users can always plugin their own RemoteLogMetadataManager
>>>>>> implementation based on their environment.
>>>>>>
>>>>>> Please go through the updated KIP and let us know your comments. We
>>>>>> have started refactoring for the changes mentioned in the KIP and there 
>>>>>> may
>>>>>> be a few more updates to the APIs.
>>>>>>
>>>>>> [1]
>>>>>>
>>>>>>
>>>>>> https://docs.google.com/document/d/1qfkBCWL1e7ZWkHU7brxKDBebq4ie9yK20XJnKbgAlew/edit?ts=5e208ec7#
>>>>>>
>>>>>> On Fri, Dec 27, 2019 at 5:43 PM Ivan Yurchenko <ivan0yurche...@gmail.com> wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Jun:
>>>>>>
>>>>>> (a) Cost: S3 list object requests cost $0.005 per 1000 requests. If you
>>>>>> have 100,000 partitions and want to pull the metadata for each partition at
>>>>>> the rate of 1/sec. It can cost $0.5/sec, which is roughly $40K per day.
>>>>>>
>>>>>> I want to note here, that no reasonably durable storage will be cheap
>>>>>> at 100k RPS. For example, DynamoDB might give the same ballpark figures.
>>>>>> If we want to keep the pull-based approach, we can try to reduce this number
>>>>>> in several ways: doing listings less frequently (as Satish mentioned,
>>>>>> with the current defaults it's ~3.33k RPS for your example), batching
>>>>>> listing operations in some way (depending on the storage; it might require
>>>>>> the change of RSM's interface).
>>>>>>
>>>>>> There are different ways for doing push based metadata propagation. Some
>>>>>> object stores may support that already. For example, S3 supports events
>>>>>> notification
>>>>>>
>>>>>> This sounds interesting. However, I see a couple of issues using it:
>>>>>> 1. As I understand the documentation, notification delivery is not
>>>>>> guaranteed and it's recommended to periodically do LIST to fill the gaps.
>>>>>> Which brings us back to the same LIST consistency guarantees issue.
>>>>>> 2. The same goes for the broker start: to get the current state, we need
>>>>>> to LIST.
>>>>>> 3. The dynamic set of multiple consumers (RSMs): AFAIK SQS and SNS aren't
>>>>>> designed for such a case.
>>>>>>
>>>>>> Alexandre:
>>>>>>
>>>>>> A.1 As commented on PR 7561, S3 consistency model [1][2] implies RSM cannot
>>>>>> relies solely on S3 APIs to guarantee the expected strong consistency. The
>>>>>> proposed implementation [3] would need to be updated to take this into
>>>>>> account. Let’s talk more about this.
>>>>>>
>>>>>> Thank you for the feedback. I clearly see the need for changing the
>>>>>> S3 implementation to provide stronger consistency guarantees. As it see from
>>>>>> this thread, there are several possible approaches to this. Let's discuss
>>>>>> RemoteLogManager's contract and behavior (like pull vs push model) further
>>>>>> before picking one (or several - ?) of them.
>>>>>> I'm going to do some evaluation of DynamoDB for the pull-based approach,
>>>>>> if it's possible to apply it paying a reasonable bill. Also, of the
>>>>>> push-based approach with a Kafka topic as the medium.
>>>>>>
>>>>>> A.2.3 Atomicity – what does an implementation of RSM need to provide with
>>>>>> respect to atomicity of the APIs copyLogSegment, cleanupLogUntil and
>>>>>> deleteTopicPartition? If a partial failure happens in any of those (e.g. in
>>>>>> the S3 implementation, if one of the multiple uploads fails [4]),
>>>>>>
>>>>>> The S3 implementation is going to change, but it's worth clarifying anyway.
>>>>>> The segment log file is being uploaded after S3 has acked uploading
>>>>>> of all other files associated with the segment and only after this the whole
>>>>>> segment file set becomes visible remotely for operations like
>>>>>> listRemoteSegments [1].
>>>>>> In case of upload failure, the files that has been successfully uploaded
>>>>>> stays as invisible garbage that is collected by cleanupLogUntil (or
>>>>>> overwritten successfully later).
>>>>>> And the opposite happens during the deletion: log files are deleted first.
>>>>>> This approach should generally work when we solve consistency issues
>>>>>> by adding a strongly consistent storage: a segment's uploaded files remain
>>>>>> invisible garbage until some metadata about them is written.
>>>>>>
>>>>>> A.3 Caching – storing locally the segments retrieved from the remote
>>>>>> storage is excluded as it does not align with the original intent and even
>>>>>> defeat some of its purposes (save disk space etc.). That said, could there
>>>>>> be other types of use cases where the pattern of access to the remotely
>>>>>> stored segments would benefit from local caching (and potentially
>>>>>> read-ahead)? Consider the use case of a large pool of consumers which start
>>>>>> a backfill at the same time for one day worth of data from one year ago
>>>>>> stored remotely. Caching the segments locally would allow to uncouple the
>>>>>> load on the remote storage from the load on the Kafka cluster. Maybe the
>>>>>> RLM could expose a configuration parameter to switch that feature on/off?
>>>>>>
>>>>>> I tend to agree here, caching remote segments locally and making this
>>>>>> configurable sounds pretty practical to me. We should implement this,
>>>>>> maybe not in the first iteration.
>>>>>>
>>>>>> Br,
>>>>>> Ivan
>>>>>>
>>>>>> [1]
>>>>>>
>>>>>>
>>>>>> https://github.com/harshach/kafka/pull/18/files#diff-4d73d01c16caed6f2548fc3063550ef0R152
>>>>>>
>>>>>> On Thu, 19 Dec 2019 at 19:49, Alexandre Dupriez <alexandre.dupr...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Jun,
>>>>>>
>>>>>> Thank you for the feedback. I am trying to understand how a push-based
>>>>>> approach would work.
>>>>>> In order for the metadata to be propagated (under the assumption you
>>>>>> stated), would you plan to add a new API in Kafka to allow the metadata
>>>>>> store to send them directly to the brokers?
>>>>>>
>>>>>> Thanks,
>>>>>> Alexandre
>>>>>>
>>>>>> Le mer. 18 déc. 2019 à 20:14, Jun Rao <j...@confluent.io> a écrit :
>>>>>>
>>>>>> Hi, Satish and Ivan,
>>>>>>
>>>>>> There are different ways for doing push based metadata propagation. Some
>>>>>> object stores may support that already. For example, S3 supports events
>>>>>> notification (
>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html).
>>>>>> Otherwise one could use a separate metadata store that supports push-based
>>>>>> change propagation. Other people have mentioned using a Kafka topic. The
>>>>>> best approach may depend on the object store and the operational
>>>>>> environment (e.g. whether an external metadata store is already available).
>>>>>>
>>>>>> The above discussion is based on the assumption that we need to cache the
>>>>>> object metadata locally in every broker. I mentioned earlier that an
>>>>>> alternative is to just store/retrieve those metadata in an external
>>>>>> metadata store. That may simplify the implementation in some cases.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jun
>>>>>>
>>>>>> On Thu, Dec 5, 2019 at 7:01 AM Satish Duggana <satish.dugg...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Jun,
>>>>>> Thanks for your reply.
>>>>>>
>>>>>> Currently, `listRemoteSegments` is called at the configured
>>>>>> interval(not every second, defaults to 30secs). Storing remote log
>>>>>> metadata in a strongly consistent store for S3 RSM is raised in
>>>>>> PR-comment[1].
>>>>>> RLM invokes RSM at regular intervals and RSM can give remote segment
>>>>>> metadata if it is available. RSM is responsible for maintaining and
>>>>>> fetching those entries. It should be based on whatever mechanism is
>>>>>> consistent and efficient with the respective remote storage.
>>>>>>
>>>>>> Can you give more details about push based mechanism from RSM?
>>>>>>
>>>>>> 1. https://github.com/apache/kafka/pull/7561#discussion_r344576223
>>>>>>
>>>>>> Thanks,
>>>>>> Satish.
>>>>>>
>>>>>> On Thu, Dec 5, 2019 at 4:23 AM Jun Rao <j...@confluent.io> wrote:
>>>>>>
>>>>>> Hi, Harsha,
>>>>>>
>>>>>> Thanks for the reply.
>>>>>>
>>>>>> 40/41. I am curious which block storages you have tested. S3 seems to be
>>>>>> one of the popular block stores. The concerns that I have with pull based
>>>>>> approach are the following.
>>>>>> (a) Cost: S3 list object requests cost $0.005 per 1000 requests. If you
>>>>>> have 100,000 partitions and want to pull the metadata for each partition at
>>>>>> the rate of 1/sec. It can cost $0.5/sec, which is roughly $40K per day.
>>>>>>
>>>>>> (b) Semantics: S3 list objects are eventually consistent. So, when you do a
>>>>>> list object request, there is no guarantee that you can see all uploaded
>>>>>> objects. This could impact the correctness of subsequent logics.
>>>>>>
>>>>>> (c) Efficiency: Blindly pulling metadata when there is no change adds
>>>>>> unnecessary overhead in the broker as well as in the block store.
>>>>>>
>>>>>> So, have you guys tested S3? If so, could you share your experience in
>>>>>> terms of cost, semantics and efficiency?
>>>>>>
>>>>>> Jun
>>>>>>
>>>>>> On Tue, Dec 3, 2019 at 10:11 PM Harsha Chintalapani <ka...@harsha.io> wrote:
>>>>>>
>>>>>> Hi Jun,
>>>>>> Thanks for the reply.
>>>>>>
>>>>>> On Tue, Nov 26, 2019 at 3:46 PM, Jun Rao <j...@confluent.io> wrote:
>>>>>>
>>>>>> Hi, Satish and Ying,
>>>>>>
>>>>>> Thanks for the reply.
>>>>>>
>>>>>> 40/41. There are two different ways that we can approach this. One is what
>>>>>> you said. We can have an opinionated way of storing and populating the tier
>>>>>> metadata that we think is good enough for everyone. I am not sure if this
>>>>>> is the case based on what's currently proposed in the KIP. For example, I
>>>>>> am not sure that (1) everyone always needs local metadata; (2) the current
>>>>>> local storage format is general enough and (3) everyone wants to use the
>>>>>> pull based approach to propagate the metadata. Another approach is to make
>>>>>> this pluggable and let the implementor implements the best approach for a
>>>>>> particular block storage. I haven't seen any comments from Slack/AirBnb in
>>>>>> the mailing list on this topic. It would be great if they can provide
>>>>>> feedback directly here.
>>>>>>
>>>>>> The current interfaces are designed with most popular block storages
>>>>>> available today and we did 2 implementations with these interfaces and
>>>>>> they both are yielding good results as we going through the testing of it.
>>>>>>
>>>>>> If there is ever a need for pull based approach we can definitely evolve
>>>>>> the interface.
>>>>>> In the past we did mark interfaces to be evolving to make room for unknowns
>>>>>> in the future.
>>>>>> If you have any suggestions around the current interfaces please propose we
>>>>>> are happy to see if we can work them into it.
>>>>>>
>>>>>> 43. To offer tier storage as a general feature, ideally all existing
>>>>>> capabilities should still be supported. It's fine if the uber
>>>>>> implementation doesn't support all capabilities for internal usage.
>>>>>> However, the framework should be general enough.
>>>>>>
>>>>>> We agree on that as a principle. But all of these major features mostly
>>>>>> coming right now and to have a new big feature such as tiered storage to
>>>>>> support all the new features will be a big ask. We can document on how do
>>>>>> we approach solving these in future iterations.
>>>>>> Our goal is to make this tiered storage feature work for everyone.
>>>>>>
>>>>>> 43.3 This is more than just serving the tier-ed data from block storage.
>>>>>> With KIP-392, the consumer now can resolve the conflicts with the replica
>>>>>> based on leader epoch. So, we need to make sure that leader epoch can be
>>>>>> recovered properly from tier storage.
>>>>>>
>>>>>> We are working on testing our approach and we will update the KIP with
>>>>>> design details.
>>>>>>
>>>>>> 43.4 For JBOD, if tier storage stores the tier metadata locally, we need to
>>>>>> support moving such metadata across disk directories since JBOD supports
>>>>>> moving data across disks.
>>>>>>
>>>>>> KIP is updated with JBOD details. Having said that, JBOD tooling needs to
>>>>>> evolve to support production loads. Most of the users will be interested in
>>>>>> using tiered storage without JBOD support on day 1.
>>>>>>
>>>>>> Thanks,
>>>>>> Harsha
>>>>>>
>>>>>> As for meeting, we could have a KIP e-meeting on this if needed, but it
>>>>>> will be open to everyone and will be recorded and shared. Often, the
>>>>>> details are still resolved through the mailing list.
>>>>>>
>>>>>> Jun
>>>>>>
>>>>>> On Tue, Nov 19, 2019 at 6:48 PM Ying Zheng <yi...@uber.com.invalid> wrote:
>>>>>>
>>>>>> Please ignore my previous email
>>>>>> I didn't know Apache requires all the discussions to be "open"
>>>>>>
>>>>>> On Tue, Nov 19, 2019, 5:40 PM Ying Zheng <yi...@uber.com> wrote:
>>>>>>
>>>>>> Hi Jun,
>>>>>>
>>>>>> Thank you very much for your feedback!
>>>>>>
>>>>>> Can we schedule a meeting in your Palo Alto office in December? I think a
>>>>>> face to face discussion is much more efficient than emails. Both Harsha and
>>>>>> I can visit you. Satish may be able to join us remotely.
>>>>>>
>>>>>> On Fri, Nov 15, 2019 at 11:04 AM Jun Rao <j...@confluent.io> wrote:
>>>>>>
>>>>>> Hi, Satish and Harsha,
>>>>>>
>>>>>> The following is a more detailed high level feedback for the KIP. Overall,
>>>>>> the KIP seems useful. The challenge is how to design it such that it’s
>>>>>> general enough to support different ways of implementing this feature and
>>>>>> support existing features.
>>>>>>
>>>>>> 40. Local segment metadata storage: The KIP makes the assumption that the
>>>>>> metadata for the archived log segments are cached locally in every broker
>>>>>> and provides a specific implementation for the local storage in the
>>>>>> framework. We probably should discuss this more. For example, some tier
>>>>>> storage providers may not want to cache the metadata locally and just rely
>>>>>> upon a remote key/value store if such a store is already present. If a
>>>>>> local store is used, there could be different ways of implementing it
>>>>>> (e.g., based on customized local files, an embedded local store like
>>>>>> RocksDB, etc). An alternative of designing this is to just provide an
>>>>>> interface for retrieving the tier segment metadata and leave the details of
>>>>>> how to get the metadata outside of the framework.
>>>>>>
>>>>>> 41. RemoteStorageManager interface and the usage of the interface in the
>>>>>> framework: I am not sure if the interface is general enough. For example,
>>>>>> it seems that RemoteLogIndexEntry is tied to a specific way of storing the
>>>>>> metadata in remote storage. The framework uses listRemoteSegments() api in
>>>>>> a pull based approach. However, in some other implementations, a push based
>>>>>> approach may be more preferred. I don’t have a concrete proposal yet. But,
>>>>>> it would be useful to give this area some more thoughts and see if we can
>>>>>> make the interface more general.
>>>>>>
>>>>>> 42. In the diagram, the RemoteLogManager is side by side with LogManager.
>>>>>> This KIP only discussed how the fetch request is handled between the two
>>>>>> layer. However, we should also consider how other requests that touch the
>>>>>> log can be handled. e.g., list offsets by timestamp, delete records, etc.
>>>>>> Also, in this model, it's not clear which component is responsible for
>>>>>> managing the log start offset. It seems that the log start offset could be
>>>>>> changed by both RemoteLogManager and LogManager.
>>>>>>
>>>>>> 43. There are quite a few existing features not covered by the KIP. It
>>>>>> would be useful to discuss each of those.
>>>>>> 43.1 I won’t say that compacted topics are rarely used and always small.
>>>>>> For example, KStreams uses compacted topics for storing the states and
>>>>>> sometimes the size of the topic could be large. While it might be ok to not
>>>>>> support compacted topics initially, it would be useful to have a high level
>>>>>> idea on how this might be supported down the road so that we don’t have to
>>>>>> make incompatible API changes in the future.
>>>>>> 43.2 We need to discuss how EOS is supported. In particular, how is the
>>>>>> producer state integrated with the remote storage.
>>>>>> 43.3 Now that KIP-392 (allow consumers to fetch from closest replica) is
>>>>>> implemented, we need to discuss how reading from a follower replica is
>>>>>> supported with tier storage.
>>>>>> 43.4 We need to discuss how JBOD is supported with tier storage.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jun
>>>>>>
>>>>>> On Fri, Nov 8, 2019 at 12:06 AM Tom Bentley <tbent...@redhat.com> wrote:
>>>>>>
>>>>>> Thanks for those insights Ying.
>>>>>>
>>>>>> On Thu, Nov 7, 2019 at 9:26 PM Ying Zheng <yi...@uber.com.invalid> wrote:
>>>>>>
>>>>>> Thanks, I missed that point. However, there's still a point at which the
>>>>>> consumer fetches start getting served from remote storage (even if that
>>>>>> point isn't as soon as the local log retention time/size). This represents
>>>>>> a kind of performance cliff edge and what I'm really interested in is how
>>>>>> easy it is for a consumer which falls off that cliff to catch up and so its
>>>>>> fetches again come from local storage. Obviously this can depend on all
>>>>>> sorts of factors (like production rate, consumption rate), so it's not
>>>>>> guaranteed (just like it's not guaranteed for Kafka today), but this would
>>>>>> represent a new failure mode.
>>>>>>
>>>>>> As I have explained in the last mail, it's a very rare case that a consumer
>>>>>> need to read remote data. With our experience at Uber, this only happens
>>>>>> when the consumer service had an outage for several hours.
>>>>>>
>>>>>> There is not a "performance cliff" as you assume. The remote storage is
>>>>>> even faster than local disks in terms of bandwidth. Reading from remote
>>>>>> storage is going to have higher latency than local disk. But since the
>>>>>> consumer is catching up several hours data, it's not sensitive to the
>>>>>> sub-second level latency, and each remote read request will read a large
>>>>>> amount of data to make the overall performance better than reading from
>>>>>> local disks.
>>>>>>
>>>>>> Another aspect I'd like to understand better is the effect of serving fetch
>>>>>> request from remote storage has on the broker's network utilization. If
>>>>>> we're just trimming the amount of data held locally (without increasing the
>>>>>> overall local+remote retention), then we're effectively trading disk
>>>>>> bandwidth for network bandwidth when serving fetch requests from remote
>>>>>> storage (which I understand to be a good thing, since brokers are
>>>>>> often/usually disk bound). But if we're increasing the overall local+remote
>>>>>> retention then it's more likely that network itself becomes the bottleneck.
>>>>>>
>>>>>> I appreciate this is all rather hand wavy, I'm just trying to understand
>>>>>> how this would affect broker performance, so I'd be grateful for any
>>>>>> insights you can offer.
>>>>>>
>>>>>> Network bandwidth is a function of produce speed, it has nothing to do with
>>>>>> remote retention. As long as the data is shipped to remote storage, you can
>>>>>> keep the data there for 1 day or 1 year or 100 years, it doesn't consume
>>>>>> any network resources.
>>>>>>
>>>>>>
>>>>>
