Hi All,
           Thanks for your initial feedback. We updated the KIP. Please take a 
look and let us know if you have any questions.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage

Thanks,
Harsha

On Wed, Feb 6, 2019, at 10:30 AM, Harsha wrote:
> Thanks Eno, Adam & Satish for you review and questions. I'll address 
> these in KIP and update the thread here. 
> 
> Thanks,
> Harsha
> 
> On Wed, Feb 6, 2019, at 7:09 AM, Satish Duggana wrote:
> > Thanks, Harsha for the KIP. It is a good start for tiered storage in
> > Kafka. I have a few comments/questions.
> > 
> > It may be good to have a configuration to keep the number of local
> > segments instead of keeping only the active segment. This config can
> > be exposed at cluster and topic levels with default value as 1. In
> > some use cases, few consumers may lag over one segment, it will be
> > better to serve from local storage instead of remote storage.
> > 
> > It may be better to keep “remote.log.storage.enable” and respective
> > configuration at topic level along with cluster level. It will be
> > helpful in environments where few topics are configured with
> > local-storage and other topics are configured with remote storage.
> > 
> > Each topic-partition leader pushes its log segments with respective
> > index files to remote whenever active log rolls over, it updates the
> > remote log index file for the respective remote log segment. The
> > second option is to add offset index files also for each segment. It
> > can serve consumer fetch requests for old segments from local log
> > segment instead of serving directly from the remote log which may
> > cause high latencies. There can be different strategies in when the
> > remote segment is copied to a local segment.
> > 
> > What is “remote.log.manager.scheduler.interval.ms” config about?
> > 
> > How do followers sync RemoteLogSegmentIndex files? Do they request
> > from leader replica? This looks to be important as the failed over
> > leader should have RemoteLogSegmentIndex updated and ready to avoid
> > high latencies in serving old data stored in remote logs.
> > 
> > Thanks,
> > Satish.
> > 
> > On Tue, Feb 5, 2019 at 10:42 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > >
> > > Thanks Harsha, makes sense.
> > >
> > > Ryanne
> > >
> > > On Mon, Feb 4, 2019 at 5:53 PM Harsha Chintalapani <ka...@harsha.io> 
> > > wrote:
> > >
> > > > "I think you are saying that this enables additional (potentially 
> > > > cheaper)
> > > > storage options without *requiring* an existing ETL pipeline. “
> > > > Yes.
> > > >
> > > > " But it's not really a replacement for the sort of pipelines people 
> > > > build
> > > > with Connect, Gobblin etc.”
> > > >
> > > > It is not. But also making an assumption that everyone runs these
> > > > pipelines for storing raw Kafka data into HDFS or S3 is also wrong
> > > >  assumption.
> > > > The aim of this KIP is to provide tiered storage as whole package not
> > > > asking users to ship the data on their own using existing ETL, which 
> > > > means
> > > > running a consumer and maintaining those pipelines.
> > > >
> > > > " My point was that, if you are already offloading records in an ETL
> > > > pipeline, why do you need a new pipeline built into the broker to ship 
> > > > the
> > > > same data to the same place?”
> > > >
> > > > As you said its ETL pipeline, which means users of these pipelines are
> > > > reading the data from broker and transforming its state and storing it
> > > > somewhere.
> > > > The point of this KIP is store log segments as it is without changing
> > > > their structure so that we can use the existing offset mechanisms to 
> > > > look
> > > > it up when the consumer needs to read old data. When you do load it via
> > > > your existing pipelines you are reading the topic as a whole , which
> > > > doesn’t guarantee that you’ll produce this data back into HDFS in S3 in 
> > > > the
> > > > same order and who is going to generate the Index files again.
> > > >
> > > >
> > > > "So you'd end up with one of 1)cold segments are only useful to Kafka; 
> > > > 2)
> > > > you have the same data written to HDFS/etc twice, once for Kafka and 
> > > > once
> > > > for everything else, in two separate formats”
> > > >
> > > > You are talking two different use cases. If someone is storing raw data
> > > > out of Kafka for long term access.
> > > > By storing the data as it is in HDFS though Kafka will solve this issue.
> > > > They do not need to run another pipe-line to ship these logs.
> > > >
> > > > If they are running pipelines to store in HDFS in a different format,
> > > > thats a different use case. May be they are transforming Kafka logs to 
> > > > ORC
> > > > so that they can query through Hive.  Once you transform the log 
> > > > segment it
> > > > does loose its ability to use the existing offset index.
> > > > Main objective here not to change the existing protocol and still be 
> > > > able
> > > > to write and read logs from remote storage.
> > > >
> > > >
> > > > -Harsha
> > > >
> > > > On Feb 4, 2019, 2:53 PM -0800, Ryanne Dolan <ryannedo...@gmail.com>,
> > > > wrote:
> > > > > Thanks Harsha, makes sense for the most part.
> > > > >
> > > > > > tiered storage is to get away from this and make this transparent to
> > > > the
> > > > > user
> > > > >
> > > > > I think you are saying that this enables additional (potentially 
> > > > > cheaper)
> > > > > storage options without *requiring* an existing ETL pipeline. But it's
> > > > not
> > > > > really a replacement for the sort of pipelines people build with 
> > > > > Connect,
> > > > > Gobblin etc. My point was that, if you are already offloading records 
> > > > > in
> > > > an
> > > > > ETL pipeline, why do you need a new pipeline built into the broker to
> > > > ship
> > > > > the same data to the same place? I think in most cases this will be an
> > > > > additional pipeline, not a replacement, because the segments written 
> > > > > to
> > > > > cold storage won't be useful outside Kafka. So you'd end up with one 
> > > > > of
> > > > 1)
> > > > > cold segments are only useful to Kafka; 2) you have the same data 
> > > > > written
> > > > > to HDFS/etc twice, once for Kafka and once for everything else, in two
> > > > > separate formats; 3) you use your existing ETL pipeline and read cold
> > > > data
> > > > > directly.
> > > > >
> > > > > To me, an ideal solution would let me spool segments from Kafka to any
> > > > sink
> > > > > I would like, and then let Kafka clients seamlessly access that cold
> > > > data.
> > > > > Today I can do that in the client, but ideally the broker would do it 
> > > > > for
> > > > > me via some HDFS/Hive/S3 plugin. The KIP seems to accomplish that -- 
> > > > > just
> > > > > without leveraging anything I've currently got in place.
> > > > >
> > > > > Ryanne
> > > > >
> > > > > On Mon, Feb 4, 2019 at 3:34 PM Harsha <ka...@harsha.io> wrote:
> > > > >
> > > > > > Hi Eric,
> > > > > > Thanks for your questions. Answers are in-line
> > > > > >
> > > > > > "The high-level design seems to indicate that all of the logic for
> > > > when and
> > > > > > how to copy log segments to remote storage lives in the RLM class. 
> > > > > > The
> > > > > > default implementation is then HDFS specific with additional
> > > > > > implementations being left to the community. This seems like it 
> > > > > > would
> > > > > > require anyone implementing a new RLM to also re-implement the logic
> > > > for
> > > > > > when to ship data to remote storage."
> > > > > >
> > > > > > RLM will be responsible for shipping log segments and it will decide
> > > > when
> > > > > > a log segment is ready to be shipped over.
> > > > > > Once a Log Segement(s) are identified as rolled over, RLM will 
> > > > > > delegate
> > > > > > this responsibility to a pluggable remote storage implementation.
> > > > Users who
> > > > > > are looking add their own implementation to enable other storages 
> > > > > > all
> > > > they
> > > > > > need to do is to implement the copy and read mechanisms and not to
> > > > > > re-implement RLM itself.
> > > > > >
> > > > > >
> > > > > > "Would it not be better for the Remote Log Manager implementation 
> > > > > > to be
> > > > > > non-configurable, and instead have an interface for the remote 
> > > > > > storage
> > > > > > layer? That way the "when" of the logic is consistent across all
> > > > > > implementations and it's only a matter of "how," similar to how the
> > > > Streams
> > > > > > StateStores are managed."
> > > > > >
> > > > > > It's possible that we can RLM non-configurable. But for the initial
> > > > > > release and to keep the backward compatibility
> > > > > > we want to make this configurable and for any users who might not be
> > > > > > interested in having the LogSegments shipped to remote, they don't
> > > > need to
> > > > > > worry about this.
> > > > > >
> > > > > >
> > > > > > Hi Ryanne,
> > > > > > Thanks for your questions.
> > > > > >
> > > > > > "How could this be used to leverage fast key-value stores, e.g.
> > > > Couchbase,
> > > > > > which can serve individual records but maybe not entire segments? Or
> > > > is the
> > > > > > idea to only support writing and fetching entire segments? Would it
> > > > make
> > > > > > sense to support both?"
> > > > > >
> > > > > > LogSegment once its rolled over are immutable objects and we want to
> > > > keep
> > > > > > the current structure of LogSegments and corresponding Index files. 
> > > > > > It
> > > > will
> > > > > > be easy to copy the whole segment as it is, instead of re-reading 
> > > > > > each
> > > > file
> > > > > > and use a key/value store.
> > > > > >
> > > > > > "
> > > > > > - Instead of defining a new interface and/or mechanism to ETL 
> > > > > > segment
> > > > files
> > > > > > from brokers to cold storage, can we just leverage Kafka itself? In
> > > > > > particular, we can already ETL records to HDFS via Kafka Connect,
> > > > Gobblin
> > > > > > etc -- we really just need a way for brokers to read these records
> > > > back.
> > > > > > I'm wondering whether the new API could be limited to the fetch, and
> > > > then
> > > > > > existing ETL pipelines could be more easily leveraged. For example, 
> > > > > > if
> > > > you
> > > > > > already have an ETL pipeline from Kafka to HDFS, you could leave 
> > > > > > that
> > > > in
> > > > > > place and just tell Kafka how to read these records/segments from 
> > > > > > cold
> > > > > > storage when necessary."
> > > > > >
> > > > > > This is pretty much what everyone does and it has the additional
> > > > overhead
> > > > > > of keeping these pipelines operating and monitoring.
> > > > > > What's proposed in the KIP is not ETL. It's just looking a the logs
> > > > that
> > > > > > are written and rolled over to copy the file as it is.
> > > > > > Each new topic needs to be added (sure we can do so via wildcard or
> > > > > > another mechanism) but new topics need to be onboard to ship the 
> > > > > > data
> > > > into
> > > > > > remote storage through a traditional ETL pipeline.
> > > > > > Once the data lands somewhere like HDFS/HIVE etc.. Users need to 
> > > > > > write
> > > > > > another processing line to re-process this data similar to how they 
> > > > > > are
> > > > > > doing it in their Stream processing pipelines. Tiered storage is to 
> > > > > > get
> > > > > > away from this and make this transparent to the user. They don't 
> > > > > > need
> > > > to
> > > > > > run another ETL process to ship the logs.
> > > > > >
> > > > > > "I'm wondering if we could just add support for loading segments 
> > > > > > from
> > > > > > remote URIs instead of from file, i.e. via plugins for s3://, 
> > > > > > hdfs://
> > > > etc.
> > > > > > I suspect less broker logic would change in that case -- the broker
> > > > > > wouldn't necessarily care if it reads from file:// or s3:// to load 
> > > > > > a
> > > > given
> > > > > > segment."
> > > > > >
> > > > > > Yes, this is what we are discussing in KIP. We are leaving the 
> > > > > > details
> > > > of
> > > > > > loading segments to RLM read part instead of directly exposing this 
> > > > > > in
> > > > the
> > > > > > Broker. This way we can keep the current Kafka code as it is without
> > > > > > changing the assumptions around the local disk. Let the RLM handle 
> > > > > > the
> > > > > > remote storage part.
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Harsha
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 4, 2019, at 12:54 PM, Ryanne Dolan wrote:
> > > > > > > Harsha, Sriharsha, Suresh, a couple thoughts:
> > > > > > >
> > > > > > > - How could this be used to leverage fast key-value stores, e.g.
> > > > > > Couchbase,
> > > > > > > which can serve individual records but maybe not entire segments? 
> > > > > > > Or
> > > > is
> > > > > > the
> > > > > > > idea to only support writing and fetching entire segments? Would 
> > > > > > > it
> > > > make
> > > > > > > sense to support both?
> > > > > > >
> > > > > > > - Instead of defining a new interface and/or mechanism to ETL 
> > > > > > > segment
> > > > > > files
> > > > > > > from brokers to cold storage, can we just leverage Kafka itself? 
> > > > > > > In
> > > > > > > particular, we can already ETL records to HDFS via Kafka Connect,
> > > > Gobblin
> > > > > > > etc -- we really just need a way for brokers to read these records
> > > > back.
> > > > > > > I'm wondering whether the new API could be limited to the fetch, 
> > > > > > > and
> > > > then
> > > > > > > existing ETL pipelines could be more easily leveraged. For 
> > > > > > > example,
> > > > if
> > > > > > you
> > > > > > > already have an ETL pipeline from Kafka to HDFS, you could leave
> > > > that in
> > > > > > > place and just tell Kafka how to read these records/segments from
> > > > cold
> > > > > > > storage when necessary.
> > > > > > >
> > > > > > > - I'm wondering if we could just add support for loading segments
> > > > from
> > > > > > > remote URIs instead of from file, i.e. via plugins for s3://, 
> > > > > > > hdfs://
> > > > > > etc.
> > > > > > > I suspect less broker logic would change in that case -- the 
> > > > > > > broker
> > > > > > > wouldn't necessarily care if it reads from file:// or s3:// to 
> > > > > > > load a
> > > > > > given
> > > > > > > segment.
> > > > > > >
> > > > > > > Combining the previous two comments, I can imagine a URI 
> > > > > > > resolution
> > > > chain
> > > > > > > for segments. For example, first try
> > > > file:///logs/{topic}/{segment}.log,
> > > > > > > then s3://mybucket/{topic}/{date}/{segment}.log, etc, leveraging 
> > > > > > > your
> > > > > > > existing ETL pipeline(s).
> > > > > > >
> > > > > > > Ryanne
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Feb 4, 2019 at 12:01 PM Harsha <ka...@harsha.io> wrote:
> > > > > > >
> > > > > > > > Hi All,
> > > > > > > > We are interested in adding tiered storage to Kafka. More
> > > > > > details
> > > > > > > > about motivation and design are in the KIP. We are working 
> > > > > > > > towards
> > > > an
> > > > > > > > initial POC. Any feedback or questions on this KIP are welcome.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Harsha
> > > > > > > >
> > > > > >
> > > >
>

Reply via email to