Going back to the initial thread with some general questions on the KIP. I
think some aspects of the user experience still need clarification:

- if a user has a mix of compacted and non-compacted topics it will be
hard to reason about storage needs overall. Could you give a reason
why compacted topics are not supported? This is probably because
supporting them would require a paging approach (as Ryanne suggested
earlier), which would be expensive in terms of IO. Do you want to
rule out supporting compacted topics this early in the KIP design, or
do you want to leave open the option of supporting them eventually? In
an ideal system, Kafka figures out if the topic is compacted or not
and for non-compacted topics it doesn't do the local copy so it goes
through a fast path.

- why do we need per topic remote retention time and bytes? Why isn't
per topic retention time and bytes (without the "remote" part)
sufficient? E.g., if I have a topic and I want retention bytes to be
1TB, and I currently have 500GB local and 500GB remote, Kafka can
manage what segments get deleted first. This would avoid the user
needing to think even more about these extra configs.
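
To make the second point concrete, here is a rough sketch of what a single
per-topic retention.bytes could look like if Kafka applied it across both
tiers. It is Python only for brevity, and the Segment type, the "local" /
"remote" tier labels and segments_to_delete() are all made up for
illustration; none of them come from the KIP or the Kafka code base. The
idea is just to walk segments from oldest to newest, regardless of tier,
and drop them until the total fits the limit:

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Segment:
    base_offset: int  # first offset stored in the segment
    size_bytes: int
    tier: str         # "local" or "remote" (hypothetical labels)


def segments_to_delete(segments: List[Segment], retention_bytes: int) -> List[Segment]:
    """Return the oldest segments to drop so the total size fits retention_bytes."""
    # Oldest data first, no matter which tier currently holds it.
    ordered = sorted(segments, key=lambda s: s.base_offset)
    total = sum(s.size_bytes for s in ordered)
    doomed = []
    for seg in ordered:
        if total <= retention_bytes:
            break
        doomed.append(seg)
        total -= seg.size_bytes
    return doomed


GB = 1024 ** 3
topic_segments = [
    Segment(0, 300 * GB, "remote"),
    Segment(100, 300 * GB, "remote"),
    Segment(200, 300 * GB, "local"),
    Segment(300, 300 * GB, "local"),
]
# 1200GB stored in total against a 1TB limit: only the oldest remote
# segment has to go.
to_drop = segments_to_delete(topic_segments, 1024 * GB)
```

With something like this the user only sets one limit, and the oldest data
(which would normally already be in the remote tier) is deleted first.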

Thanks
Eno


On Mon, Oct 21, 2019 at 4:46 PM Harsha <ka...@harsha.io> wrote:
>
> Hi All,
>           Thanks for the initial feedback on the KIP-405.  We opened a PR 
> here https://github.com/apache/kafka/pull/7561 .
> Please take a look and let us know if you have any questions.
> Since this feature is being developed by engineers from different companies
> we would like to open a feature branch in the Apache Kafka git repo. It will
> allow us to collaborate in the open source community rather than in private
> branches. Please let me know if you have any objections to opening a feature
> branch in Kafka's git repo.
>
> Thanks,
> Harsha
>
> On Mon, Apr 8, 2019, at 10:04 PM, Harsha wrote:
> > Thanks, Ron. Updating the KIP; will add answers here as well.
> >
> >  1) If the cold storage technology can be cross-region, is there a
> >  possibility for a disaster recovery Kafka cluster to share the messages in
> >  cold storage?  My guess is the answer is no, and messages replicated to the
> >  D/R cluster have to be migrated to cold storage from there independently.
> >  (The same cross-region cold storage medium could be used, but every message
> >  would appear there twice).
> >
> > If I understand the question correctly, you are asking: if Kafka
> > cluster A (active) ships logs to remote storage that has cross-region
> > replication, will another Kafka cluster B (passive) be able to use the
> > logs copied to remote storage directly?
> > For the initial version my answer is no. We can handle this in
> > subsequent changes after this one.
> >
> >  2) Can/should external (non-Kafka) tools have direct access to the messages
> >  in cold storage.  I think this might have been addressed when someone asked
> >  about ACLs, and I believe the answer is "no" -- if some external tool needs
> >  to operate on that data then that external tool should read that data by
> > acting as a Kafka consumer.  Again, just asking to get the answer clearly
> > documented in case it is unclear.
> >
> > The answer is No. All tools/clients must go through broker APIs to
> > access any data (local or remote).
> > Only the Kafka broker user will have access to remote storage logs, and
> > security/ACLs will work the way they do today.
> > Tools/clients going directly to the remote storage might help in terms
> > of efficiency, but this requires protocol changes and some way of
> > syncing ACLs in Kafka to the remote storage.
> >
> >
> > Thanks,
> > Harsha
> >
> > On Mon, Apr 8, 2019, at 8:48 AM, Ron Dagostino wrote:
> > > Hi Harsha.  A couple of questions.  I think I know the answers, but it
> > > would be good to see them explicitly documented.
> > >
> > > 1) If the cold storage technology can be cross-region, is there a
> > > possibility for a disaster recovery Kafka cluster to share the messages in
> > > cold storage?  My guess is the answer is no, and messages replicated to
> > > the D/R cluster have to be migrated to cold storage from there
> > > independently. (The same cross-region cold storage medium could be used,
> > > but every message would appear there twice).
> > >
> > > 2) Can/should external (non-Kafka) tools have direct access to the
> > > messages in cold storage.  I think this might have been addressed when
> > > someone asked about ACLs, and I believe the answer is "no" -- if some
> > > external tool needs to operate on that data then that external tool
> > > should read that data by acting as a Kafka consumer.  Again, just asking
> > > to get the answer clearly documented in case it is unclear.
> > >
> > > Ron
> > >
> > >
> > > On Thu, Apr 4, 2019 at 12:53 AM Harsha <ka...@harsha.io> wrote:
> > >
> > > > Hi Viktor,
> > > >
> > > >
> > > > "Now, will the consumer be able to consume a remote segment if:
> > > > - the remote segment is stored in the remote storage, BUT
> > > > - the leader broker failed right after this AND
> > > > - the follower which is to become a leader didn't scan yet for a new
> > > > segment?"
> > > >
> > > > If I understand correctly, the scenario is: a local log segment has
> > > > been copied to remote storage, the leader fails before writing the
> > > > index files, and leadership changes to a follower. In this case we
> > > > consider the log segment copy failed, and the newly elected leader will
> > > > start copying data from the last known offset in remote storage.
> > > > Consumers looking for an offset that falls in the segment whose copy
> > > > failed will continue to read the data from local disk, since the local
> > > > log segment is deleted only after a successful copy of that segment.
> > > >
> > > > "As a follow-up question, what are your experiences, does a failover in
> > > > a broker causes bigger than usual churn in the consumers? (I'm thinking
> > > > about the time required to rebuild remote index files.)"
> > > >
> > > > Rebuilding remote index files will only happen if remote storage is
> > > > missing all the copied index files. Failover will not trigger this
> > > > rebuild.
> > > >
> > > >
> > > > Hi Ryanne,
> > > >
> > > > "Harsha, can you comment on this alternative approach: instead of
> > > > fetching directly from remote storage via a new API, implement
> > > > something like paging, where segments are paged-in and out of cold
> > > > storage based on access frequency/recency? For example, when a remote
> > > > segment is accessed, it could be first fetched to disk and then read
> > > > from there. I suppose this would require less code changes, or at least
> > > > less API changes."
> > > >
> > > > Copying a whole log segment from remote storage is inefficient. When
> > > > tiered storage is enabled, users might prefer hardware with smaller
> > > > disks, and having to copy log segments back to local disk, especially
> > > > when multiple consumers on multiple topics trigger it, might negatively
> > > > affect the available local storage.
> > > > What we proposed in the KIP doesn't affect the existing APIs, and we
> > > > didn't call for any API changes.
> > > >
> > > > "And related to paging, does the proposal address what happens when a
> > > > broker runs out of HDD space? Maybe we should have a way to configure a
> > > > max number of segments or bytes stored on each broker, after which older
> > > > or least-recently-used segments are kicked out, even if they aren't
> > > > expired per the retention policy? Otherwise, I suppose tiered storage
> > > > requires some babysitting to ensure that brokers don't run out of local
> > > > storage, despite having access to potentially unbounded cold storage."
> > > >
> > > > Existing Kafka behavior will not change with the addition of tiered
> > > > storage, and enabling it will not change that behavior either.
> > > > Just like today, it's up to the operator to make sure disk space is
> > > > monitored and to take the necessary actions before running out of space
> > > > becomes a fatal failure for the broker. We don't stop users from
> > > > configuring an infinite retention period, and they can easily run out
> > > > of space.
> > > >
> > > > These approaches copy segments in and out of local disk, which is not
> > > > efficient, hence the reason we didn't add them to Alternatives
> > > > Considered :).
> > > >
> > > >
> > > >
> > > > Thanks,
> > > > Harsha
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Apr 3, 2019, at 7:51 AM, Ryanne Dolan wrote:
> > > > > Harsha, can you comment on this alternative approach: instead of 
> > > > > fetching
> > > > > directly from remote storage via a new API, implement something like
> > > > > paging, where segments are paged-in and out of cold storage based on
> > > > access
> > > > > frequency/recency? For example, when a remote segment is accessed, it
> > > > could
> > > > > be first fetched to disk and then read from there. I suppose this 
> > > > > would
> > > > > require less code changes, or at least less API changes.
> > > > >
> > > > > And related to paging, does the proposal address what happens when a
> > > > broker
> > > > > runs out of HDD space? Maybe we should have a way to configure a max
> > > > number
> > > > > of segments or bytes stored on each broker, after which older or
> > > > > least-recently-used segments are kicked out, even if they aren't 
> > > > > expired
> > > > > per the retention policy? Otherwise, I suppose tiered storage requires
> > > > some
> > > > > babysitting to ensure that brokers don't run out of local storage,
> > > > despite
> > > > > having access to potentially unbounded cold storage.
> > > > >
> > > > > Just some things to add to Alternatives Considered :)
> > > > >
> > > > > Ryanne
> > > > >
> > > > > On Wed, Apr 3, 2019 at 8:21 AM Viktor Somogyi-Vass <
> > > > viktorsomo...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Harsha,
> > > > > >
> > > > > > Thanks for the answer, makes sense.
> > > > > > In the meantime one edge case popped up in my mind but first let me
> > > > > > summarize what I understand if I interpret your KIP correctly.
> > > > > >
> > > > > > So basically whenever the leader RSM copies over a segment to the
> > > > remote
> > > > > > storage, the leader RLM will append an entry to its remote index 
> > > > > > files
> > > > with
> > > > > > the remote position. After this LogManager can delete the local
> > > > segment.
> > > > > > Parallel to this RLM followers are periodically scanning the remote
> > > > storage
> > > > > > for files and if they find a new one they update their indices.
> > > > > >
> > > > > > Now, will the consumer be able to consume a remote segment if:
> > > > > > - the remote segment is stored in the remote storage, BUT
> > > > > > - the leader broker failed right after this AND
> > > > > > - the follower which is to become a leader didn't scan yet for a new
> > > > > > segment?
> > > > > > Would this result in an OffsetOutOfRangeException or would the 
> > > > > > failover
> > > > > > halt the consume request until the new leader has the latest
> > > > information?
> > > > > > As a follow-up question, what are your experiences, does a failover 
> > > > > > in
> > > > a
> > > > > > broker causes bigger than usual churn in the consumers? (I'm 
> > > > > > thinking
> > > > about
> > > > > > the time required to rebuild remote index files.)
> > > > > >
> > > > > > Thanks,
> > > > > > Viktor
> > > > > >
> > > > > > On Mon, Apr 1, 2019 at 8:49 PM Harsha <ka...@harsha.io> wrote:
> > > > > >
> > > > > > > Hi Eno,
> > > > > > >
> > > > > > >       Thanks for the comments. Answers are inline
> > > > > > >
> > > > > > > "Performance & durability
> > > > > > > ----------------------------------
> > > > > > > - would be good to have more discussion on performance
> > > > > > > implications of tiering. Copying the data from the local storage
> > > > > > > to the remote storage is going to be expensive in terms of
> > > > > > > network bandwidth and will affect foreground traffic to Kafka
> > > > > > > potentially reducing its throughput and latency."
> > > > > > >
> > > > > > > Good point. We've run our local tests with 10GigE cards. Even
> > > > > > > though our clients' bandwidth requirements are high, with 1000s
> > > > > > > of clients producing / consuming data, we never hit our limits on
> > > > > > > network bandwidth. More often we hit CPU and memory limits than
> > > > > > > network bandwidth limits. But this is something to be taken care
> > > > > > > of by the operator if they want to enable tiered storage.
> > > > > > > Also, as mentioned in the KIP and previous threads, clients
> > > > > > > requesting older data is very rare, and it is often used as an
> > > > > > > insurance policy. What is proposed here does increase bandwidth
> > > > > > > in terms of shipping log segments to remote, but access patterns
> > > > > > > determine how much we end up reading from the remote tier.
> > > > > > >
> > > > > > >
> > > > > > > "- throttling the copying of the data above might be a solution,
> > > > > > > however, if you have a few TB of data to move to the slower
> > > > > > > remote tier the risk is that the movement will never complete on
> > > > > > > time under high Kafka load. Do we need a scheduler to use idle
> > > > > > > time to do the copying?"
> > > > > > >
> > > > > > > In our design, we are going to have a scheduler in the RLM which
> > > > > > > will periodically copy inactive (rolled-over) log segments.
> > > > > > > I'm not sure idle time is easy to calculate and schedule a copy
> > > > > > > around. Moreover, we want to copy the segments as soon as they
> > > > > > > are available.
> > > > > > > Throttling is something we can take into account and provide
> > > > > > > options to tune.
> > > > > > >
> > > > > > >
> > > > > > > "- Have you considered having two options: 1) a slow tier only
> > > > > > > (e.g., all the data on HDFS) and 2) a fast tier only like Kafka
> > > > > > > today. This would avoid copying data between the tiers. Customers
> > > > > > > that can tolerate a slower tier with a better price/GB can just
> > > > > > > choose option (1). Would be good to put in Alternatives
> > > > > > > considered."
> > > > > > >
> > > > > > > What we want to have is the Kafka that users know today, with
> > > > > > > fast local disk access and a fast data serving layer.  The tiered
> > > > > > > storage option might not be for everyone, and most users who are
> > > > > > > happy with Kafka today shouldn't see changes to their operation
> > > > > > > because of this KIP.
> > > > > > >
> > > > > > > Fundamentally, we believe data in remote tiered storage is
> > > > > > > accessed very infrequently. We expect anyone reading from remote
> > > > > > > tiered storage to expect a slower read response (mostly
> > > > > > > backfills).
> > > > > > >
> > > > > > > Making an explicit change like slow/fast tier will only cause
> > > > > > > more confusion and bring operational complexity into play. With
> > > > > > > tiered storage, only users who want to use cheaper long-term
> > > > > > > storage can enable it, and others can operate Kafka as it is
> > > > > > > today.  It will give a good balance of serving latest reads from
> > > > > > > local disk almost all the time, and shipping older data and
> > > > > > > reading from the remote tier when clients need the older data. If
> > > > > > > necessary, we can revisit slow/fast-tier options at a later
> > > > > > > point.
> > > > > > >
> > > > > > >
> > > > > > > "Topic configs
> > > > > > > ------------------
> > > > > > > - related to performance but also availability, we need to
> > > > > > > discuss the replication mode for the remote tier. For example, if
> > > > > > > the Kafka topics used to have 3-way replication, will they
> > > > > > > continue to have 3-way replication on the remote tier? Will the
> > > > > > > user configure that replication? In S3 for example, one can
> > > > > > > choose from different S3 tiers like STD or SIA, but there is no
> > > > > > > direct control over the replication factor like in Kafka."
> > > > > > >
> > > > > > > No. Remote tier is expected to be reliable storage with its own
> > > > > > > replication mechanisms.
> > > > > > >
> > > > > > >
> > > > > > > "- how will security and ACLs be configured for the remote tier.
> > > > > > > E.g., if user A does not have access to a Kafka topic, when that
> > > > > > > topic is moved to S3 or HDFS there needs to be a way to prevent
> > > > > > > access to the S3 bucket for that user. This might be outside the
> > > > > > > scope of this KIP but would be good to discuss first."
> > > > > > >
> > > > > > > As mentioned in the KIP's "Alternatives" section, we will keep
> > > > > > > Kafka as the owner of those files in S3 or HDFS and take
> > > > > > > advantage of the HDFS security model (file system permissions).
> > > > > > > So any user who tries to access the files directly from HDFS will
> > > > > > > not be able to read them, and any client requests will go through
> > > > > > > Kafka, where its ACLs will apply like they do for any other
> > > > > > > request.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Hi Ron,
> > > > > > >          Thanks for the comments.
> > > > > > >
> > > > > > > "I'm excited about this potential feature.  Did you consider
> > > > > > > storing the information about the remote segments in a Kafka
> > > > > > > topic as opposed to in the remote storage itself?  The topic
> > > > > > > would need infinite retention (or it would need to be compacted)
> > > > > > > so as not to itself be sent to cold storage, but assuming that
> > > > > > > topic would fit on local disk for all time (an open question as
> > > > > > > to whether this is acceptable or not) it feels like the most
> > > > > > > natural way to communicate information among brokers -- more
> > > > > > > natural than having them poll the remote storage systems, at
> > > > > > > least."
> > > > > > >
> > > > > > > With RemoteIndex we are extending the current index mechanism,
> > > > > > > which finds an offset and its message, to find the file in remote
> > > > > > > storage for a given offset. This is a more optimal way of finding
> > > > > > > which remote segment might be serving a given offset, compared to
> > > > > > > storing all of this data in an internal topic.
> > > > > > >
> > > > > > > "To add to Eric's question/confusion about where logic lives (RLM
> > > > > > > vs. RSM), I think it would be helpful to explicitly identify in
> > > > > > > the KIP that the RLM delegates to the RSM since the RSM is part
> > > > > > > of the public API and is the pluggable piece.  For example,
> > > > > > > instead of saying "RLM will ship the log segment files that are
> > > > > > > older than a configurable time to remote storage" I think it
> > > > > > > would be better to say "RLM identifies log segment files that are
> > > > > > > older than a configurable time and delegates to the configured
> > > > > > > RSM to ship them to remote storage" (or something like that --
> > > > > > > just make it clear that the RLM is delegating to the configured
> > > > > > > RSM)."
> > > > > > >
> > > > > > > Thanks. I agree with you. I'll update the KIP.
> > > > > > >
> > > > > > >
> > > > > > > Hi Ambud,
> > > > > > >
> > > > > > > Thanks for the comments.
> > > > > > >
> > > > > > >
> > > > > > > "1. Wouldn't implicit checking for old offsets in remote location
> > > > > > > if not found locally on the leader i.e. do we really need remote
> > > > > > > index files? Since the storage path for a given topic would
> > > > > > > presumably be constant across all the brokers, the remote
> > > > > > > topic-partition path could simply be checked to see if there are
> > > > > > > any segment file names that would meet the offset requirements
> > > > > > > for a Consumer Fetch Request. RSM implementations could
> > > > > > > optionally cache this information."
> > > > > > >
> > > > > > > By storing the remote index files locally, it will be faster for
> > > > > > > us to determine which file might contain the data for a requested
> > > > > > > offset. This will help us resolve the remote file quickly and
> > > > > > > return the response, instead of making a call to the remote tier
> > > > > > > for an index lookup. Given that index files are small, they won't
> > > > > > > take up much storage space.
> > > > > > >
> > > > > > >
> > > > > > > "2. Would it make sense to create an internal compacted Kafka
> > > > > > > topic to publish & record remote segment information? This would
> > > > > > > enable the followers to get updates about new segments rather
> > > > > > > than running list() operations on remote storage to detect new
> > > > > > > segments which may be expensive."
> > > > > > >
> > > > > > >
> > > > > > > I think Ron was also alluding to this. We thought that shipping
> > > > > > > remote index files to remote storage and letting the followers'
> > > > > > > RLM pick them up makes it easy to keep the current replication
> > > > > > > protocol without any changes. That way we don't determine whether
> > > > > > > a follower is in the ISR or not based on another topic's
> > > > > > > replication.  We will run small tests and determine if use of a
> > > > > > > topic is better for this. Thanks for the suggestion.
> > > > > > >
> > > > > > > "3. For RLM to scan local segment rotations are you thinking of
> > > > > > > leveraging java.nio.file.WatchService or simply running
> > > > > > > listFiles() on a periodic basis? Since WatchService
> > > > > > > implementation is heavily OS dependent it might create some
> > > > > > > complications around missing FS Events."
> > > > > > >
> > > > > > > Ideally we want to introduce file events like you suggested. For
> > > > > > > the POC work we are just using listFiles(). Also, copying these
> > > > > > > files to remote can be slow, and we will not delete the files
> > > > > > > from local disk until the segment is copied, so any requests for
> > > > > > > the data in these files will be served from local disk. So I
> > > > > > > don't think we need to be aggressive and optimize this
> > > > > > > copy-segment-to-remote path.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Hi Viktor,
> > > > > > >          Thanks for the comments.
> > > > > > >
> > > > > > > "I have a rather technical question to this. How do you plan to
> > > > > > > package this extension? Does this mean that Kafka will depend on
> > > > > > > HDFS?
> > > > > > > I think it'd be nice to somehow separate this off to a different
> > > > > > > package in the project so that it could be built and released
> > > > > > > separately from the main Kafka packages."
> > > > > > >
> > > > > > > We would like all of this code to be part of Apache Kafka. In the
> > > > > > > early days of Kafka, there was an external module which used to
> > > > > > > contain Kafka-to-HDFS copy tools and dependencies.  We would like
> > > > > > > to have the RLM (class implementation) and the RSM (interface) in
> > > > > > > core and, as you suggested, implementations of the RSM could be
> > > > > > > in another package so that the dependencies of an RSM won't come
> > > > > > > into Kafka's classpath unless someone explicitly configures them.
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Harsha
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Apr 1, 2019, at 1:02 AM, Viktor Somogyi-Vass wrote:
> > > > > > > > Hey Harsha,
> > > > > > > >
> > > > > > > > I have a rather technical question to this. How do you plan to
> > > > package
> > > > > > > this
> > > > > > > > extension? Does this mean that Kafka will depend on HDFS?
> > > > > > > > I think it'd be nice to somehow separate this off to a different
> > > > > > package
> > > > > > > in
> > > > > > > > the project so that it could be built and released separately 
> > > > > > > > from
> > > > the
> > > > > > > main
> > > > > > > > Kafka packages.
> > > > > > > > This decoupling would be useful when direct dependency on HDFS 
> > > > > > > > (or
> > > > > > other
> > > > > > > > implementations) is not needed and would also encourage 
> > > > > > > > decoupling
> > > > for
> > > > > > > > other storage implementations.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Viktor
> > > > > > > >
> > > > > > > > On Mon, Apr 1, 2019 at 3:44 AM Ambud Sharma <
> > > > asharma52...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Harsha,
> > > > > > > > >
> > > > > > > > > Thank you for proposing this KIP. We are looking forward to 
> > > > > > > > > this
> > > > > > > feature as
> > > > > > > > > well.
> > > > > > > > >
> > > > > > > > > A few questions around the design & implementation:
> > > > > > > > >
> > > > > > > > > 1. Wouldn't implicit checking for old offsets in remote 
> > > > > > > > > location
> > > > if
> > > > > > not
> > > > > > > > > found locally on the leader i.e. do we really need remote 
> > > > > > > > > index
> > > > > > files?
> > > > > > > > > Since the storage path for a given topic would presumably be
> > > > constant
> > > > > > > > > across all the brokers, the remote topic-partition path could
> > > > simply
> > > > > > be
> > > > > > > > > checked to see if there are any segment file names that would
> > > > meet
> > > > > > the
> > > > > > > > > offset requirements for a Consumer Fetch Request. RSM
> > > > implementations
> > > > > > > could
> > > > > > > > > optionally cache this information.
> > > > > > > > >
> > > > > > > > > 2. Would it make sense to create an internal compacted Kafka
> > > > topic to
> > > > > > > > > publish & record remote segment information? This would enable
> > > > the
> > > > > > > > > followers to get updates about new segments rather than 
> > > > > > > > > running
> > > > > > list()
> > > > > > > > > operations on remote storage to detect new segments which may 
> > > > > > > > > be
> > > > > > > expensive.
> > > > > > > > >
> > > > > > > > > 3. For RLM to scan local segment rotations are you thinking of
> > > > > > > leveraging
> > > > > > > > > java.nio.file.WatchService or simply running listFiles() on a
> > > > > > periodic
> > > > > > > > > basis? Since WatchService implementation is heavily OS 
> > > > > > > > > dependent
> > > > it
> > > > > > > might
> > > > > > > > > create some complications around missing FS Events.
> > > > > > > > >
> > > > > > > > > Thanks.
> > > > > > > > > Ambud
> > > > > > > > >
> > > > > > > > > On Thu, Mar 28, 2019 at 8:04 AM Ron Dagostino 
> > > > > > > > > <rndg...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Harsha.  I'm excited about this potential feature.  Did 
> > > > > > > > > > you
> > > > > > > consider
> > > > > > > > > > storing the information about the remote segments in a Kafka
> > > > topic
> > > > > > as
> > > > > > > > > > opposed to in the remote storage itself?  The topic would 
> > > > > > > > > > need
> > > > > > > infinite
> > > > > > > > > > retention (or it would need to be compacted) so as not to
> > > > itself be
> > > > > > > sent
> > > > > > > > > to
> > > > > > > > > > cold storage, but assuming that topic would fit on local 
> > > > > > > > > > disk
> > > > for
> > > > > > all
> > > > > > > > > time
> > > > > > > > > > (an open question as to whether this is acceptable or not) 
> > > > > > > > > > it
> > > > feels
> > > > > > > like
> > > > > > > > > > the most natural way to communicate information among 
> > > > > > > > > > brokers
> > > > --
> > > > > > more
> > > > > > > > > > natural than having them poll the remote storage systems, at
> > > > least.
> > > > > > > > > >
> > > > > > > > > > To add to Eric's question/confusion about where logic lives
> > > > (RLM
> > > > > > vs.
> > > > > > > > > RSM),
> > > > > > > > > > I think it would be helpful to explicitly identify in the 
> > > > > > > > > > KIP
> > > > that
> > > > > > > the
> > > > > > > > > RLM
> > > > > > > > > > delegates to the RSM since the RSM is part of the public API
> > > > and is
> > > > > > > the
> > > > > > > > > > pluggable piece.  For example, instead of saying "RLM will
> > > > ship the
> > > > > > > log
> > > > > > > > > > segment files that are older than a configurable time to 
> > > > > > > > > > remote
> > > > > > > storage"
> > > > > > > > > I
> > > > > > > > > > think it would be better to say "RLM identifies log segment
> > > > files
> > > > > > > that
> > > > > > > > > are
> > > > > > > > > > older than a configurable time and delegates to the 
> > > > > > > > > > configured
> > > > RSM
> > > > > > to
> > > > > > > > > ship
> > > > > > > > > > them to remote storage" (or something like that -- just 
> > > > > > > > > > make it
> > > > > > clear
> > > > > > > > > that
> > > > > > > > > > the RLM is delegating to the configured RSM).
> > > > > > > > > >
> > > > > > > > > > Ron
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Mar 28, 2019 at 6:12 AM Eno Thereska <
> > > > > > eno.there...@gmail.com
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks Harsha,
> > > > > > > > > > >
> > > > > > > > > > > A couple of comments:
> > > > > > > > > > >
> > > > > > > > > > > Performance & durability
> > > > > > > > > > > ----------------------------------
> > > > > > > > > > > - would be good to have more discussion on performance
> > > > > > > implications of
> > > > > > > > > > > tiering. Copying the data from the local storage to the
> > > > remote
> > > > > > > storage
> > > > > > > > > is
> > > > > > > > > > > going to be expensive in terms of network bandwidth and 
> > > > > > > > > > > will
> > > > > > affect
> > > > > > > > > > > foreground traffic to Kafka potentially reducing its
> > > > throughput
> > > > > > and
> > > > > > > > > > > latency.
> > > > > > > > > > > - throttling the copying of the data above might be a
> > > > solution,
> > > > > > > however
> > > > > > > > > > if
> > > > > > > > > > > you have a few TB of data to move to the slower remote 
> > > > > > > > > > > tier
> > > > the
> > > > > > > risk is
> > > > > > > > > > > that the movement will never complete on time under high
> > > > Kafka
> > > > > > > load. Do
> > > > > > > > > > we
> > > > > > > > > > > need a scheduler to use idle time to do the copying?
> > > > > > > > > > > - Have you considered having two options: 1) a slow tier 
> > > > > > > > > > > only
> > > > > > > (e.g.,
> > > > > > > > > all
> > > > > > > > > > > the data on HDFS) and 2) a fast tier only like Kafka 
> > > > > > > > > > > today.
> > > > This
> > > > > > > would
> > > > > > > > > > > avoid copying data between the tiers. Customers that can
> > > > > > tolerate a
> > > > > > > > > > slower
> > > > > > > > > > > tier with a better price/GB can just choose option (1).
> > > > Would be
> > > > > > > good
> > > > > > > > > to
> > > > > > > > > > > put in Alternatives considered.
> > > > > > > > > > >
> > > > > > > > > > > Topic configs
> > > > > > > > > > > ------------------
> > > > > > > > > > > - related to performance but also availability, we need to
> > > > > > discuss
> > > > > > > the
> > > > > > > > > > > replication mode for the remote tier. For example, if the
> > > > Kafka
> > > > > > > topics
> > > > > > > > > > used
> > > > > > > > > > > to have 3-way replication, will they continue to have 
> > > > > > > > > > > 3-way
> > > > > > > replication
> > > > > > > > > > on
> > > > > > > > > > > the remote tier? Will the user configure that replication?
> > > > In S3
> > > > > > > for
> > > > > > > > > > > example, one can choose from different S3 tiers like STD 
> > > > > > > > > > > or
> > > > SIA,
> > > > > > > but
> > > > > > > > > > there
> > > > > > > > > > > is no direct control over the replication factor like in
> > > > Kafka.
> > > > > > > > > > > - how will security and ACLs be configured for the remote
> > > > tier?
> > > > > > > E.g.,
> > > > > > > > > if
> > > > > > > > > > > user A does not have access to a Kafka topic, when that
> > > > topic is
> > > > > > > moved
> > > > > > > > > to
> > > > > > > > > > > S3 or HDFS there needs to be a way to prevent access to 
> > > > > > > > > > > the
> > > > S3
> > > > > > > bucket
> > > > > > > > > for
> > > > > > > > > > > that user. This might be outside the scope of this KIP but
> > > > would
> > > > > > be
> > > > > > > > > good
> > > > > > > > > > to
> > > > > > > > > > > discuss first.
> > > > > > > > > > >
> > > > > > > > > > > That's it for now, thanks
> > > > > > > > > > > Eno
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Mar 27, 2019 at 4:40 PM Harsha <ka...@harsha.io>
> > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi All,
> > > > > > > > > > > >            Thanks for your initial feedback. We updated 
> > > > > > > > > > > > the
> > > > > > KIP.
> > > > > > > > > Please
> > > > > > > > > > > > take a look and let us know if you have any questions.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Harsha
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Feb 6, 2019, at 10:30 AM, Harsha wrote:
> > > > > > > > > > > > > > Thanks Eno, Adam & Satish for your review and
> > > > > > > > > > > > > questions.
> > > > I'll
> > > > > > > > > address
> > > > > > > > > > > > > these in KIP and update the thread here.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Harsha
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Feb 6, 2019, at 7:09 AM, Satish Duggana wrote:
> > > > > > > > > > > > > > Thanks, Harsha for the KIP. It is a good start for
> > > > tiered
> > > > > > > storage
> > > > > > > > > > in
> > > > > > > > > > > > > > Kafka. I have a few comments/questions.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > It may be good to have a configuration to keep the
> > > > number
> > > > > > of
> > > > > > > > > local
> > > > > > > > > > > > > > segments instead of keeping only the active segment.
> > > > This
> > > > > > > config
> > > > > > > > > > can
> > > > > > > > > > > > > > > be exposed at cluster and topic levels with a default
> > > > > > > > > > > > > > > value of 1. In some use cases, a few consumers may lag
> > > > > > > > > > > > > > > by more than one segment, and it will be better to
> > > > > > > > > > > > > > > serve them from local storage instead of remote storage.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > It may be better to keep “remote.log.storage.enable”
> > > > and
> > > > > > > > > respective
> > > > > > > > > > > > > > configuration at topic level along with cluster 
> > > > > > > > > > > > > > level.
> > > > It
> > > > > > > will be
> > > > > > > > > > > > > > > helpful in environments where a few topics are
> > > > > > > > > > > > > > > configured with local storage and other topics are
> > > > > > > > > > > > > > > configured with remote storage.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Each topic-partition leader pushes its log segments,
> > > > > > > > > > > > > > > with their respective index files, to remote storage
> > > > > > > > > > > > > > > whenever the active log rolls over, and it updates the
> > > > > > > > > > > > > > > remote log index file for the respective remote log
> > > > > > > > > > > > > > > segment. The second option is to also add offset index
> > > > > > > > > > > > > > > files for each segment. It can then serve consumer
> > > > > > > > > > > > > > > fetch requests for old segments from a local log
> > > > > > > > > > > > > > > segment instead of serving them directly from the
> > > > > > > > > > > > > > > remote log, which may cause high latencies. There can
> > > > > > > > > > > > > > > be different strategies for when the remote segment is
> > > > > > > > > > > > > > > copied to a local segment.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > What is “remote.log.manager.scheduler.interval.ms”
> > > > config
> > > > > > > about?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > How do followers sync RemoteLogSegmentIndex files? 
> > > > > > > > > > > > > > Do
> > > > they
> > > > > > > > > request
> > > > > > > > > > > > > > from leader replica? This looks to be important as 
> > > > > > > > > > > > > > the
> > > > > > failed
> > > > > > > > > over
> > > > > > > > > > > > > > leader should have RemoteLogSegmentIndex updated and
> > > > ready
> > > > > > to
> > > > > > > > > avoid
> > > > > > > > > > > > > > high latencies in serving old data stored in remote
> > > > logs.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Satish.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Feb 5, 2019 at 10:42 PM Ryanne Dolan <
> > > > > > > > > > ryannedo...@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks Harsha, makes sense.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Ryanne
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Mon, Feb 4, 2019 at 5:53 PM Harsha 
> > > > > > > > > > > > > > > Chintalapani <
> > > > > > > > > > > ka...@harsha.io>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > "I think you are saying that this enables
> > > > additional
> > > > > > > > > > (potentially
> > > > > > > > > > > > cheaper)
> > > > > > > > > > > > > > > > storage options without *requiring* an existing 
> > > > > > > > > > > > > > > > ETL
> > > > > > > > > pipeline. “
> > > > > > > > > > > > > > > > Yes.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > " But it's not really a replacement for the 
> > > > > > > > > > > > > > > > sort of
> > > > > > > pipelines
> > > > > > > > > > > > people build
> > > > > > > > > > > > > > > > with Connect, Gobblin etc.”
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > It is not. But assuming that everyone runs these
> > > > > > > > > > > > > > > > > pipelines to store raw Kafka data in HDFS or S3 is
> > > > > > > > > > > > > > > > > also a wrong assumption.
> > > > > > > > > > > > > > > > > The aim of this KIP is to provide tiered storage as a
> > > > > > > > > > > > > > > > > whole package, without asking users to ship the data
> > > > > > > > > > > > > > > > > on their own using existing ETL, which means running a
> > > > > > > > > > > > > > > > > consumer and maintaining those pipelines.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > " My point was that, if you are already 
> > > > > > > > > > > > > > > > offloading
> > > > > > > records in
> > > > > > > > > > an
> > > > > > > > > > > > ETL
> > > > > > > > > > > > > > > > pipeline, why do you need a new pipeline built
> > > > into the
> > > > > > > > > broker
> > > > > > > > > > to
> > > > > > > > > > > > ship the
> > > > > > > > > > > > > > > > same data to the same place?”
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > As you said, it's an ETL pipeline, which means users of
> > > > > > > > > > > > > > > > > these pipelines are reading the data from the broker,
> > > > > > > > > > > > > > > > > transforming its state, and storing it somewhere.
> > > > > > > > > > > > > > > > > The point of this KIP is to store log segments as they
> > > > > > > > > > > > > > > > > are, without changing their structure, so that we can
> > > > > > > > > > > > > > > > > use the existing offset mechanisms to look them up when
> > > > > > > > > > > > > > > > > the consumer needs to read old data. When you load the
> > > > > > > > > > > > > > > > > data via your existing pipelines, you are reading the
> > > > > > > > > > > > > > > > > topic as a whole, which doesn’t guarantee that you’ll
> > > > > > > > > > > > > > > > > write this data to HDFS or S3 in the same order. And
> > > > > > > > > > > > > > > > > who is going to generate the index files again?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > "So you'd end up with one of 1)cold segments are
> > > > only
> > > > > > > useful
> > > > > > > > > to
> > > > > > > > > > > > Kafka; 2)
> > > > > > > > > > > > > > > > you have the same data written to HDFS/etc 
> > > > > > > > > > > > > > > > twice,
> > > > once
> > > > > > > for
> > > > > > > > > > Kafka
> > > > > > > > > > > > and once
> > > > > > > > > > > > > > > > for everything else, in two separate formats”
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > You are talking about two different use cases. If
> > > > > > > > > > > > > > > > > someone is storing raw data out of Kafka for long-term
> > > > > > > > > > > > > > > > > access, storing the data as it is in HDFS through Kafka
> > > > > > > > > > > > > > > > > will solve this issue. They do not need to run another
> > > > > > > > > > > > > > > > > pipeline to ship these logs.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > If they are running pipelines to store data in HDFS in
> > > > > > > > > > > > > > > > > a different format, that's a different use case. Maybe
> > > > > > > > > > > > > > > > > they are transforming Kafka logs to ORC so that they
> > > > > > > > > > > > > > > > > can query them through Hive. Once you transform the log
> > > > > > > > > > > > > > > > > segment, it loses its ability to use the existing
> > > > > > > > > > > > > > > > > offset index.
> > > > > > > > > > > > > > > > > The main objective here is not to change the existing
> > > > > > > > > > > > > > > > > protocol and still be able to write and read logs from
> > > > > > > > > > > > > > > > > remote storage.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > -Harsha
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Feb 4, 2019, 2:53 PM -0800, Ryanne Dolan <
> > > > > > > > > > > ryannedo...@gmail.com
> > > > > > > > > > > > >,
> > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > Thanks Harsha, makes sense for the most part.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > tiered storage is to get away from this and
> > > > make
> > > > > > this
> > > > > > > > > > > > transparent to
> > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > user
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I think you are saying that this enables
> > > > additional
> > > > > > > > > > > (potentially
> > > > > > > > > > > > cheaper)
> > > > > > > > > > > > > > > > > storage options without *requiring* an 
> > > > > > > > > > > > > > > > > existing
> > > > ETL
> > > > > > > > > pipeline.
> > > > > > > > > > > > But it's
> > > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > > really a replacement for the sort of pipelines
> > > > people
> > > > > > > build
> > > > > > > > > > > with
> > > > > > > > > > > > Connect,
> > > > > > > > > > > > > > > > > Gobblin etc. My point was that, if you are
> > > > already
> > > > > > > > > offloading
> > > > > > > > > > > > records in
> > > > > > > > > > > > > > > > an
> > > > > > > > > > > > > > > > > ETL pipeline, why do you need a new pipeline
> > > > built
> > > > > > > into the
> > > > > > > > > > > > broker to
> > > > > > > > > > > > > > > > ship
> > > > > > > > > > > > > > > > > the same data to the same place? I think in 
> > > > > > > > > > > > > > > > > most
> > > > > > cases
> > > > > > > this
> > > > > > > > > > > will
> > > > > > > > > > > > be an
> > > > > > > > > > > > > > > > > additional pipeline, not a replacement, 
> > > > > > > > > > > > > > > > > because
> > > > the
> > > > > > > > > segments
> > > > > > > > > > > > written to
> > > > > > > > > > > > > > > > > cold storage won't be useful outside Kafka. So
> > > > you'd
> > > > > > > end up
> > > > > > > > > > > with
> > > > > > > > > > > > one of
> > > > > > > > > > > > > > > > 1)
> > > > > > > > > > > > > > > > > cold segments are only useful to Kafka; 2) you
> > > > have
> > > > > > the
> > > > > > > > > same
> > > > > > > > > > > > data written
> > > > > > > > > > > > > > > > > to HDFS/etc twice, once for Kafka and once for
> > > > > > > everything
> > > > > > > > > > else,
> > > > > > > > > > > > in two
> > > > > > > > > > > > > > > > > separate formats; 3) you use your existing ETL
> > > > > > > pipeline and
> > > > > > > > > > > read
> > > > > > > > > > > > cold
> > > > > > > > > > > > > > > > data
> > > > > > > > > > > > > > > > > directly.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > To me, an ideal solution would let me spool
> > > > segments
> > > > > > > from
> > > > > > > > > > Kafka
> > > > > > > > > > > > to any
> > > > > > > > > > > > > > > > sink
> > > > > > > > > > > > > > > > > I would like, and then let Kafka clients
> > > > seamlessly
> > > > > > > access
> > > > > > > > > > that
> > > > > > > > > > > > cold
> > > > > > > > > > > > > > > > data.
> > > > > > > > > > > > > > > > > Today I can do that in the client, but ideally
> > > > the
> > > > > > > broker
> > > > > > > > > > would
> > > > > > > > > > > > do it for
> > > > > > > > > > > > > > > > > me via some HDFS/Hive/S3 plugin. The KIP 
> > > > > > > > > > > > > > > > > seems to
> > > > > > > > > accomplish
> > > > > > > > > > > > that -- just
> > > > > > > > > > > > > > > > > without leveraging anything I've currently 
> > > > > > > > > > > > > > > > > got in
> > > > > > > place.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Ryanne
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Mon, Feb 4, 2019 at 3:34 PM Harsha <
> > > > > > ka...@harsha.io
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi Eric,
> > > > > > > > > > > > > > > > > > Thanks for your questions. Answers are 
> > > > > > > > > > > > > > > > > > in-line
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > "The high-level design seems to indicate 
> > > > > > > > > > > > > > > > > > that
> > > > all
> > > > > > of
> > > > > > > the
> > > > > > > > > > > logic
> > > > > > > > > > > > for
> > > > > > > > > > > > > > > > when and
> > > > > > > > > > > > > > > > > > how to copy log segments to remote storage
> > > > lives in
> > > > > > > the
> > > > > > > > > RLM
> > > > > > > > > > > > class. The
> > > > > > > > > > > > > > > > > > default implementation is then HDFS specific
> > > > with
> > > > > > > > > > additional
> > > > > > > > > > > > > > > > > > implementations being left to the community.
> > > > This
> > > > > > > seems
> > > > > > > > > > like
> > > > > > > > > > > > it would
> > > > > > > > > > > > > > > > > > require anyone implementing a new RLM to 
> > > > > > > > > > > > > > > > > > also
> > > > > > > > > re-implement
> > > > > > > > > > > the
> > > > > > > > > > > > logic
> > > > > > > > > > > > > > > > for
> > > > > > > > > > > > > > > > > > when to ship data to remote storage."
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > RLM will be responsible for shipping log
> > > > segments
> > > > > > > and it
> > > > > > > > > > will
> > > > > > > > > > > > decide
> > > > > > > > > > > > > > > > when
> > > > > > > > > > > > > > > > > > a log segment is ready to be shipped over.
> > > > > > > > > > > > > > > > > > > Once log segments are identified as rolled over, RLM
> > > > > > > > > > > > > > > > > > > will delegate this responsibility to a pluggable remote
> > > > > > > > > > > > > > > > > > > storage implementation. Users who want to add their own
> > > > > > > > > > > > > > > > > > > implementation to enable other storage systems only
> > > > > > > > > > > > > > > > > > > need to implement the copy and read mechanisms; they do
> > > > > > > > > > > > > > > > > > > not need to re-implement RLM itself.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > "Would it not be better for the Remote Log
> > > > Manager
> > > > > > > > > > > > implementation to be
> > > > > > > > > > > > > > > > > > non-configurable, and instead have an
> > > > interface for
> > > > > > > the
> > > > > > > > > > > remote
> > > > > > > > > > > > storage
> > > > > > > > > > > > > > > > > > layer? That way the "when" of the logic is
> > > > > > consistent
> > > > > > > > > > across
> > > > > > > > > > > > all
> > > > > > > > > > > > > > > > > > implementations and it's only a matter of
> > > > "how,"
> > > > > > > similar
> > > > > > > > > to
> > > > > > > > > > > > how the
> > > > > > > > > > > > > > > > Streams
> > > > > > > > > > > > > > > > > > StateStores are managed."
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > It's possible that we can make RLM non-configurable.
> > > > > > > > > > > > > > > > > > > But for the initial release, and to keep backward
> > > > > > > > > > > > > > > > > > > compatibility, we want to make this configurable, so
> > > > > > > > > > > > > > > > > > > that users who are not interested in having their
> > > > > > > > > > > > > > > > > > > LogSegments shipped to remote storage don't need to
> > > > > > > > > > > > > > > > > > > worry about this.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi Ryanne,
> > > > > > > > > > > > > > > > > > Thanks for your questions.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > "How could this be used to leverage fast
> > > > key-value
> > > > > > > > > stores,
> > > > > > > > > > > e.g.
> > > > > > > > > > > > > > > > Couchbase,
> > > > > > > > > > > > > > > > > > which can serve individual records but maybe
> > > > not
> > > > > > > entire
> > > > > > > > > > > > segments? Or
> > > > > > > > > > > > > > > > is the
> > > > > > > > > > > > > > > > > > idea to only support writing and fetching
> > > > entire
> > > > > > > > > segments?
> > > > > > > > > > > > Would it
> > > > > > > > > > > > > > > > make
> > > > > > > > > > > > > > > > > > sense to support both?"
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > LogSegments, once rolled over, are immutable objects,
> > > > > > > > > > > > > > > > > > > and we want to keep the current structure of
> > > > > > > > > > > > > > > > > > > LogSegments and their corresponding index files. It
> > > > > > > > > > > > > > > > > > > will be easy to copy the whole segment as it is,
> > > > > > > > > > > > > > > > > > > instead of re-reading each file and using a key/value
> > > > > > > > > > > > > > > > > > > store.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > "
> > > > > > > > > > > > > > > > > > - Instead of defining a new interface and/or
> > > > > > > mechanism to
> > > > > > > > > > ETL
> > > > > > > > > > > > segment
> > > > > > > > > > > > > > > > files
> > > > > > > > > > > > > > > > > > from brokers to cold storage, can we just
> > > > leverage
> > > > > > > Kafka
> > > > > > > > > > > > itself? In
> > > > > > > > > > > > > > > > > > particular, we can already ETL records to 
> > > > > > > > > > > > > > > > > > HDFS
> > > > via
> > > > > > > Kafka
> > > > > > > > > > > > Connect,
> > > > > > > > > > > > > > > > Gobblin
> > > > > > > > > > > > > > > > > > etc -- we really just need a way for 
> > > > > > > > > > > > > > > > > > brokers to
> > > > > > read
> > > > > > > > > these
> > > > > > > > > > > > records
> > > > > > > > > > > > > > > > back.
> > > > > > > > > > > > > > > > > > I'm wondering whether the new API could be
> > > > limited
> > > > > > > to the
> > > > > > > > > > > > fetch, and
> > > > > > > > > > > > > > > > then
> > > > > > > > > > > > > > > > > > existing ETL pipelines could be more easily
> > > > > > > leveraged.
> > > > > > > > > For
> > > > > > > > > > > > example, if
> > > > > > > > > > > > > > > > you
> > > > > > > > > > > > > > > > > > already have an ETL pipeline from Kafka to
> > > > HDFS,
> > > > > > you
> > > > > > > > > could
> > > > > > > > > > > > leave that
> > > > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > > > place and just tell Kafka how to read these
> > > > > > > > > > records/segments
> > > > > > > > > > > > from cold
> > > > > > > > > > > > > > > > > > storage when necessary."
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > This is pretty much what everyone does and 
> > > > > > > > > > > > > > > > > > it
> > > > has
> > > > > > the
> > > > > > > > > > > > additional
> > > > > > > > > > > > > > > > overhead
> > > > > > > > > > > > > > > > > > of keeping these pipelines operating and
> > > > > > monitoring.
> > > > > > > > > > > > > > > > > > > What's proposed in the KIP is not ETL. It just looks at
> > > > > > > > > > > > > > > > > > > the logs that are written and rolled over, and copies
> > > > > > > > > > > > > > > > > > > each file as it is.
> > > > > > > > > > > > > > > > > > > With a traditional ETL pipeline, each new topic needs
> > > > > > > > > > > > > > > > > > > to be onboarded (sure, we can do so via a wildcard or
> > > > > > > > > > > > > > > > > > > another mechanism) to ship its data into remote storage.
> > > > > > > > > > > > > > > > > > > Once the data lands somewhere like HDFS or Hive, users
> > > > > > > > > > > > > > > > > > > need to write another processing pipeline to re-process
> > > > > > > > > > > > > > > > > > > this data, similar to how they do it in their stream
> > > > > > > > > > > > > > > > > > > processing pipelines. Tiered storage is to get away
> > > > > > > > > > > > > > > > > > > from this and make this transparent to the user. They
> > > > > > > > > > > > > > > > > > > don't need to run another ETL process to ship the logs.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > "I'm wondering if we could just add support 
> > > > > > > > > > > > > > > > > > for
> > > > > > > loading
> > > > > > > > > > > > segments from
> > > > > > > > > > > > > > > > > > remote URIs instead of from file, i.e. via
> > > > plugins
> > > > > > > for
> > > > > > > > > > s3://,
> > > > > > > > > > > > hdfs://
> > > > > > > > > > > > > > > > etc.
> > > > > > > > > > > > > > > > > > I suspect less broker logic would change in
> > > > that
> > > > > > > case --
> > > > > > > > > > the
> > > > > > > > > > > > broker
> > > > > > > > > > > > > > > > > > wouldn't necessarily care if it reads from
> > > > file://
> > > > > > or
> > > > > > > > > s3://
> > > > > > > > > > > to
> > > > > > > > > > > > load a
> > > > > > > > > > > > > > > > given
> > > > > > > > > > > > > > > > > > segment."
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Yes, this is what we are discussing in KIP. 
> > > > > > > > > > > > > > > > > > We
> > > > are
> > > > > > > > > leaving
> > > > > > > > > > > the
> > > > > > > > > > > > details
> > > > > > > > > > > > > > > > of
> > > > > > > > > > > > > > > > > > loading segments to RLM read part instead of
> > > > > > directly
> > > > > > > > > > > exposing
> > > > > > > > > > > > this in
> > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > Broker. This way we can keep the current 
> > > > > > > > > > > > > > > > > > Kafka
> > > > code
> > > > > > > as it
> > > > > > > > > > is
> > > > > > > > > > > > without
> > > > > > > > > > > > > > > > > > changing the assumptions around the local
> > > > disk. Let
> > > > > > > the
> > > > > > > > > RLM
> > > > > > > > > > > > handle the
> > > > > > > > > > > > > > > > > > remote storage part.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > Harsha
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Mon, Feb 4, 2019, at 12:54 PM, Ryanne 
> > > > > > > > > > > > > > > > > > Dolan
> > > > > > wrote:
> > > > > > > > > > > > > > > > > > > Harsha, Sriharsha, Suresh, a couple 
> > > > > > > > > > > > > > > > > > > thoughts:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > - How could this be used to leverage fast
> > > > > > key-value
> > > > > > > > > > stores,
> > > > > > > > > > > > e.g.
> > > > > > > > > > > > > > > > > > Couchbase,
> > > > > > > > > > > > > > > > > > > which can serve individual records but 
> > > > > > > > > > > > > > > > > > > maybe
> > > > not
> > > > > > > entire
> > > > > > > > > > > > segments? Or
> > > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > idea to only support writing and fetching
> > > > entire
> > > > > > > > > > segments?
> > > > > > > > > > > > Would it
> > > > > > > > > > > > > > > > make
> > > > > > > > > > > > > > > > > > > sense to support both?
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > - Instead of defining a new interface 
> > > > > > > > > > > > > > > > > > > and/or
> > > > > > > mechanism
> > > > > > > > > to
> > > > > > > > > > > > ETL segment
> > > > > > > > > > > > > > > > > > files
> > > > > > > > > > > > > > > > > > > from brokers to cold storage, can we just
> > > > > > leverage
> > > > > > > > > Kafka
> > > > > > > > > > > > itself? In
> > > > > > > > > > > > > > > > > > > particular, we can already ETL records to
> > > > HDFS
> > > > > > via
> > > > > > > > > Kafka
> > > > > > > > > > > > Connect,
> > > > > > > > > > > > > > > > Gobblin
> > > > > > > > > > > > > > > > > > > etc -- we really just need a way for 
> > > > > > > > > > > > > > > > > > > brokers
> > > > to
> > > > > > > read
> > > > > > > > > > these
> > > > > > > > > > > > records
> > > > > > > > > > > > > > > > back.
> > > > > > > > > > > > > > > > > > > I'm wondering whether the new API could be
> > > > > > limited
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > > > > fetch, and
> > > > > > > > > > > > > > > > then
> > > > > > > > > > > > > > > > > > > existing ETL pipelines could be more 
> > > > > > > > > > > > > > > > > > > easily
> > > > > > > leveraged.
> > > > > > > > > > For
> > > > > > > > > > > > example,
> > > > > > > > > > > > > > > > if
> > > > > > > > > > > > > > > > > > you
> > > > > > > > > > > > > > > > > > > already have an ETL pipeline from Kafka to
> > > > HDFS,
> > > > > > > you
> > > > > > > > > > could
> > > > > > > > > > > > leave
> > > > > > > > > > > > > > > > that in
> > > > > > > > > > > > > > > > > > > place and just tell Kafka how to read 
> > > > > > > > > > > > > > > > > > > these
> > > > > > > > > > > records/segments
> > > > > > > > > > > > from
> > > > > > > > > > > > > > > > cold
> > > > > > > > > > > > > > > > > > > storage when necessary.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > - I'm wondering if we could just add support for
> > > > > > > > > > > > > > > > > > > > > loading segments from remote URIs instead of from
> > > > > > > > > > > > > > > > > > > > > file, i.e. via plugins for s3://, hdfs:// etc.
> > > > > > > > > > > > > > > > > > > > > I suspect less broker logic would change in that
> > > > > > > > > > > > > > > > > > > > > case -- the broker wouldn't necessarily care if it
> > > > > > > > > > > > > > > > > > > > > reads from file:// or s3:// to load a given segment.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Combining the previous two comments, I can imagine a
> > > > > > > > > > > > > > > > > > > > > URI resolution chain for segments. For example, first
> > > > > > > > > > > > > > > > > > > > > try file:///logs/{topic}/{segment}.log, then
> > > > > > > > > > > > > > > > > > > > > s3://mybucket/{topic}/{date}/{segment}.log, etc,
> > > > > > > > > > > > > > > > > > > > > leveraging your existing ETL pipeline(s).
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Ryanne
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > On Mon, Feb 4, 2019 at 12:01 PM Harsha <ka...@harsha.io> wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Hi All,
> > > > > > > > > > > > > > > > > > > > > > We are interested in adding tiered storage to Kafka.
> > > > > > > > > > > > > > > > > > > > > > More details about motivation and design are in the
> > > > > > > > > > > > > > > > > > > > > > KIP. We are working towards an initial POC. Any
> > > > > > > > > > > > > > > > > > > > > > feedback or questions on this KIP are welcome.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > Harsha
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > >
> >
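
For reference, the URI resolution chain Ryanne describes in the quoted
message above could be sketched roughly as follows. This is only an
illustration under stated assumptions: SegmentUriResolver, its template
syntax and the exists predicate are hypothetical names made up for this
sketch, not part of Kafka or of the KIP-405 design. A real lookup would
have to call the local filesystem, S3, HDFS and so on; here that check is
passed in as a predicate so the chain logic stays self-contained.

```java
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch of a segment "URI resolution chain".
// Nothing here is a Kafka API; all names are assumptions for illustration.
final class SegmentUriResolver {
    private final List<String> templates;   // tried in order, first match wins
    private final Predicate<URI> exists;    // stand-in for a per-scheme existence check

    SegmentUriResolver(List<String> templates, Predicate<URI> exists) {
        this.templates = List.copyOf(templates);
        this.exists = exists;
    }

    // Replace each {name} placeholder with its value; unknown placeholders are left as-is.
    static String expand(String template, Map<String, String> vars) {
        String out = template;
        for (Map.Entry<String, String> e : vars.entrySet()) {
            out = out.replace("{" + e.getKey() + "}", e.getValue());
        }
        return out;
    }

    // Walk the chain and return the first candidate URI that the predicate accepts.
    Optional<URI> resolve(Map<String, String> vars) {
        for (String template : templates) {
            URI candidate = URI.create(expand(template, vars));
            if (exists.test(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        SegmentUriResolver resolver = new SegmentUriResolver(
            List.of("file:///logs/{topic}/{segment}.log",
                    "s3://mybucket/{topic}/{date}/{segment}.log"),
            // Pretend the segment is no longer on local disk and only exists in s3.
            uri -> "s3".equals(uri.getScheme()));

        Map<String, String> vars = Map.of(
            "topic", "my-topic",
            "date", "2019-02-04",
            "segment", "00000000000000000000");

        System.out.println(resolver.resolve(vars).map(URI::toString).orElse("not found"));
    }
}
```

With the predicate above, the file:// candidate is rejected and the s3://
candidate is returned. Keeping the existence check pluggable mirrors the
"plugins for s3://, hdfs:// etc." idea: the chain itself does not care
which scheme ends up serving the segment.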
