Ryanne, I think I can answer the first question:

> - How is this better than using a cached remote file system,

This KIP brings in another storage tier (for a total of three: memory, which is not strictly a tier but more of a cache; local storage; and remote storage) and manages the movement of data between the tiers. Using a cached remote file system would remove one tier (local storage). So this KIP gives us more options for where the data is ultimately stored.
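The tiers Eno describes can be made concrete with a small routing sketch. It assumes that a partition tracks three offsets, `remoteLogStartOffset`, `localLogStartOffset` and `logEndOffset`. These names are illustrative and are not taken from the KIP. It also assumes that local disk always holds the newest part of the log. The memory "tier" is not modeled, because it acts as a cache in front of local disk.

```java
// Hypothetical sketch: route a fetch to the tier that still holds the offset.
// Offset names are illustrative; memory (page cache) is treated as part of LOCAL.
final class TierResolver {
    enum Tier { LOCAL, REMOTE, OUT_OF_RANGE }

    static Tier resolve(long offset, long remoteLogStartOffset,
                        long localLogStartOffset, long logEndOffset) {
        if (offset >= logEndOffset) return Tier.OUT_OF_RANGE;   // not written yet
        if (offset >= localLogStartOffset) return Tier.LOCAL;   // recent data, still on local disk
        if (offset >= remoteLogStartOffset) return Tier.REMOTE; // only the remote copy remains
        return Tier.OUT_OF_RANGE;                                // already removed by retention
    }
}
```

Local data is always the most recent stretch of the log, so one comparison against `localLogStartOffset` is enough to pick the tier. With a cached remote file system, the LOCAL branch goes away and every read takes the remote path, served through whatever cache the OS or driver provides.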
Eno

On Thu, Oct 24, 2019 at 4:46 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> A few more questions:
> - How is this better than using a cached remote file system, e.g. mounting HDFS or S3 (yes, it's possible) and letting the OS and drivers handle everything? Maybe it's better, but the KIP doesn't address how or why, and I'd think this would be a trivial benchmark. If, for some reason, mounting and writing directly to a remote store is approximately as performant, it would be hard to argue for this KIP. I wouldn't be surprised if this were the case.
> - Why wait until local segments expire before offloading them to cold storage? Why not stream to HDFS/S3 on an ongoing basis? I'd think this would reduce bursty behavior from periodic uploads.
> - Can we write to multiple remote stores at the same time? Can we have some topics go to S3 and some to HDFS? Since we're storing "RDIs" that point to remote locations, can we generalize this to full URIs that may be in any supported remote store? In particular, what happens when you want to switch from HDFS to S3 -- can we add a new plugin and keep going? Can we fetch s3:/// URIs from S3 and hdfs:/// URIs from HDFS?
> - Instead of having brokers do all this, what if we just expose an API that lets external tooling register a URI for a given segment? If I've copied a segment file to S3, say with a daily cron job, why not just tell Kafka where to find it? Assuming I've got a plugin to _read_ from S3, that's all Kafka would need to know.
> Ryanne
>
> On Thu, Oct 24, 2019, 9:13 AM Eno Thereska <eno.there...@gmail.com> wrote:
> > Going back to the initial thread with general questions on the KIP. I think aspects of the user experience still need clarification:
> > - if a user has a mix of compacted and non-compacted topics, it will be hard to reason about storage needs overall. Could you give a reason why compacted topics are not supported?
> >   This is probably because to do that you'd have to go with a paging approach (like Ryanne suggested earlier), and that would be expensive in terms of IO. Do you want to rule out supporting compacted topics this early in the KIP design, or do you want to leave open the option of supporting them eventually? In an ideal system, Kafka figures out whether the topic is compacted, and for non-compacted topics it skips the local copy and goes through a fast path.
> > - why do we need per-topic remote retention time and bytes? Why isn't per-topic retention time and bytes (without the "remote" part) sufficient? E.g., if I have a topic and I want retention bytes to be 1TB, and I currently have 500GB local and 500GB remote, Kafka can manage which segments get deleted first. This would spare the user from having to think about even more configs.
> > Thanks
> > Eno
> >
> > On Mon, Oct 21, 2019 at 4:46 PM Harsha <ka...@harsha.io> wrote:
> > > Hi All,
> > > Thanks for the initial feedback on KIP-405. We opened a PR here: https://github.com/apache/kafka/pull/7561
> > > Please take a look and let us know if you have any questions.
> > > Since this feature is being developed by engineers from different companies, we would like to open a feature branch in the Apache Kafka git repo. It will allow us to collaborate in the open source community rather than in private branches. Please let me know if you have any objections to opening a feature branch in Kafka's git repo.
> > > Thanks,
> > > Harsha
> > >
> > > On Mon, Apr 8, 2019, at 10:04 PM, Harsha wrote:
> > > > Thanks, Ron. Updating the KIP. Will add answers here as well.
> > > > 1) If the cold storage technology can be cross-region, is there a possibility for a disaster recovery Kafka cluster to share the messages in cold storage?
> > > >   My guess is the answer is no, and messages replicated to the D/R cluster have to be migrated to cold storage from there independently. (The same cross-region cold storage medium could be used, but every message would appear there twice.)
> > > > If I understand the question correctly, you are asking this: if Kafka cluster A (active) ships logs to remote storage that has cross-region replication, will another Kafka cluster B (passive) be able to use those copied logs in remote storage directly?
> > > > For the initial version my answer is no. We can handle this in subsequent changes after this one.
> > > > 2) Can/should external (non-Kafka) tools have direct access to the messages in cold storage? I think this might have been addressed when someone asked about ACLs, and I believe the answer is "no" -- if some external tool needs to operate on that data then that external tool should read that data by acting as a Kafka consumer. Again, just asking to get the answer clearly documented in case it is unclear.
> > > > The answer is no. All tools/clients must go through broker APIs to access any data (local or remote).
> > > > Only the Kafka broker user will have access to remote storage logs, and security/ACLs will work the way they do today.
> > > > Tools/clients going directly to the remote storage might help in terms of efficiency, but this would require protocol changes and some way of syncing Kafka's ACLs to the remote storage.
> > > > Thanks,
> > > > Harsha
> > > >
> > > > On Mon, Apr 8, 2019, at 8:48 AM, Ron Dagostino wrote:
> > > > > Hi Harsha. A couple of questions. I think I know the answers, but it would be good to see them explicitly documented.
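Eno's suggestion earlier in the thread is to use one per-topic retention limit that covers both tiers and let Kafka decide which segments go first. In practice that means deleting whole segments from the oldest end until the total fits. The sketch below is minimal. It assumes that segments are listed oldest first and that remote segments are always older than local ones. `CombinedRetention` and `Segment` are hypothetical names. The limit would come from the existing `retention.bytes` topic config.

```java
import java.util.List;

// Hypothetical sketch: one retention.bytes-style limit applied across both tiers.
// Segments are passed oldest first; remote segments precede local ones.
final class CombinedRetention {
    record Segment(long baseOffset, long sizeBytes, boolean remote) {}

    /** Returns the oldest segments that must go so the remaining total fits retentionBytes. */
    static List<Segment> segmentsToDelete(List<Segment> oldestFirst, long retentionBytes) {
        long total = 0;
        for (Segment s : oldestFirst) total += s.sizeBytes();
        int n = 0;
        // Whole segments are removed from the old end, so remote data is deleted first.
        while (n < oldestFirst.size() && total > retentionBytes) {
            total -= oldestFirst.get(n).sizeBytes();
            n++;
        }
        return oldestFirst.subList(0, n);
    }
}
```

Take Eno's numbers: a 1TB limit with 500GB local and 500GB remote. Nothing is deleted. Once the total exceeds 1TB, the oldest remote segments are removed first. A real broker would also have to skip the active segment and keep local and remote metadata consistent, which this sketch ignores.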
> > > > > 1) If the cold storage technology can be cross-region, is there a possibility for a disaster recovery Kafka cluster to share the messages in cold storage? My guess is the answer is no, and messages replicated to the D/R cluster have to be migrated to cold storage from there independently. (The same cross-region cold storage medium could be used, but every message would appear there twice.)
> > > > > 2) Can/should external (non-Kafka) tools have direct access to the messages in cold storage? I think this might have been addressed when someone asked about ACLs, and I believe the answer is "no" -- if some external tool needs to operate on that data then that external tool should read that data by acting as a Kafka consumer. Again, just asking to get the answer clearly documented in case it is unclear.
> > > > > Ron
> > > > >
> > > > > On Thu, Apr 4, 2019 at 12:53 AM Harsha <ka...@harsha.io> wrote:
> > > > > > Hi Viktor,
> > > > > > "Now, will the consumer be able to consume a remote segment if:
> > > > > > - the remote segment is stored in the remote storage, BUT
> > > > > > - the leader broker failed right after this AND
> > > > > > - the follower which is to become a leader didn't scan yet for a new segment?"
> > > > > > If I understand correctly, the scenario is this: a local log segment has been copied to remote storage, but the leader fails before writing the index files, and leadership changes to a follower. In this case we consider the log segment copy failed, and the newly elected leader will start copying data to remote storage from the last known offset in the remote.
> > > > > > Consumers looking for an offset that might be in the failed segment copy will continue to read the data from local disk, because the local log segment is only deleted after a successful copy.
> > > > > > "As a follow-up question, what are your experiences, does a failover in a broker cause bigger than usual churn in the consumers? (I'm thinking about the time required to rebuild remote index files.)"
> > > > > > Rebuilding remote index files only happens if the remote storage is missing all the copied index files. Failover will not trigger this rebuild.
> > > > > > Hi Ryanne,
> > > > > > "Harsha, can you comment on this alternative approach: instead of fetching directly from remote storage via a new API, implement something like paging, where segments are paged in and out of cold storage based on access frequency/recency? For example, when a remote segment is accessed, it could be first fetched to disk and then read from there. I suppose this would require fewer code changes, or at least fewer API changes."
> > > > > > Copying a whole log segment from remote is inefficient. When tiered storage is enabled, users might prefer hardware with smaller disks. Copying log segments back to local disk might then reduce the available local storage, especially when multiple consumers on multiple topics trigger it.
> > > > > > What we proposed in the KIP doesn't affect the existing APIs, and we didn't call for any API changes.
> > > > > > "And related to paging, does the proposal address what happens when a broker runs out of HDD space?
> > > > > > Maybe we should have a way to configure a max number of segments or bytes stored on each broker, after which older or least-recently-used segments are kicked out, even if they aren't expired per the retention policy? Otherwise, I suppose tiered storage requires some babysitting to ensure that brokers don't run out of local storage, despite having access to potentially unbounded cold storage."
> > > > > > Existing Kafka behavior will not change with the addition of tiered storage, and enabling it will not change behavior either.
> > > > > > Just like today, it's up to the operator to monitor disk space and take the necessary actions before it becomes a fatal failure for the broker. We don't stop users from setting the retention period to infinite, and they can easily run out of space.
> > > > > > We didn't list these under Alternatives Considered because they are not efficient: they copy data in and out of local disk :).
> > > > > > Thanks,
> > > > > > Harsha
> > > > > >
> > > > > > On Wed, Apr 3, 2019, at 7:51 AM, Ryanne Dolan wrote:
> > > > > > > Harsha, can you comment on this alternative approach: instead of fetching directly from remote storage via a new API, implement something like paging, where segments are paged in and out of cold storage based on access frequency/recency? For example, when a remote segment is accessed, it could be first fetched to disk and then read from there. I suppose this would require fewer code changes, or at least fewer API changes.
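Ryanne proposes paging as an alternative: fetch a remote segment to local disk when it is accessed, and evict least-recently-used segments once a per-broker byte limit is reached. The sketch below models that with an access-ordered `LinkedHashMap`. It assumes segments are identified by base offset and that their sizes are known up front. No files are actually moved, and the class name is hypothetical. This is the alternative under discussion, not what the KIP proposes.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the paging alternative: remote segments are paged onto
// local disk on access, and least-recently-used ones are evicted past a byte budget.
final class SegmentPageCache {
    private final long maxBytes;
    private long usedBytes = 0;
    // accessOrder = true: iteration runs from least to most recently accessed.
    private final LinkedHashMap<Long, Long> sizeByBaseOffset = new LinkedHashMap<>(16, 0.75f, true);

    SegmentPageCache(long maxBytes) { this.maxBytes = maxBytes; }

    /** Records an access; returns true if the segment was already paged in. */
    boolean access(long baseOffset, long sizeBytes) {
        if (sizeByBaseOffset.get(baseOffset) != null) return true; // hit; get() refreshes recency
        sizeByBaseOffset.put(baseOffset, sizeBytes); // miss: stands in for fetching the segment
        usedBytes += sizeBytes;
        evictIfNeeded(baseOffset);
        return false;
    }

    private void evictIfNeeded(long justAdded) {
        Iterator<Map.Entry<Long, Long>> it = sizeByBaseOffset.entrySet().iterator();
        while (usedBytes > maxBytes && it.hasNext()) {
            Map.Entry<Long, Long> eldest = it.next();
            if (eldest.getKey() == justAdded) continue; // never evict the segment being read
            usedBytes -= eldest.getValue();
            it.remove(); // a real broker would also delete the local file here
        }
    }

    boolean contains(long baseOffset) { return sizeByBaseOffset.containsKey(baseOffset); }

    long usedBytes() { return usedBytes; }
}
```

Harsha's objection shows up in the sketch too. Every miss pages a whole segment onto local disk, so many consumers reading old data across many topics can churn through the local budget.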
> > > > > > > And related to paging, does the proposal address what happens when a broker runs out of HDD space? Maybe we should have a way to configure a max number of segments or bytes stored on each broker, after which older or least-recently-used segments are kicked out, even if they aren't expired per the retention policy? Otherwise, I suppose tiered storage requires some babysitting to ensure that brokers don't run out of local storage, despite having access to potentially unbounded cold storage.
> > > > > > > Just some things to add to Alternatives Considered :)
> > > > > > > Ryanne
> > > > > > >
> > > > > > > On Wed, Apr 3, 2019 at 8:21 AM Viktor Somogyi-Vass <viktorsomo...@gmail.com> wrote:
> > > > > > > > Hi Harsha,
> > > > > > > > Thanks for the answer, makes sense.
> > > > > > > > In the meantime, one edge case popped up in my mind, but first let me summarize what I understand, if I interpret your KIP correctly.
> > > > > > > > So basically, whenever the leader RSM copies a segment to the remote storage, the leader RLM will append an entry with the remote position to its remote index files. After this, LogManager can delete the local segment. In parallel, the RLM followers periodically scan the remote storage for files, and if they find a new one they update their indices.
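Viktor summarizes the design this way: the leader appends an index entry after each successful copy, and followers pick up new entries by scanning remote storage. The sketch below shows such an index. It assumes each entry records a segment's base offset, end offset and remote location. The `RemoteIndex` and `Entry` names and the location strings are hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch of a remote index: baseOffset -> (endOffset, remote location).
final class RemoteIndex {
    record Entry(long baseOffset, long endOffset, String remoteLocation) {}

    private final TreeMap<Long, Entry> byBaseOffset = new TreeMap<>();

    /** Leader side: called only after the segment copy has succeeded. */
    void append(Entry e) { byBaseOffset.put(e.baseOffset(), e); }

    /** Follower side: merges entries found by a periodic scan; returns how many were new. */
    int mergeScanned(Iterable<Entry> scanned) {
        int added = 0;
        for (Entry e : scanned) {
            if (byBaseOffset.putIfAbsent(e.baseOffset(), e) == null) added++;
        }
        return added;
    }

    /** Finds the remote segment whose offset range contains the requested offset. */
    Optional<String> locate(long offset) {
        Map.Entry<Long, Entry> floor = byBaseOffset.floorEntry(offset); // greatest base offset <= offset
        if (floor == null || offset > floor.getValue().endOffset()) return Optional.empty();
        return Optional.of(floor.getValue().remoteLocation());
    }
}
```

Viktor's edge case is the window between `append` on the old leader and `mergeScanned` on the new one. During that window, `locate` on the new leader returns nothing for the freshly copied segment. Harsha's answer relies on the local segment still being on disk at that point, because it is only deleted after a successful copy.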
> > > > > > > > Now, will the consumer be able to consume a remote segment if:
> > > > > > > > - the remote segment is stored in the remote storage, BUT
> > > > > > > > - the leader broker failed right after this AND
> > > > > > > > - the follower which is to become a leader didn't scan yet for a new segment?
> > > > > > > > Would this result in an OffsetOutOfRangeException, or would the failover halt the consume request until the new leader has the latest information?
> > > > > > > > As a follow-up question, what are your experiences, does a failover in a broker cause bigger than usual churn in the consumers? (I'm thinking about the time required to rebuild remote index files.)
> > > > > > > > Thanks,
> > > > > > > > Viktor
> > > > > > > >
> > > > > > > > On Mon, Apr 1, 2019 at 8:49 PM Harsha <ka...@harsha.io> wrote:
> > > > > > > > > Hi Eno,
> > > > > > > > > Thanks for the comments. Answers are inline.
> > > > > > > > > "Performance & durability
> > > > > > > > > ----------------------------------
> > > > > > > > > - would be good to have more discussion on performance implications of tiering. Copying the data from the local storage to the remote storage is going to be expensive in terms of network bandwidth and will affect foreground traffic to Kafka, potentially reducing its throughput and latency."
> > > > > > > > > Good point. We've run our local tests with 10GigE cards. Even though our clients' bandwidth requirements are high, with 1000s of clients producing/consuming data, we never hit our network bandwidth limits.
> > > > > > > > > More often we hit CPU and memory limits than network bandwidth limits. But the operator needs to take care of this if they want to enable tiered storage.
> > > > > > > > > Also, as mentioned in the KIP/previous threads, clients rarely request older data; it is mostly kept as an insurance policy. What is proposed here does increase bandwidth usage for shipping log segments to remote, but access patterns determine how much we end up reading from the remote tier.
> > > > > > > > > "- throttling the copying of the data above might be a solution, however, if you have a few TB of data to move to the slower remote tier the risk is that the movement will never complete on time under high Kafka load. Do we need a scheduler to use idle time to do the copying?"
> > > > > > > > > In our design, we are going to have a scheduler in RLM which will periodically copy inactive (rolled-over) log segments.
> > > > > > > > > Idle time is probably not easy to calculate and schedule copies around. Moreover, we want to copy the segments as soon as they are available.
> > > > > > > > > Throttling is something we can take into account and provide options to tune.
> > > > > > > > > "- Have you considered having two options: 1) a slow tier only (e.g., all the data on HDFS) and 2) a fast tier only like Kafka today. This would avoid copying data between the tiers.
> > > > > > > > >   Customers that can tolerate a slower tier with a better price/GB can just choose option (1). Would be good to put in Alternatives considered."
> > > > > > > > > We want to keep Kafka as users know it today: fast local disk access and a fast data serving layer. The tiered storage option might not be for everyone, and most users who are happy with Kafka today shouldn't see changes to their operations because of this KIP.
> > > > > > > > > Fundamentally, we believe data in remote tiered storage will be accessed very infrequently. Anyone reading from remote tiered storage should expect slower read responses (mostly backfills).
> > > > > > > > > Making an explicit change like a slow/fast tier would only bring more confusion and operational complexity into play. With tiered storage, only users who want cheaper long-term storage need to enable it, and others can operate Kafka as it is today. It gives a good balance: the latest reads are served from local disk almost all the time, and older data is shipped to the remote tier and read from there when clients need it. If necessary, we can revisit slow/fast-tier options at a later point.
> > > > > > > > > "Topic configs
> > > > > > > > > ------------------
> > > > > > > > > - related to performance but also availability, we need to discuss the replication mode for the remote tier.
> > > > > > > > >   For example, if the Kafka topics used to have 3-way replication, will they continue to have 3-way replication on the remote tier? Will the user configure that replication? In S3, for example, one can choose from different S3 tiers like STD or SIA, but there is no direct control over the replication factor like in Kafka."
> > > > > > > > > No. The remote tier is expected to be reliable storage with its own replication mechanisms.
> > > > > > > > > "- how will security and ACLs be configured for the remote tier. E.g., if user A does not have access to a Kafka topic, when that topic is moved to S3 or HDFS there needs to be a way to prevent access to the S3 bucket for that user. This might be outside the scope of this KIP but would be good to discuss first."
> > > > > > > > > As mentioned in the "Alternatives" section of the KIP, Kafka will remain the owner of those files in S3 or HDFS, and we will take advantage of the HDFS security model (file system permissions). So a user who goes directly to HDFS will not be able to read the files. All client requests will go through Kafka, and its ACLs will apply as they do for any other request.
> > > > > > > > > Hi Ron,
> > > > > > > > > Thanks for the comments.
> > > > > > > > > "I'm excited about this potential feature.
> > > > > > > > > Did you consider storing the information about the remote segments in a Kafka topic as opposed to in the remote storage itself? The topic would need infinite retention (or it would need to be compacted) so as not to itself be sent to cold storage, but assuming that topic would fit on local disk for all time (an open question as to whether this is acceptable or not) it feels like the most natural way to communicate information among brokers -- more natural than having them poll the remote storage systems, at least."
> > > > > > > > > With RemoteIndex we are extending the current index mechanism so that, for a given offset, we can find the file in remote storage that holds its message. This is the optimal way to find which remote segment serves a given offset, compared to storing all of this data in an internal topic.
> > > > > > > > > "To add to Eric's question/confusion about where logic lives (RLM vs. RSM), I think it would be helpful to explicitly identify in the KIP that the RLM delegates to the RSM since the RSM is part of the public API and is the pluggable piece.
> > > > > > > > > For example, instead of saying "RLM will ship the log segment files that are older than a configurable time to remote storage" I think it would be better to say "RLM identifies log segment files that are older than a configurable time and delegates to the configured RSM to ship them to remote storage" (or something like that -- just make it clear that the RLM is delegating to the configured RSM)."
> > > > > > > > > Thanks. I agree with you. I'll update the KIP.
> > > > > > > > > Hi Ambud,
> > > > > > > > > Thanks for the comments.
> > > > > > > > > "1. Wouldn't implicitly checking the remote location for old offsets that are not found locally on the leader be enough, i.e. do we really need remote index files? Since the storage path for a given topic would presumably be constant across all the brokers, the remote topic-partition path could simply be checked to see if there are any segment file names that would meet the offset requirements for a Consumer Fetch Request. RSM implementations could optionally cache this information."
> > > > > > > > > Storing the remote index files locally lets us determine faster which file might contain the data for a requested offset. We can resolve the remote file quickly and return the response, instead of calling the remote tier for an index lookup.
> > > > > > > > > Given that index files are small, they won't take up much storage space.
> > > > > > > > > "2. Would it make sense to create an internal compacted Kafka topic to publish & record remote segment information? This would enable the followers to get updates about new segments rather than running list() operations on remote storage to detect new segments which may be expensive."
> > > > > > > > > I think Ron is also alluding to this. We thought that shipping the remote index files to remote storage and letting the followers' RLM pick them up makes it easy to keep the current replication protocol unchanged. That way, we don't determine whether a follower is in the ISR based on another topic's replication. We will run small tests to determine whether using a topic is better for this. Thanks for the suggestion.
> > > > > > > > > "3. For RLM to scan local segment rotations are you thinking of leveraging java.nio.file.WatchService or simply running listFiles() on a periodic basis? Since WatchService implementation is heavily OS dependent it might create some complications around missing FS Events."
> > > > > > > > > Ideally we want to introduce file events like you suggested. For POC work we are just using listFiles().
> > > > > > > > > Also, copying these files to remote can be slow. We will not delete the files from local disk until the segment is copied, and any requests for data in these files will be served from local disk. So I don't think we need to be aggressive about optimizing the path that copies segments to remote.
> > > > > > > > > Hi Viktor,
> > > > > > > > > Thanks for the comments.
> > > > > > > > > "I have a rather technical question about this. How do you plan to package this extension? Does this mean that Kafka will depend on HDFS? I think it'd be nice to somehow separate this off to a different package in the project so that it could be built and released separately from the main Kafka packages."
> > > > > > > > > We would like all of this code to be part of Apache Kafka. In the early days of Kafka, there was an external module that contained Kafka-to-HDFS copy tools and dependencies. We would like RLM (class implementation) and RSM (interface) to be in core. As you suggested, RSM implementations could live in another package, so that their dependencies won't come into Kafka's classpath unless someone explicitly configures them.
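The RLM/RSM split discussed above works like this: the RLM in core decides which segments to ship, and a pluggable RSM, possibly built in a separate package, does the shipping. The sketch below illustrates that split. The interface, its single method, the fake plugin and the `remote://bucket/` location format are all hypothetical and are not the KIP's API. The point is that core code references only the interface and loads the implementation by class name from configuration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified RSM interface: the only plugin type core code depends on.
interface RemoteStorageManager {
    /** Ships one rolled-over segment and returns its remote location. */
    String copySegment(String segmentName);
}

// Hypothetical plugin standing in for an HDFS/S3 implementation built in another package.
final class FakeRemoteStorageManager implements RemoteStorageManager {
    public String copySegment(String segmentName) { return "remote://bucket/" + segmentName; }
}

// Hypothetical RLM: decides WHICH segments to ship and delegates the shipping to the RSM.
final class RemoteLogManager {
    record LocalSegment(String name, long lastModifiedMs) {}

    private final RemoteStorageManager rsm;

    RemoteLogManager(RemoteStorageManager rsm) { this.rsm = rsm; }

    /** Instantiates the configured RSM class, so its dependencies load only when configured. */
    static RemoteLogManager fromConfig(String rsmClassName) {
        try {
            Class<? extends RemoteStorageManager> cls =
                    Class.forName(rsmClassName).asSubclass(RemoteStorageManager.class);
            return new RemoteLogManager(cls.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load RSM class " + rsmClassName, e);
        }
    }

    /** Delegates every rolled-over segment older than minAgeMs to the RSM; returns remote locations. */
    List<String> shipOlderThan(List<LocalSegment> rolled, long nowMs, long minAgeMs) {
        List<String> locations = new ArrayList<>();
        for (LocalSegment s : rolled) {
            if (nowMs - s.lastModifiedMs() >= minAgeMs) locations.add(rsm.copySegment(s.name()));
        }
        return locations;
    }
}
```

Loading the plugin by name keeps HDFS or S3 client libraries off the broker's classpath unless an operator configures that implementation. This is the decoupling Viktor asked for.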
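Harsha mentions that the proof of concept uses plain `listFiles()` polling rather than `java.nio.file.WatchService`. The sketch below shows one polling pass. It relies on Kafka's usual on-disk naming, where each segment's `.log` file is named after its base offset, zero-padded to 20 digits. It also treats the segment with the highest base offset as the active one, which must not be shipped yet. The class name is hypothetical.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of listFiles() polling: each call reports rolled-over segments
// (every .log file except the newest, still-active one) not seen by an earlier call.
final class RolledSegmentScanner {
    private final Set<Long> alreadySeen = new TreeSet<>();

    List<Long> poll(File partitionDir) {
        File[] files = partitionDir.listFiles((dir, name) -> name.matches("\\d{20}\\.log"));
        if (files == null) return List.of(); // not a directory, or an I/O error
        TreeSet<Long> baseOffsets = new TreeSet<>();
        for (File f : files) {
            // Segment files are named after their base offset, zero-padded to 20 digits.
            baseOffsets.add(Long.parseLong(f.getName().substring(0, 20)));
        }
        if (!baseOffsets.isEmpty()) baseOffsets.pollLast(); // skip the active segment
        List<Long> newlyRolled = new ArrayList<>();
        for (long base : baseOffsets) {
            if (alreadySeen.add(base)) newlyRolled.add(base);
        }
        return newlyRolled;
    }
}
```

Polling trades latency for simplicity. A rolled segment is picked up at most one poll interval late, but no file system event can be missed, which was Ambud's concern about `WatchService`.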
> > > > > > > > > Thanks,
> > > > > > > > > Harsha
> > > > > > > > >
> > > > > > > > > On Mon, Apr 1, 2019, at 1:02 AM, Viktor Somogyi-Vass wrote:
> > > > > > > > > > Hey Harsha,
> > > > > > > > > > I have a rather technical question about this. How do you plan to package this extension? Does this mean that Kafka will depend on HDFS? I think it'd be nice to somehow separate this off to a different package in the project so that it could be built and released separately from the main Kafka packages. This decoupling would be useful when direct dependency on HDFS (or other implementations) is not needed and would also encourage decoupling for other storage implementations.
> > > > > > > > > > Best,
> > > > > > > > > > Viktor
> > > > > > > > > >
> > > > > > > > > > On Mon, Apr 1, 2019 at 3:44 AM Ambud Sharma <asharma52...@gmail.com> wrote:
> > > > > > > > > > > Hi Harsha,
> > > > > > > > > > > Thank you for proposing this KIP. We are looking forward to this feature as well.
> > > > > > > > > > > A few questions around the design & implementation:
> > > > > > > > > > > 1. Wouldn't implicitly checking the remote location for old offsets that are not found locally on the leader be enough, i.e. do we really need remote index files?
> > > > > > > > > > >   Since the storage path for a given topic would presumably be constant across all the brokers, the remote topic-partition path could simply be checked to see if there are any segment file names that would meet the offset requirements for a Consumer Fetch Request. RSM implementations could optionally cache this information.
> > > > > > > > > > > 2. Would it make sense to create an internal compacted Kafka topic to publish & record remote segment information? This would enable the followers to get updates about new segments rather than running list() operations on remote storage to detect new segments which may be expensive.
> > > > > > > > > > > 3. For RLM to scan local segment rotations are you thinking of leveraging java.nio.file.WatchService or simply running listFiles() on a periodic basis? Since WatchService implementation is heavily OS dependent it might create some complications around missing FS Events.
> > > > > > > > > > > Thanks.
> > > > > > > > > > > Ambud
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Mar 28, 2019 at 8:04 AM Ron Dagostino <rndg...@gmail.com> wrote:
> > > > > > > > > > > > Hi Harsha. I'm excited about this potential feature.
Did you consider storing the information about the remote segments in a Kafka topic as opposed to in the remote storage itself? The topic would need infinite retention (or it would need to be compacted) so as not to itself be sent to cold storage, but assuming that topic would fit on local disk for all time (an open question as to whether this is acceptable or not) it feels like the most natural way to communicate information among brokers -- more natural than having them poll the remote storage systems, at least.

To add to Eric's question/confusion about where logic lives (RLM vs. RSM), I think it would be helpful to explicitly identify in the KIP that the RLM delegates to the RSM since the RSM is part of the public API and is the pluggable piece.
For example, instead of saying "RLM will ship the log segment files that are older than a configurable time to remote storage" I think it would be better to say "RLM identifies log segment files that are older than a configurable time and delegates to the configured RSM to ship them to remote storage" (or something like that -- just make it clear that the RLM is delegating to the configured RSM).

Ron

On Thu, Mar 28, 2019 at 6:12 AM Eno Thereska <eno.there...@gmail.com> wrote:
Thanks Harsha,

A couple of comments:

Performance & durability
----------------------------------
- would be good to have more discussion on performance implications of tiering. Copying the data from the local storage to the remote storage is going to be expensive in terms of network bandwidth and will affect foreground traffic to Kafka, potentially reducing its throughput and increasing its latency.
- throttling the copying of the data above might be a solution; however, if you have a few TB of data to move to the slower remote tier, the risk is that the movement will never complete on time under high Kafka load. Do we need a scheduler to use idle time to do the copying?
- Have you considered having two options: 1) a slow tier only (e.g., all the data on HDFS) and 2) a fast tier only like Kafka today? This would avoid copying data between the tiers. Customers that can tolerate a slower tier with a better price/GB can just choose option (1). Would be good to put in Alternatives considered.

Topic configs
------------------
- related to performance but also availability, we need to discuss the replication mode for the remote tier. For example, if the Kafka topics used to have 3-way replication, will they continue to have 3-way replication on the remote tier? Will the user configure that replication?
In S3, for example, one can choose from different S3 tiers like STD or SIA, but there is no direct control over the replication factor like in Kafka.
- how will security and ACLs be configured for the remote tier? E.g., if user A does not have access to a Kafka topic, when that topic is moved to S3 or HDFS there needs to be a way to prevent access to the S3 bucket for that user. This might be outside the scope of this KIP but would be good to discuss first.

That's it for now, thanks
Eno

On Wed, Mar 27, 2019 at 4:40 PM Harsha <ka...@harsha.io> wrote:
Hi All,
Thanks for your initial feedback. We updated the KIP. Please take a look and let us know if you have any questions.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage

Thanks,
Harsha

On Wed, Feb 6, 2019, at 10:30 AM, Harsha wrote:
Thanks Eno, Adam & Satish for your review and questions. I'll address these in the KIP and update the thread here.

Thanks,
Harsha

On Wed, Feb 6, 2019, at 7:09 AM, Satish Duggana wrote:
Thanks, Harsha, for the KIP. It is a good start for tiered storage in Kafka. I have a few comments/questions.

It may be good to have a configuration to keep a number of local segments instead of keeping only the active segment. This config can be exposed at cluster and topic levels with a default value of 1. In some use cases, a few consumers may lag by more than one segment, and it will be better to serve them from local storage instead of remote storage.
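The local-retention setting suggested above can be sketched as a small selection rule. This is a minimal illustration under stated assumptions, not code from the KIP: the class LocalSegmentRetention, the method deletableSegments and the parameter retainedLocalSegments are hypothetical names, and the sketch assumes a segment may be dropped from local disk only after it has been copied to remote storage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper for a "number of local segments to keep" setting.
// Neither this class nor its parameters exist in Kafka; it only illustrates the idea.
final class LocalSegmentRetention {
    private LocalSegmentRetention() {}

    /**
     * Returns the base offsets of local segments that may be deleted from disk.
     *
     * @param baseOffsets           local segment base offsets, oldest first; the last one is the active segment
     * @param uploadedBaseOffsets   base offsets of segments already copied to remote storage
     * @param retainedLocalSegments how many of the newest segments to keep locally (1 = active segment only)
     */
    static List<Long> deletableSegments(List<Long> baseOffsets,
                                        Set<Long> uploadedBaseOffsets,
                                        int retainedLocalSegments) {
        if (retainedLocalSegments < 1) {
            throw new IllegalArgumentException("the active segment must always stay local");
        }
        // Segments before this index are older than the newest N segments.
        int cutoff = Math.max(0, baseOffsets.size() - retainedLocalSegments);
        List<Long> deletable = new ArrayList<>();
        for (int i = 0; i < cutoff; i++) {
            Long baseOffset = baseOffsets.get(i);
            // Stop at the first segment without a remote copy, so the local log
            // remains a contiguous suffix of the partition.
            if (!uploadedBaseOffsets.contains(baseOffset)) {
                break;
            }
            deletable.add(baseOffset);
        }
        return deletable;
    }
}
```

With local segments at base offsets 0, 100, 200 and 300 (300 active) and the first three already uploaded, a setting of 1 makes 0, 100 and 200 deletable, while a setting of 2 keeps 200 on disk, so a consumer lagging by one segment can still be served locally.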
It may be better to keep “remote.log.storage.enable” and the respective configuration at topic level along with cluster level. It will be helpful in environments where a few topics are configured with local storage and other topics are configured with remote storage.

Each topic-partition leader pushes its log segments with the respective index files to remote storage whenever the active log rolls over, and it updates the remote log index file for the respective remote log segment. The second option is to add offset index files also for each segment. It can serve consumer fetch requests for old segments from a local log segment instead of serving directly from the remote log, which may cause high latencies. There can be different strategies for when the remote segment is copied to a local segment.

What is the “remote.log.manager.scheduler.interval.ms” config about?

How do followers sync RemoteLogSegmentIndex files?
Do they request them from the leader replica? This looks to be important, as the failed-over leader should have RemoteLogSegmentIndex updated and ready to avoid high latencies in serving old data stored in remote logs.

Thanks,
Satish.

On Tue, Feb 5, 2019 at 10:42 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
Thanks Harsha, makes sense.

Ryanne

On Mon, Feb 4, 2019 at 5:53 PM Harsha Chintalapani <ka...@harsha.io> wrote:
"I think you are saying that this enables additional (potentially cheaper) storage options without *requiring* an existing ETL pipeline."
Yes.

"But it's not really a replacement for the sort of pipelines people build with Connect, Gobblin etc."

It is not.
But assuming that everyone runs these pipelines for storing raw Kafka data into HDFS or S3 is also a wrong assumption.
The aim of this KIP is to provide tiered storage as a whole package, not asking users to ship the data on their own using existing ETL, which means running a consumer and maintaining those pipelines.

"My point was that, if you are already offloading records in an ETL pipeline, why do you need a new pipeline built into the broker to ship the same data to the same place?"

As you said, it's an ETL pipeline, which means users of these pipelines are reading the data from the broker, transforming its state and storing it somewhere.
The point of this KIP is to store log segments as they are, without changing their structure, so that we can use the existing offset mechanisms to look them up when the consumer needs to read old data. When you load it via your existing pipelines, you are reading the topic as a whole, which doesn't guarantee that you'll produce this data back into HDFS or S3 in the same order, and who is going to generate the index files again?

"So you'd end up with one of 1) cold segments are only useful to Kafka; 2) you have the same data written to HDFS/etc twice, once for Kafka and once for everything else, in two separate formats"

You are talking about two different use cases. One is someone storing raw data out of Kafka for long-term access.
Storing the data as it is in HDFS through Kafka will solve this issue. They do not need to run another pipeline to ship these logs.

If they are running pipelines to store data in HDFS in a different format, that's a different use case. Maybe they are transforming Kafka logs to ORC so that they can query them through Hive. Once you transform the log segment, it loses its ability to use the existing offset index.
The main objective here is not to change the existing protocol and still be able to write and read logs from remote storage.

-Harsha

On Feb 4, 2019, 2:53 PM -0800, Ryanne Dolan <ryannedo...@gmail.com>, wrote:
Thanks Harsha, makes sense for the most part.
> tiered storage is to get away from this and make this transparent to the user

I think you are saying that this enables additional (potentially cheaper) storage options without *requiring* an existing ETL pipeline. But it's not really a replacement for the sort of pipelines people build with Connect, Gobblin etc. My point was that, if you are already offloading records in an ETL pipeline, why do you need a new pipeline built into the broker to ship the same data to the same place? I think in most cases this will be an additional pipeline, not a replacement, because the segments written to cold storage won't be useful outside Kafka.
So you'd end up with one of 1) cold segments are only useful to Kafka; 2) you have the same data written to HDFS/etc twice, once for Kafka and once for everything else, in two separate formats; 3) you use your existing ETL pipeline and read cold data directly.

To me, an ideal solution would let me spool segments from Kafka to any sink I would like, and then let Kafka clients seamlessly access that cold data. Today I can do that in the client, but ideally the broker would do it for me via some HDFS/Hive/S3 plugin. The KIP seems to accomplish that -- just without leveraging anything I've currently got in place.
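The division of labor discussed in this thread (the broker-side RLM decides when a rolled-over segment is old enough to move, and a pluggable storage implementation decides how to move it) can be sketched roughly as follows. This is a hypothetical illustration, not the KIP's interfaces: RemoteSegmentStore, RolledSegment and TieringManagerSketch are invented names, and the only rule taken from the thread is "ship rolled-over segments older than a configurable time".

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical plugin contract: the "how" (which store, which client library, which layout).
interface RemoteSegmentStore {
    void copy(RolledSegment segment);
}

// Minimal description of a local segment, for this sketch only.
final class RolledSegment {
    final long baseOffset;
    final long lastModifiedMs;
    final boolean active;

    RolledSegment(long baseOffset, long lastModifiedMs, boolean active) {
        this.baseOffset = baseOffset;
        this.lastModifiedMs = lastModifiedMs;
        this.active = active;
    }
}

// Hypothetical broker-side manager: owns the "when" and delegates the "how".
final class TieringManagerSketch {
    private final RemoteSegmentStore store;
    private final long minAgeMs;
    private final Set<Long> shipped = new HashSet<>();

    TieringManagerSketch(RemoteSegmentStore store, long minAgeMs) {
        this.store = store;
        this.minAgeMs = minAgeMs;
    }

    /** Copies every rolled-over segment older than minAgeMs that has not been copied yet. */
    List<Long> shipEligible(List<RolledSegment> segments, long nowMs) {
        List<Long> newlyShipped = new ArrayList<>();
        for (RolledSegment segment : segments) {
            boolean oldEnough = nowMs - segment.lastModifiedMs >= minAgeMs;
            if (segment.active || !oldEnough || shipped.contains(segment.baseOffset)) {
                continue; // still being written, too young, or already in remote storage
            }
            store.copy(segment); // the plugin decides where and how the bytes go
            shipped.add(segment.baseOffset);
            newlyShipped.add(segment.baseOffset);
        }
        return newlyShipped;
    }
}
```

Because the selection rule lives in the manager, a new backend only has to implement copy; this mirrors Harsha's point that plugin authors implement copy and read mechanisms rather than re-implementing RLM.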
Ryanne

On Mon, Feb 4, 2019 at 3:34 PM Harsha <ka...@harsha.io> wrote:
Hi Eric,
Thanks for your questions. Answers are in-line.

"The high-level design seems to indicate that all of the logic for when and how to copy log segments to remote storage lives in the RLM class. The default implementation is then HDFS specific with additional implementations being left to the community. This seems like it would require anyone implementing a new RLM to also re-implement the logic for when to ship data to remote storage."

RLM will be responsible for shipping log segments and it will decide when a log segment is ready to be shipped over.
Once log segments are identified as rolled over, RLM will delegate this responsibility to a pluggable remote storage implementation. Users who are looking to add their own implementation to enable other storage systems only need to implement the copy and read mechanisms, not re-implement RLM itself.

"Would it not be better for the Remote Log Manager implementation to be non-configurable, and instead have an interface for the remote storage layer? That way the "when" of the logic is consistent across all implementations and it's only a matter of "how," similar to how the Streams StateStores are managed."

It's possible that we can make RLM non-configurable.
But for the initial release, and to keep backward compatibility, we want to make this configurable, and any users who are not interested in having the LogSegments shipped to remote storage don't need to worry about this.

Hi Ryanne,
Thanks for your questions.

"How could this be used to leverage fast key-value stores, e.g. Couchbase, which can serve individual records but maybe not entire segments? Or is the idea to only support writing and fetching entire segments? Would it make sense to support both?"
Once rolled over, LogSegments are immutable objects, and we want to keep the current structure of LogSegments and the corresponding index files. It will be easier to copy the whole segment as it is instead of re-reading each file and using a key/value store.

"- Instead of defining a new interface and/or mechanism to ETL segment files from brokers to cold storage, can we just leverage Kafka itself? In particular, we can already ETL records to HDFS via Kafka Connect, Gobblin etc -- we really just need a way for brokers to read these records back.
I'm wondering whether the new API could be limited to the fetch, and then existing ETL pipelines could be more easily leveraged. For example, if you already have an ETL pipeline from Kafka to HDFS, you could leave that in place and just tell Kafka how to read these records/segments from cold storage when necessary."

This is pretty much what everyone does, and it has the additional overhead of keeping these pipelines operating and monitored.
What's proposed in the KIP is not ETL. It's just looking at the logs that are written and rolled over to copy the file as it is.
Each new topic needs to be added (sure, we can do so via a wildcard or another mechanism), but new topics need to be onboarded to ship the data into remote storage through a traditional ETL pipeline.
Once the data lands somewhere like HDFS/Hive etc., users need to write another processing pipeline to re-process this data, similar to how they do it in their stream processing pipelines. Tiered storage is to get away from this and make this transparent to the user. They don't need to run another ETL process to ship the logs.

"I'm wondering if we could just add support for loading segments from remote URIs instead of from file, i.e. via plugins for s3://, hdfs:// etc.
> > > > > > > > > > > > > > > > > > > > I suspect less broker logic would > > change in > > > > > > that > > > > > > > > > case -- > > > > > > > > > > > > the > > > > > > > > > > > > > > broker > > > > > > > > > > > > > > > > > > > > wouldn't necessarily care if it reads > > from > > > > > > file:// > > > > > > > > or > > > > > > > > > > > s3:// > > > > > > > > > > > > > to > > > > > > > > > > > > > > load a > > > > > > > > > > > > > > > > > > given > > > > > > > > > > > > > > > > > > > > segment." > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Yes, this is what we are discussing in > > KIP. We > > > > > > are > > > > > > > > > > > leaving > > > > > > > > > > > > > the > > > > > > > > > > > > > > details > > > > > > > > > > > > > > > > > > of > > > > > > > > > > > > > > > > > > > > loading segments to RLM read part > > instead of > > > > > > > > directly > > > > > > > > > > > > > exposing > > > > > > > > > > > > > > this in > > > > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > > Broker. This way we can keep the > > current Kafka > > > > > > code > > > > > > > > > as it > > > > > > > > > > > > is > > > > > > > > > > > > > > without > > > > > > > > > > > > > > > > > > > > changing the assumptions around the > > local > > > > > > disk. Let > > > > > > > > > the > > > > > > > > > > > RLM > > > > > > > > > > > > > > handle the > > > > > > > > > > > > > > > > > > > > remote storage part. 
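A minimal sketch of the idea discussed above: the caller asks for a segment by URI, and a reader plugin is picked from the URI scheme (file://, s3://, hdfs://), so the caller never needs to know where the segment lives. `SegmentLoader`, `register` and `read_local_file` are hypothetical names for illustration only. They are not the KIP's RemoteLogManager interface or any Kafka API. Only a local-file reader is implemented here; an s3:// or hdfs:// reader would be registered the same way.

```python
from typing import Callable, Dict
from urllib.parse import urlparse

# Hypothetical plugin signature: given a segment URI, return the raw segment bytes.
SegmentReader = Callable[[str], bytes]


class SegmentLoader:
    """Hypothetical dispatcher that selects a reader plugin by URI scheme.

    The caller does not care whether a segment lives at file://, s3:// or
    hdfs://; the scheme alone decides which registered plugin serves it.
    """

    def __init__(self) -> None:
        self._readers: Dict[str, SegmentReader] = {}

    def register(self, scheme: str, reader: SegmentReader) -> None:
        # Schemes are case-insensitive, so normalize them to lower case.
        self._readers[scheme.lower()] = reader

    def load(self, uri: str) -> bytes:
        # A bare path such as /data/topic-0/00000000000000000000.log has no
        # scheme; treat it as a local file.
        scheme = urlparse(uri).scheme.lower() or "file"
        reader = self._readers.get(scheme)
        if reader is None:
            raise ValueError(f"no reader plugin registered for scheme {scheme!r}")
        return reader(uri)


def read_local_file(uri: str) -> bytes:
    """Reader plugin for file:// URIs and bare local paths (hypothetical)."""
    path = urlparse(uri).path  # file:///tmp/seg.log -> /tmp/seg.log
    with open(path, "rb") as f:
        return f.read()
```

Keeping the scheme lookup behind a single `load` call is what lets the calling code stay agnostic of the storage location. Ryanne's suggestion and the RLM read path both aim for that property; they differ only in where the dispatch lives.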
Thanks,
Harsha

On Mon, Feb 4, 2019, at 12:54 PM, Ryanne Dolan wrote:
> Harsha, Sriharsha, Suresh, a couple thoughts:
>
> - How could this be used to leverage fast key-value stores, e.g. Couchbase,