Thanks everyone for attending the meeting today.
Here is the recording
https://drive.google.com/file/d/14PRM7U0OopOOrJR197VlqvRX5SXNtmKj/view?usp=sharing

Notes:

   1. KIP is updated with follower fetch protocol and ready to reviewed
   2. Satish to capture schema of internal metadata topic in the KIP
   3. We will update the KIP with details of different cases
   4. Test plan will be captured in a doc and will add to the KIP
   5. Add a section "Limitations" to capture the capabilities that will be
   introduced with this KIP and what will not be covered in this KIP.

Please add to it I missed anything. Will produce a formal meeting notes
from next meeting onwards.

Thanks,
Harsha



On Mon, Aug 24, 2020 at 9:42 PM, Ying Zheng <yi...@uber.com.invalid> wrote:

> We did some basic feature tests at Uber. The test cases and results are
> shared in this google doc:
> https://docs.google.com/spreadsheets/d/
> 1XhNJqjzwXvMCcAOhEH0sSXU6RTvyoSf93DHF-YMfGLk/edit?usp=sharing
>
> The performance test results were already shared in the KIP last month.
>
> On Mon, Aug 24, 2020 at 11:10 AM Harsha Ch <harsha...@gmail.com> wrote:
>
> "Understand commitments towards driving design & implementation of the KIP
> further and how it aligns with participant interests in contributing to the
> efforts (ex: in the context of Uber’s Q3/Q4 roadmap)." What is that about?
>
> On Mon, Aug 24, 2020 at 11:05 AM Kowshik Prakasam <kpraka...@confluent.io>
> wrote:
>
> Hi Harsha,
>
> The following google doc contains a proposal for temporary agenda for the
> KIP-405 <https://issues.apache.org/jira/browse/KIP-405> sync meeting
> tomorrow:
>
> https://docs.google.com/document/d/
> 1pqo8X5LU8TpwfC_iqSuVPezhfCfhGkbGN2TqiPA3LBU/edit
>
> .
> Please could you add it to the Google calendar invite?
>
> Thank you.
>
> Cheers,
> Kowshik
>
> On Thu, Aug 20, 2020 at 10:58 AM Harsha Ch <harsha...@gmail.com> wrote:
>
> Hi All,
>
> Scheduled a meeting for Tuesday 9am - 10am. I can record and upload for
> community to be able to follow the discussion.
>
> Jun, please add the required folks on confluent side.
>
> Thanks,
>
> Harsha
>
> On Thu, Aug 20, 2020 at 12:33 AM, Alexandre Dupriez < alexandre.dupriez@
> gmail.com > wrote:
>
> Hi Jun,
>
> Many thanks for your initiative.
>
> If you like, I am happy to attend at the time you suggested.
>
> Many thanks,
> Alexandre
>
> Le mer. 19 août 2020 à 22:00, Harsha Ch < harsha. ch@ gmail. com ( harsha.
> c...@gmail.com ) > a écrit :
>
> Hi Jun,
> Thanks. This will help a lot. Tuesday will work for us.
> -Harsha
>
> On Wed, Aug 19, 2020 at 1:24 PM Jun Rao < jun@ confluent. io ( jun@
> confluent.io ) > wrote:
>
> Hi, Satish, Ying, Harsha,
>
> Do you think it would be useful to have a regular virtual meeting to
> discuss this KIP? The goal of the meeting will be sharing
> design/development progress and discussing any open issues to
>
> accelerate
>
> this KIP. If so, will every Tuesday (from next week) 9am-10am
>
> PT
>
> work for you? I can help set up a Zoom meeting, invite everyone who
>
> might
>
> be interested, have it recorded and shared, etc.
>
> Thanks,
>
> Jun
>
> On Tue, Aug 18, 2020 at 11:01 AM Satish Duggana <
>
> satish. duggana@ gmail. com ( satish.dugg...@gmail.com ) >
>
> wrote:
>
> Hi Kowshik,
>
> Thanks for looking into the KIP and sending your comments.
>
> 5001. Under the section "Follower fetch protocol in detail", the
> next-local-offset is the offset upto which the segments are copied
>
> to
>
> remote storage. Instead, would last-tiered-offset be a better name
>
> than
>
> next-local-offset? last-tiered-offset seems to naturally align well
>
> with
>
> the definition provided in the KIP.
>
> Both next-local-offset and local-log-start-offset were introduced
>
> to
>
> talk
>
> about offsets related to local log. We are fine with
>
> last-tiered-offset
>
> too as you suggested.
>
> 5002. After leadership is established for a partition, the leader
>
> would
>
> begin uploading a segment to remote storage. If successful, the
>
> leader
>
> would write the updated RemoteLogSegmentMetadata to the metadata
>
> topic
>
> (via
>
> RLMM.putRemoteLogSegmentData). However, for defensive reasons, it
>
> seems
>
> useful that before the first time the segment is uploaded by the
>
> leader
>
> for
>
> a partition, the leader should ensure to catch up to all the
>
> metadata
>
> events written so far in the metadata topic for that partition (ex:
>
> by
>
> previous leader). To achieve this, the leader could start a lease
>
> (using
>
> an
>
> establish_leader metadata event) before commencing tiering, and
>
> wait
>
> until
>
> the event is read back. For example, this seems useful to avoid
>
> cases
>
> where
>
> zombie leaders can be active for the same partition. This can also
>
> prove
>
> useful to help avoid making decisions on which segments to be
>
> uploaded
>
> for
>
> a partition, until the current leader has caught up to a complete
>
> view
>
> of
>
> all segments uploaded for the partition so far (otherwise this may
>
> cause
>
> same segment being uploaded twice -- once by the previous leader
>
> and
>
> then
>
> by the new leader).
>
> We allow copying segments to remote storage which may have common
>
> offsets.
>
> Please go through the KIP to understand the follower fetch
>
> protocol(1) and
>
> follower to leader transition(2).
>
> https:/ / cwiki. apache. org/ confluence/ display/ KAFKA/ KIP-405
>
> <https://issues.apache.org/jira/browse/KIP-405>
> %3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-FollowerReplication
>
> (
>
> https://cwiki.apache.org/confluence/display/KAFKA/
> KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-FollowerReplication
>
> )
>
> https:/ / cwiki. apache. org/ confluence/ display/ KAFKA/ KIP-405
>
> <https://issues.apache.org/jira/browse/KIP-405>
>
> %3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Followertoleadertransition
>
>
> (
>
> https://cwiki.apache.org/confluence/display/KAFKA/
> KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Followertoleadertransition
>
> )
>
> 5003. There is a natural interleaving between uploading a segment
>
> to
>
> remote
>
> store, and, writing a metadata event for the same (via
> RLMM.putRemoteLogSegmentData). There can be cases where a remote
>
> segment
>
> is
>
> uploaded, then the leader fails and a corresponding metadata event
>
> never
>
> gets written. In such cases, the orphaned remote segment has to be
> eventually deleted (since there is no confirmation of the upload).
>
> To
>
> handle this, we could use 2 separate metadata events viz.
>
> copy_initiated
>
> and copy_completed, so that copy_initiated events that don't have a
> corresponding copy_completed event can be treated as garbage and
>
> deleted
>
> from the remote object store by the broker.
>
> We are already updating RMM with RemoteLogSegmentMetadata pre and
>
> post
>
> copying of log segments. We had a flag in RemoteLogSegmentMetadata
>
> whether
>
> it is copied or not. But we are making changes in
>
> RemoteLogSegmentMetadata
>
> to introduce a state field in RemoteLogSegmentMetadata which will
>
> have the
>
> respective started and finished states. This includes for other
>
> operations
>
> like delete too.
>
> 5004. In the default implementation of RLMM (using the internal
>
> topic
>
> __remote_log_metadata), a separate topic called
> __remote_segments_to_be_deleted is going to be used just to track
>
> failures
>
> in removing remote log segments. A separate topic (effectively
>
> another
>
> metadata stream) introduces some maintenance overhead and design
> complexity. It seems to me that the same can be achieved just by
>
> using
>
> just
>
> the __remote_log_metadata topic with the following steps: 1) the
>
> leader
>
> writes a delete_initiated metadata event, 2) the leader deletes the
>
> segment
>
> and 3) the leader writes a delete_completed metadata event. Tiered
>
> segments
>
> that have delete_initiated message and not delete_completed
>
> message,
>
> can
>
> be
>
> considered to be a failure and retried.
>
> Jun suggested in earlier mail to keep this simple . We decided not
>
> to have
>
> this topic as mentioned in our earlier replies, updated the KIP.
>
> As I
>
> mentioned in an earlier comment, we are adding state entries for
>
> delete
>
> operations too.
>
> 5005. When a Kafka cluster is provisioned for the first time with
>
> KIP-405 <https://issues.apache.org/jira/browse/KIP-405>
>
> tiered storage enabled, could you explain in the KIP about how the
> bootstrap for __remote_log_metadata topic will be performed in the
>
> the
>
> default RLMM implementation?
>
> __remote_log_segment_metadata topic is created by default with the
> respective topic like partitions/replication-factor etc. Can you be
>
> more
>
> specific on what you are looking for?
>
> 5008. The system-wide configuration ' remote. log. storage. enable
>
> (
>
> http://remote.log.storage.enable/ ) ' is used
>
> to
>
> enable tiered storage. Can this be made a topic-level
>
> configuration,
>
> so
>
> that the user can enable/disable tiered storage at a topic level
>
> rather
>
> than a system-wide default for an entire Kafka cluster?
>
> Yes, we mentioned in an earlier mail thread that it will be
>
> supported at
>
> topic level too, updated the KIP.
>
> 5009. Whenever a topic with tiered storage enabled is deleted, the
> underlying actions require the topic data to be deleted in local
>
> store
>
> as
>
> well as remote store, and eventually the topic metadata needs to be
>
> deleted
>
> too. What is the role of the controller in deleting a topic and
>
> it's
>
> contents, while the topic has tiered storage enabled?
>
> When a topic partition is deleted, there will be an event for that
>
> in RLMM
>
> for its deletion and the controller considers that topic is deleted
>
> only
>
> when all the remote log segments are also deleted.
>
> 5010. RLMM APIs are currently synchronous, for example
> RLMM.putRemoteLogSegmentData waits until the put operation is
>
> completed
>
> in
>
> the remote metadata store. It may also block until the leader has
>
> caught
>
> up
>
> to the metadata (not sure). Could we make these apis asynchronous
>
> (ex:
>
> based on java.util.concurrent.Future) to provide room for tapping
> performance improvements such as non-blocking i/o? 5011. The same
>
> question
>
> as 5009 on sync vs async api for RSM. Have we considered the
>
> pros/cons of
>
> making the RSM apis asynchronous?
>
> Async methods are used to do other tasks while the result is not
> available. In this case, we need to have the result before
>
> proceeding to
>
> take next actions. These APIs are evolving and these can be updated
>
> as and
>
> when needed instead of having them as asynchronous now.
>
> Thanks,
> Satish.
>
> On Fri, Aug 14, 2020 at 4:30 AM Kowshik Prakasam <
>
> kprakasam@ confluent. io ( kpraka...@confluent.io )
>
> wrote:
>
> Hi Harsha/Satish,
>
> Thanks for the great KIP. Below are the first set of
>
> questions/suggestions
>
> I had after making a pass on the KIP.
>
> 5001. Under the section "Follower fetch protocol in detail", the
> next-local-offset is the offset upto which the segments are copied
>
> to
>
> remote storage. Instead, would last-tiered-offset be a better name
>
> than
>
> next-local-offset? last-tiered-offset seems to naturally align
>
> well
>
> with
>
> the definition provided in the KIP.
>
> 5002. After leadership is established for a partition, the leader
>
> would
>
> begin uploading a segment to remote storage. If successful, the
>
> leader
>
> would write the updated RemoteLogSegmentMetadata to the metadata
>
> topic
>
> (via
>
> RLMM.putRemoteLogSegmentData). However, for defensive reasons, it
>
> seems
>
> useful that before the first time the segment is uploaded by the
>
> leader
>
> for
>
> a partition, the leader should ensure to catch up to all the
>
> metadata
>
> events written so far in the metadata topic for that partition
>
> (ex:
>
> by
>
> previous leader). To achieve this, the leader could start a lease
>
> (using
>
> an
>
> establish_leader metadata event) before commencing tiering, and
>
> wait
>
> until
>
> the event is read back. For example, this seems useful to avoid
>
> cases
>
> where
>
> zombie leaders can be active for the same partition. This can also
>
> prove
>
> useful to help avoid making decisions on which segments to be
>
> uploaded
>
> for
>
> a partition, until the current leader has caught up to a complete
>
> view
>
> of
>
> all segments uploaded for the partition so far (otherwise this may
>
> cause
>
> same segment being uploaded twice -- once by the previous leader
>
> and
>
> then
>
> by the new leader).
>
> 5003. There is a natural interleaving between uploading a segment
>
> to
>
> remote
>
> store, and, writing a metadata event for the same (via
> RLMM.putRemoteLogSegmentData). There can be cases where a remote
>
> segment
>
> is
>
> uploaded, then the leader fails and a corresponding metadata event
>
> never
>
> gets written. In such cases, the orphaned remote segment has to be
> eventually deleted (since there is no confirmation of the upload).
>
> To
>
> handle this, we could use 2 separate metadata events viz.
>
> copy_initiated
>
> and copy_completed, so that copy_initiated events that don't have
>
> a
>
> corresponding copy_completed event can be treated as garbage and
>
> deleted
>
> from the remote object store by the broker.
>
> 5004. In the default implementation of RLMM (using the internal
>
> topic
>
> __remote_log_metadata), a separate topic called
> __remote_segments_to_be_deleted is going to be used just to track
>
> failures
>
> in removing remote log segments. A separate topic (effectively
>
> another
>
> metadata stream) introduces some maintenance overhead and design
> complexity. It seems to me that the same can be achieved just by
>
> using
>
> just
>
> the __remote_log_metadata topic with the following steps: 1) the
>
> leader
>
> writes a delete_initiated metadata event, 2) the leader deletes
>
> the
>
> segment
>
> and 3) the leader writes a delete_completed metadata event. Tiered
>
> segments
>
> that have delete_initiated message and not delete_completed
>
> message,
>
> can
>
> be
>
> considered to be a failure and retried.
>
> 5005. When a Kafka cluster is provisioned for the first time with
>
> KIP-405 <https://issues.apache.org/jira/browse/KIP-405>
>
> tiered storage enabled, could you explain in the KIP about how the
> bootstrap for __remote_log_metadata topic will be performed in the
>
> the
>
> default RLMM implementation?
>
> 5006. I currently do not see details on the KIP on why RocksDB was
>
> chosen
>
> as the default cache implementation, and how it is going to be
>
> used.
>
> Were
>
> alternatives compared/considered? For example, it would be useful
>
> to
>
> explain/evaulate the following: 1) debuggability of the RocksDB
>
> JNI
>
> interface, 2) performance, 3) portability across platforms and 4)
>
> interface
>
> parity of RocksDB’s JNI api with it's underlying C/C++ api.
>
> 5007. For the RocksDB cache (the default implementation of RLMM),
>
> what
>
> is
>
> the relationship/mapping between the following: 1) # of tiered
>
> partitions,
>
> 2) # of partitions of metadata topic __remote_log_metadata and 3)
>
> #
>
> of
>
> RocksDB instances? i.e. is the plan to have a RocksDB instance per
>
> tiered
>
> partition, or per metadata topic partition, or just 1 for per
>
> broker?
>
> 5008. The system-wide configuration ' remote. log. storage.
>
> enable (
>
> http://remote.log.storage.enable/ ) ' is
>
> used
>
> to
>
> enable tiered storage. Can this be made a topic-level
>
> configuration,
>
> so
>
> that the user can enable/disable tiered storage at a topic level
>
> rather
>
> than a system-wide default for an entire Kafka cluster?
>
> 5009. Whenever a topic with tiered storage enabled is deleted, the
> underlying actions require the topic data to be deleted in local
>
> store
>
> as
>
> well as remote store, and eventually the topic metadata needs to
>
> be
>
> deleted
>
> too. What is the role of the controller in deleting a topic and
>
> it's
>
> contents, while the topic has tiered storage enabled?
>
> 5010. RLMM APIs are currently synchronous, for example
> RLMM.putRemoteLogSegmentData waits until the put operation is
>
> completed
>
> in
>
> the remote metadata store. It may also block until the leader has
>
> caught
>
> up
>
> to the metadata (not sure). Could we make these apis asynchronous
>
> (ex:
>
> based on java.util.concurrent.Future) to provide room for tapping
> performance improvements such as non-blocking i/o?
>
> 5011. The same question as 5009 on sync vs async api for RSM. Have
>
> we
>
> considered the pros/cons of making the RSM apis asynchronous?
>
> Cheers,
> Kowshik
>
> On Thu, Aug 6, 2020 at 11:02 AM Satish Duggana <
>
> satish. duggana@ gmail. com ( satish.dugg...@gmail.com )
>
> wrote:
>
> Hi Jun,
> Thanks for your comments.
>
> At the high level, that approach sounds reasonable to
>
> me. It would be useful to document how RLMM handles overlapping
>
> archived
>
> offset ranges and how those overlapping segments are deleted
>
> through
>
> retention.
>
> Sure, we will document that in the KIP.
>
> How is the remaining part of the KIP coming along? To me, the
>
> two
>
> biggest
>
> missing items are (1) more detailed documentation on how all the
>
> new
>
> APIs
>
> are being used and (2) metadata format and usage in the internal
>
> topic
>
> __remote_log_metadata.
>
> We are working on updating APIs based on the recent discussions
>
> and get
>
> the perf numbers by plugging in rocksdb as a cache store for
>
> RLMM.
>
> We will update the KIP with the updated APIs and with the above
>
> requested
>
> details in a few days and let you know.
>
> Thanks,
> Satish.
>
> On Wed, Aug 5, 2020 at 12:49 AM Jun Rao < jun@ confluent. io ( jun@
> confluent.io ) > wrote:
>
> Hi, Ying, Satish,
>
> Thanks for the reply. At the high level, that approach sounds
>
> reasonable
>
> to
>
> me. It would be useful to document how RLMM handles overlapping
>
> archived
>
> offset ranges and how those overlapping segments are deleted
>
> through
>
> retention.
>
> How is the remaining part of the KIP coming along? To me, the
>
> two
>
> biggest
>
> missing items are (1) more detailed documentation on how all the
>
> new
>
> APIs
>
> are being used and (2) metadata format and usage in the internal
>
> topic
>
> __remote_log_metadata.
>
> Thanks,
>
> Jun
>
> On Tue, Aug 4, 2020 at 8:32 AM Satish Duggana <
>
> satish. duggana@ gmail. com ( satish.dugg...@gmail.com ) >
>
> wrote:
>
> Hi Jun,
> Thanks for your comment,
>
> 1001. Using the new leader as the source of truth may be fine
>
> too.
>
> What's
>
> not clear to me is when a follower takes over as the new
>
> leader,
>
> from
>
> which
>
> offset does it start archiving to the block storage. I assume
>
> that
>
> the
>
> new
>
> leader starts from the latest archived ooffset by the previous
>
> leader,
>
> but
>
> it seems that's not the case. It would be useful to document
>
> this
>
> in
>
> the
>
> Wiki.
>
> When a follower becomes a leader it needs to findout the offset
>
> from
>
> which the segments to be copied to remote storage. This is
>
> found
>
> by
>
> traversing from the the latest leader epoch from leader epoch
>
> history
>
> and find the highest offset of a segment with that epoch copied
>
> into
>
> remote storage by using respective RLMM APIs. If it can not
>
> find
>
> an
>
> entry then it checks for the previous leader epoch till it
>
> finds
>
> an
>
> entry, If there are no entries till the earliest leader epoch
>
> in
>
> leader epoch cache then it starts copying the segments from the
>
> earliest
>
> epoch entry’s offset.
> Added an example in the KIP here[1]. We will update RLMM APIs
>
> in
>
> the
>
> KIP.
>
> https:/ / cwiki. apache. org/ confluence/ display/ KAFKA/ KIP-405
>
> <https://issues.apache.org/jira/browse/KIP-405>
>
> %3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Followertoleadertransition
>
>
> (
>
> https://cwiki.apache.org/confluence/display/KAFKA/
> KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Followertoleadertransition
>
> )
>
> Satish.
>
> On Tue, Aug 4, 2020 at 9:00 PM Satish Duggana <
>
> satish. duggana@ gmail. com ( satish.dugg...@gmail.com ) >
>
> wrote:
>
> Hi Ying,
> Thanks for your comment.
>
> 1001. Using the new leader as the source of truth may be fine
>
> too.
>
> What's
>
> not clear to me is when a follower takes over as the new
>
> leader,
>
> from
>
> which
>
> offset does it start archiving to the block storage. I assume
>
> that
>
> the
>
> new
>
> leader starts from the latest archived ooffset by the
>
> previous
>
> leader,
>
> but
>
> it seems that's not the case. It would be useful to document
>
> this in
>
> the
>
> Wiki.
>
> When a follower becomes a leader it needs to findout the
>
> offset
>
> from
>
> which the segments to be copied to remote storage. This is
>
> found
>
> by
>
> traversing from the the latest leader epoch from leader epoch
>
> history
>
> and find the highest offset of a segment with that epoch
>
> copied
>
> into
>
> remote storage by using respective RLMM APIs. If it can not
>
> find
>
> an
>
> entry then it checks for the previous leader epoch till it
>
> finds
>
> an
>
> entry, If there are no entries till the earliest leader epoch
>
> in
>
> leader epoch cache then it starts copying the segments from
>
> the
>
> earliest epoch entry’s offset.
> Added an example in the KIP here[1]. We will update RLMM APIs
>
> in
>
> the
>
> KIP.
>
> https:/ / cwiki. apache. org/ confluence/ display/ KAFKA/ KIP-405
>
> <https://issues.apache.org/jira/browse/KIP-405>
>
> %3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Followertoleadertransition
>
>
> (
>
> https://cwiki.apache.org/confluence/display/KAFKA/
> KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Followertoleadertransition
>
> )
>
> Satish.
>
> On Tue, Aug 4, 2020 at 10:28 AM Ying Zheng
>
> < yingz@ uber. com. invalid ( yi...@uber.com.invalid ) >
>
> wrote:
>
> Hi Jun,
>
> Thank you for the comment! The current KIP is not very
>
> clear
>
> about
>
> this
>
> part.
>
> 1001. The new leader will start archiving from the earliest
>
> local
>
> segment
>
> that is not fully
> covered by the "valid" remote data. "valid" means the
>
> (offset,
>
> leader
>
> epoch) pair is valid
> based on the leader-epoch history.
>
> There are some edge cases where the same offset range (with
>
> the
>
> same
>
> leader
>
> epoch) can
> be copied to the remote storage more than once. But this
>
> kind
>
> of
>
> duplication shouldn't be a
> problem.
>
> Staish is going to explain the details in the KIP with
>
> examples.
>
> On Fri, Jul 31, 2020 at 2:55 PM Jun Rao < jun@ confluent.
>
> io (
>
> j...@confluent.io ) >
>
> wrote:
>
> Hi, Ying,
>
> Thanks for the reply.
>
> 1001. Using the new leader as the source of truth may be
>
> fine
>
> too.
>
> What's
>
> not clear to me is when a follower takes over as the new
>
> leader,
>
> from which
>
> offset does it start archiving to the block storage. I
>
> assume
>
> that
>
> the new
>
> leader starts from the latest archived ooffset by the
>
> previous
>
> leader, but
>
> it seems that's not the case. It would be useful to
>
> document
>
> this in
>
> the
>
> wiki.
>
> Jun
>
> On Tue, Jul 28, 2020 at 12:11 PM Ying Zheng
>
> < yingz@ uber. com. invalid ( yi...@uber.com.invalid ) >
>
> wrote:
>
> 1001.
>
> We did consider this approach. The concerns are
> 1) This makes unclean-leader-election rely on remote
>
> storage.
>
> In
>
> case
>
> the
>
> remote storage
> is unavailable, Kafka will not be able to finish the
>
>

Reply via email to