>> Depending on the implementation, the data of one segment may not necessarily be stored in a single file. There could be a maximum object/chunk/file size restriction on the remote storage. So, one Kafka segment could be saved in multiple chunks in remote storage.

> A single local segment can be stored in multiple files, and each file can have a base position as part of its metadata (e.g. the file or object name). The file/object name can be <startOffset>-<endOffset>-<basePosition>, so any read request for a position within that segment can be served by computing the relative position, viz. `fetchPosition - basePosition`.

Let me elaborate further on how to address a single local segment file being copied to multiple files/blocks in remote storage without needing to map local segment positions to remote segment positions. Say a local segment file has offsets 1000-95000. It may be copied to remote storage in multiple files/blocks, each created with a name (or other metadata) containing <start-offset>-<end-offset>-<base-position>. This does not require recomputing positions for the remote segments.

local segment file offsets: 1000 - 95000
remote segment file suffix format: <start-offset>-<end-offset>-<base-position>

remote-segment-file-1: 1000-20200-0
remote-segment-file-2: 20201-45003-942346
remote-segment-file-3: 45004-78008-6001235
remote-segment-file-4: 78009-95000-20024761

If a read comes for offset 52340 with position 7321236, the relative position in remote-segment-file-3 is: 7321236 - 6001235 = 1320001

Thanks,
Satish.

On Thu, Nov 7, 2019 at 7:55 AM Satish Duggana <satish.dugg...@gmail.com> wrote:

> Depending on the implementation, the data of one segment may not necessarily be stored in a single file. There could be a maximum object/chunk/file size restriction on the remote storage. So, one Kafka segment could be saved in multiple chunks in remote storage.

A single local segment can be stored in multiple files, and each file can have a base position as part of its metadata (e.g. the file or object name). The file/object name can be <startOffset>-<endOffset>-<basePosition>.
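The base-position scheme above can be sketched in a few lines, using the example values from this thread (the function names here are made up for the sketch, not part of the KIP):

```python
# Each remote file is described by (start_offset, end_offset, base_position),
# recovered from the file/object name <start-offset>-<end-offset>-<base-position>.
REMOTE_FILES = [
    (1000, 20200, 0),
    (20201, 45003, 942346),
    (45004, 78008, 6001235),
    (78009, 95000, 20024761),
]

def find_remote_file(fetch_offset):
    """Return the (start, end, base) of the remote file covering fetch_offset."""
    for start, end, base in REMOTE_FILES:
        if start <= fetch_offset <= end:
            return (start, end, base)
    raise KeyError(f"offset {fetch_offset} not in any remote file")

def relative_position(fetch_offset, fetch_position):
    """Translate a local-segment position into a position inside the remote
    file, i.e. fetchPosition - basePosition."""
    _, _, base = find_remote_file(fetch_offset)
    return fetch_position - base

# A read for offset 52340 at local position 7321236 lands in
# remote-segment-file-3 at relative position 7321236 - 6001235 = 1320001.
print(relative_position(52340, 7321236))
```

No position remapping is needed when segments are split: each chunk carries its own base position, so the subtraction alone locates the data.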
So any read request for a position within that segment can be served by computing the relative position, viz. `fetchPosition - basePosition`.

On Thu, Nov 7, 2019 at 6:04 AM Ying Zheng <yi...@uber.com.invalid> wrote:

> 21. I am not sure that I understood the need for RemoteLogIndexEntry and its relationship with RemoteLogSegmentInfo. It seems that RemoteLogIndexEntry are offset index entries pointing to record batches inside a segment. That seems to be the same as the .index file?

We do not assume how the data is stored in the remote storage. Depending on the implementation, the data of one segment may not necessarily be stored in a single file. There could be a maximum object/chunk/file size restriction on the remote storage. So, one Kafka segment could be saved in multiple chunks in remote storage.

The remote log index also has a larger index interval. The default interval of the local .index file (log.index.interval.bytes) is 4KB. In the current HDFS RSM implementation, the default remote index interval (hdfs.remote.index.interval.bytes) is 256KB. The coarse-grained remote index saves some local disk space. The smaller size also makes it more likely to be cached in physical memory.

On Thu, Oct 31, 2019 at 1:58 PM Jun Rao <j...@confluent.io> wrote:

Hi, Harsha,

I am still looking at the KIP and the PR. A couple of quick comments/questions.

20. It's fine to keep the HDFS binding temporarily in the PR. We just need to remove it before it's merged to trunk. As Viktor mentioned, we can provide a reference implementation based on a mocked version of remote storage.

21. I am not sure that I understood the need for RemoteLogIndexEntry and its relationship with RemoteLogSegmentInfo. It seems that RemoteLogIndexEntry are offset index entries pointing to record batches inside a segment. That seems to be the same as the .index file?

Thanks,

Jun

On Mon, Oct 28, 2019 at 9:11 PM Satish Duggana <satish.dugg...@gmail.com> wrote:

Hi Viktor,

> 1. Can we allow RLM Followers to serve read requests? After all, segments on the cold storage are closed ones; no modification is allowed. Besides, KIP-392 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica) would introduce follower fetching too, so I think it would be nice to prepare RLM for this as well.

That is a good point. We plan to support fetching from remote storage on followers too. The current code in the PR works fine for this scenario, though there may be some edge cases to be handled. We have not yet tested this scenario.

> 2. I think the remote.log.storage.enable config is redundant. By specifying remote.log.storage.manager.class.name one already declares that they want to use remote storage. Would it make sense to remove the remote.log.storage.enable config?

I do not think it is really needed. The `remote.log.storage.enable` property can be removed.

Thanks,
Satish.
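To make the redundancy concrete, a broker configuration along these lines would already signal the intent to use remote storage, leaving a separate boolean switch with no extra information to carry. The two property names are from this thread; the class name and value shown are purely illustrative:

```properties
# Hypothetical broker config sketch: naming an RSM implementation already
# implies tiered storage is wanted, so the boolean flag below is redundant.
remote.log.storage.manager.class.name=com.example.HdfsRemoteStorageManager
# remote.log.storage.enable=true   # carries no information beyond the line above
```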
On Thu, Oct 24, 2019 at 2:46 PM Viktor Somogyi-Vass <viktorsomo...@gmail.com> wrote:

Hi Harsha,

A couple more questions:
1. Can we allow RLM Followers to serve read requests? After all, segments on the cold storage are closed ones; no modification is allowed. Besides, KIP-392 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica) would introduce follower fetching too, so I think it would be nice to prepare RLM for this as well.
2. I think the remote.log.storage.enable config is redundant. By specifying remote.log.storage.manager.class.name one already declares that they want to use remote storage. Would it make sense to remove the remote.log.storage.enable config?

Thanks,
Viktor

On Thu, Oct 24, 2019 at 10:37 AM Viktor Somogyi-Vass <viktorsomo...@gmail.com> wrote:

Hi Jun & Harsha,

I think it would be beneficial to at least provide one simple reference implementation (file system based?) as we do with Connect too. That would serve as a simple example and would help plugin developers to better understand the concept and the interfaces.

Best,
Viktor

On Wed, Oct 23, 2019 at 8:49 PM Jun Rao <j...@confluent.io> wrote:

Hi, Harsha,

Regarding the feature branch, if the goal is faster collaboration, it seems that doing the development on your own fork is better, since non-committers can push changes there.

Regarding the dependencies, this is an important thing to clarify. My understanding for this KIP is that in Apache Kafka, we won't provide any specific implementation for a particular block storage. There are many block storage systems out there (HDFS, S3, Google storage, Azure storage, Ceph, etc.). We don't want to drag all those dependencies into Apache Kafka, even if they are in a separate module; doing that would make the Kafka repo much harder to manage. We have used the same approach for Connect: the Connect framework is in Apache Kafka, but all specific connectors are hosted externally.

Thanks,

Jun

On Wed, Oct 23, 2019 at 8:41 AM Eno Thereska <eno.there...@gmail.com> wrote:

Thanks Satish, Harsha,

It's probably worth making it clearer in the KIP what exact libraries will be added to libs, if any. The KIP specifies the remote storage interface, but it isn't clear whether particular implementations will be added to Kafka's repository or whether they will reside in other repositories.
If I understand the intention correctly, you are proposing to have an HDFS and S3 implementation as part of the Kafka repository, working out of the box. Is that correct?

Thanks
Eno

On Wed, Oct 23, 2019 at 5:01 AM Satish Duggana <satish.dugg...@gmail.com> wrote:

> Regarding the HDFS dependency, it's not a direct dependency; rather, it implements the RemoteStorageManager interface. We packaged it along with core to make it more convenient to test. We can move this to an external module and keep it there. Let me know what you think.

Let me elaborate more on this point. With the new changes in the PR, kafka core and the other existing modules do not depend on HDFS. We created a new module called `remote-storage-managers/hdfs`. Libraries generated by this module are added to libs while packaging the distribution. This makes it easy for users to try HDFS tiered storage, instead of having to build the hdfs module and add it to libs on their own. We have plans to push these libs into the external/libs/ directory so that they are not added to the classpath by default; we can add them to the classpath in scripts based on a system property.

On Wed, Oct 23, 2019 at 6:26 AM Harsha Chintalapani <ka...@harsha.io> wrote:

Hi Jun,
Thanks for the feedback. Given the number of engineers involved in this cross-team effort, it would be great to have this as a feature branch. Whether it is in my fork or in Apache Kafka's repository, it needs to be constantly rebased from trunk to keep it current. Our proposal is to merge it into a feature branch and open a PR, so it is no different from the current PR except that it is in the central repo rather than my fork. Having it in Kafka's branch makes it easier for everyone to collaborate on this important feature in Kafka. Let me know if you still think otherwise.

The KIP is updated and we can go through the discussion.

Regarding the HDFS dependency, it's not a direct dependency; rather, it implements the RemoteStorageManager interface. We packaged it along with core to make it more convenient to test. We can move this to an external module and keep it there. Let me know what you think.

Thanks,
Harsha

On Tue, Oct 22, 2019 at 3:53 PM Jun Rao <j...@confluent.io> wrote:

Hi, Harsha,

Historically, we tried using a feature branch in 0.8. The experience actually wasn't great.
Merging the feature branch to the > > > main > > > > > >> branch > > > > > >> > > > > required additional review work and each merge with the > > > > > >> > > > > main > > > > > >> branch > > > > > >> > added > > > > > >> > > > > the risk of introducing new bugs. So, we have been avoiding > > > > > >> feature > > > > > >> > > > > branches since then, even for some major features. > > > > > >> > > > > > > > > > >> > > > > It's also going to be weird to have a feature branch before > > > a > > > > KIP > > > > > >> is > > > > > >> > > > > accepted. > > > > > >> > > > > > > > > > >> > > > > The KIP hasn't been updated much since the initial reviews. > > > > Is it > > > > > >> > ready for > > > > > >> > > > > discussion again? > > > > > >> > > > > > > > > > >> > > > > Looking at the PR, it seems to have direct dependency on > > > > HDFS. My > > > > > >> > > > > understanding is that the goal of the KIP is to make it > > > > > >> > > > > more > > > > > >> general > > > > > >> > such > > > > > >> > > > > that it can bind to different types of block storage. If > > > > > >> > > > > so, > > > > we > > > > > >> > should > > > > > >> > > > > avoid introducing a direct dependency to any specific block > > > > > >> storage > > > > > >> > in > > > > > >> > > > > Apache Kafka. > > > > > >> > > > > > > > > > >> > > > > Thanks, > > > > > >> > > > > > > > > > >> > > > > Jun > > > > > >> > > > > > > > > > >> > > > > On Mon, Oct 21, 2019 at 8:46 AM Harsha <ka...@harsha.io> > > > > wrote: > > > > > >> > > > > > > > > > >> > > > > > Hi All, > > > > > >> > > > > > Thanks for the initial feedback on the KIP-405. > > > > We > > > > > >> > opened a PR > > > > > >> > > > > > here > > > https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_kafka_pull_7561&d=DwIFaQ&c=r2dcLCtU9q6n0vrtnDw9vg&r=g7ujYPRBvNrON18SBeCt4g&m=CKNMp77DfMghjYo1JqbWr5jl-DRDBGF2owao5zUXDeE&s=W2_QM1iSNFp_YUFXt78ZFdtqHJp6P71-rRM8RN5yjuM&e= > > > . 
> > > > > >> > > > > > Please take a look and let us know if you have any > > > > questions. > > > > > >> > > > > > Since this feature is being developed by engineers from > > > > > >> different > > > > > >> > > > > > companies we would like to open a feature branch in > > > > > >> > > > > > apache > > > > kafka > > > > > >> > git. It > > > > > >> > > > > > will allow us collaborate in open source community rather > > > > than > > > > > >> in > > > > > >> > private > > > > > >> > > > > > branches. Please let me know if you have any objections > > > > > >> > > > > > to > > > > > >> opening > > > > > >> > a > > > > > >> > > > > > feature branch in kafka's git repo. > > > > > >> > > > > > > > > > > >> > > > > > Thanks, > > > > > >> > > > > > Harsha > > > > > >> > > > > > > > > > > >> > > > > > On Mon, Apr 8, 2019, at 10:04 PM, Harsha wrote: > > > > > >> > > > > > > Thanks, Ron. Updating the KIP. will add answers here as > > > > well > > > > > >> > > > > > > > > > > > >> > > > > > > 1) If the cold storage technology can be cross-region, > > > is > > > > > >> there > > > > > >> > a > > > > > >> > > > > > > possibility for a disaster recovery Kafka cluster to > > > > share > > > > > >> the > > > > > >> > > > > messages > > > > > >> > > > > > in > > > > > >> > > > > > > cold storage? My guess is the answer is no, and > > > messages > > > > > >> > replicated > > > > > >> > > > > to > > > > > >> > > > > > the > > > > > >> > > > > > > D/R cluster have to be migrated to cold storage from > > > > there > > > > > >> > > > > > independently. > > > > > >> > > > > > > (The same cross-region cold storage medium could be > > > > used, but > > > > > >> > every > > > > > >> > > > > > message > > > > > >> > > > > > > would appear there twice). 
If I understand the question correctly, what you are asking is: with Kafka cluster A (active) shipping logs to remote storage that has cross-region replication, will another Kafka cluster B (passive) be able to use the remote-storage-copied logs directly? For the initial version my answer is no. We can handle this in subsequent changes after this one.

> 2) Can/should external (non-Kafka) tools have direct access to the messages in cold storage? I think this might have been addressed when someone asked about ACLs, and I believe the answer is "no" -- if some external tool needs to operate on that data then that external tool should read that data by acting as a Kafka consumer. Again, just asking to get the answer clearly documented in case it is unclear.

The answer is no. All tools/clients must go through broker APIs to access any data (local or remote). Only the Kafka broker user will have access to remote storage logs, and Security/ACLs will work the way they do today. Tools/clients going directly to the remote storage might help in terms of efficiency, but this requires protocol changes and some way of syncing ACLs in Kafka to the remote storage.

Thanks,
Harsha

On Mon, Apr 8, 2019, at 8:48 AM, Ron Dagostino wrote:

Hi Harsha. A couple of questions. I think I know the answers, but it would be good to see them explicitly documented.

1) If the cold storage technology can be cross-region, is there a possibility for a disaster recovery Kafka cluster to share the messages in cold storage? My guess is the answer is no, and messages replicated to the D/R cluster have to be migrated to cold storage from there independently. (The same cross-region cold storage medium could be used, but every message would appear there twice.)

2) Can/should external (non-Kafka) tools have direct access to the messages in cold storage?
I think this might have been addressed when someone asked about ACLs, and I believe the answer is "no" -- if some external tool needs to operate on that data then that external tool should read that data by acting as a Kafka consumer. Again, just asking to get the answer clearly documented in case it is unclear.

Ron

On Thu, Apr 4, 2019 at 12:53 AM Harsha <ka...@harsha.io> wrote:

Hi Viktor,

"Now, will the consumer be able to consume a remote segment if:
- the remote segment is stored in the remote storage, BUT
- the leader broker failed right after this AND
- the follower which is to become a leader didn't scan yet for a new segment?"

If I understand correctly: a local log segment is copied to remote, the leader fails before writing the index files, and leadership changes to a follower. In this case we consider the log segment copy to have failed, and the newly elected leader will start copying data from the last known offset in the remote storage. Consumers looking for an offset that might be in the failed-copy log segment will continue to read the data from local disk, since the local log segment is only deleted after a successful copy of the log segment.

"As a follow-up question, what are your experiences: does a failover in a broker cause bigger than usual churn in the consumers? (I'm thinking about the time required to rebuild remote index files.)"

Rebuilding remote index files will only happen if the remote storage is missing all the copied index files. Failover will not trigger this rebuild.
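The resume-after-failover rule described above can be sketched as follows. This is a minimal illustration, not the actual KIP-405 API: it assumes the new leader can list the (start, end) offset ranges of segments that were fully copied to remote storage, and treats everything past the highest copied end offset as not yet copied, so a failed partial copy is simply redone.

```python
# Sketch: a newly elected leader picks the offset to resume copying from.
# remote_segments holds (start_offset, end_offset) pairs for segments whose
# copy (data + index files) completed successfully.

def resume_copy_offset(remote_segments, log_start_offset):
    """Return the first offset that still needs to be copied to remote."""
    ends = [end for _, end in remote_segments]
    if not ends:
        return log_start_offset   # nothing copied yet; start from the beginning
    return max(ends) + 1          # first offset past the last successful copy

# Leader failed mid-copy: segments through offset 45003 completed, so the new
# leader resumes at 45004; the in-flight segment's local copy still serves reads.
print(resume_copy_offset([(1000, 20200), (20201, 45003)], 1000))
```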
Hi Ryanne,

"Harsha, can you comment on this alternative approach: instead of fetching directly from remote storage via a new API, implement something like paging, where segments are paged in and out of cold storage based on access frequency/recency? For example, when a remote segment is accessed, it could be first fetched to disk and then read from there. I suppose this would require less code changes, or at least less API changes."

Copying a whole log segment from remote is inefficient. When tiered storage is enabled, users might prefer hardware with smaller disks, and having to copy log segments back to local disk (especially when multiple consumers on multiple topics trigger this) might negatively affect the available local storage. What we proposed in the KIP doesn't affect the existing APIs, and we didn't call for any API changes.

"And related to paging, does the proposal address what happens when a broker runs out of HDD space? Maybe we should have a way to configure a max number of segments or bytes stored on each broker, after which older or least-recently-used segments are kicked out, even if they aren't expired per the retention policy? Otherwise, I suppose tiered storage requires some babysitting to ensure that brokers don't run out of local storage, despite having access to potentially unbounded cold storage."

Existing Kafka behavior will not change with the addition of tiered storage, and enabling it will not change behavior either. Just like today, it's up to the operator to make sure disk space is monitored and to take the necessary actions to mitigate that before it becomes a fatal failure for the broker. We don't stop users from configuring the retention period to infinite, and they can easily run out of space that way.
These were not listed under alternatives considered because they are not efficient (copying in and out of local disk), hence the reason we didn't add them to the alternatives considered :).

Thanks,
Harsha

On Wed, Apr 3, 2019, at 7:51 AM, Ryanne Dolan wrote:

Harsha, can you comment on this alternative approach: instead of fetching directly from remote storage via a new API, implement something like paging, where segments are paged in and out of cold storage based on access frequency/recency? For example, when a remote segment is accessed, it could be first fetched to disk and then read from there. I suppose this would require less code changes, or at least less API changes.

And related to paging, does the proposal address what happens when a broker runs out of HDD space? Maybe we should have a way to configure a max number of segments or bytes stored on each broker, after which older or least-recently-used segments are kicked out, even if they aren't expired per the retention policy? Otherwise, I suppose tiered storage requires some babysitting to ensure that brokers don't run out of local storage, despite having access to potentially unbounded cold storage.

Just some things to add to Alternatives Considered :)

Ryanne

On Wed, Apr 3, 2019 at 8:21 AM Viktor Somogyi-Vass <viktorsomo...@gmail.com> wrote:

Hi Harsha,

Thanks for the answer, makes sense.
In the meantime one edge case popped up in my mind, but first let me summarize what I understand, if I interpret your KIP correctly.

So basically, whenever the leader RSM copies over a segment to the remote storage, the leader RLM will append an entry to its remote index files with the remote position. After this, LogManager can delete the local segment. In parallel, the RLM followers are periodically scanning the remote storage for files, and if they find a new one they update their indices.

Now, will the consumer be able to consume a remote segment if:
- the remote segment is stored in the remote storage, BUT
- the leader broker failed right after this, AND
- the follower which is to become a leader didn't scan yet for a new segment?

Would this result in an OffsetOutOfRangeException, or would the failover halt the consume request until the new leader has the latest information? As a follow-up question, what are your experiences: does a failover in a broker cause bigger than usual churn in the consumers? (I'm thinking about the time required to rebuild remote index files.)

Thanks,
Viktor

On Mon, Apr 1, 2019 at 8:49 PM Harsha <ka...@harsha.io> wrote:

Hi Eno,

Thanks for the comments. Answers are inline.

"Performance & durability
----------------------------------
- would be good to have more discussion on performance implications of tiering. Copying the data from the local storage to the remote storage is going to be expensive in terms of network bandwidth and will affect foreground traffic to Kafka, potentially reducing its throughput and latency."

Good point. We've run our local tests with 10GigE cards; even though our clients' bandwidth requirements are high, with 1000s of clients producing/consuming data we never hit our limits on network bandwidth. More often we hit limits on CPU and memory than on network bandwidth. But this is something to be taken care of by the operator if they want to enable tiered storage. Also, as mentioned in the KIP and previous threads, clients requesting older data is very rare and often used as an insurance policy. What is proposed here does increase bandwidth in terms of shipping log segments to remote storage, but access patterns determine how much we end up reading from the remote tier.

"- throttling the copying of the data above might be a solution, however, if you have a few TB of data to move to the slower remote tier the risk is that the movement will never complete on time under high Kafka load. Do we need a scheduler to use idle time to do the copying?"

In our design, we are going to have a scheduler in RLM which will periodically copy inactive (rolled-over) log segments. I'm not sure idle time is easy to calculate and schedule a copy around; moreover, we want to copy the segments as soon as they are available. Throttling is something we can take into account and provide options to tune.

"- Have you considered having two options: 1) a slow tier only (e.g., all the data on HDFS) and 2) a fast tier only like Kafka today. This would avoid copying data between the tiers. Customers that can tolerate a slower tier with a better price/GB can just choose option (1). Would be good to put in Alternatives considered."

What we want to have is the Kafka that is known to users today, with local fast disk access and a fast data serving layer. The tiered storage option might not be for everyone, and most users who are happy with Kafka today shouldn't see changes to their operation because of this KIP.

Fundamentally, we believe remote tiered storage data is accessed very infrequently. We expect anyone going to read from remote tiered storage to expect a slower read response (mostly backfills). Making an explicit change like slow/fast tiers will only cause more confusion and operational complexity. With tiered storage, only users who want to use cheaper long-term storage can enable it, and others can operate Kafka as it is today. It will give a good balance of serving the latest reads from local disk almost all the time, and shipping older data and reading from the remote tier when clients need the older data. If necessary, we can revisit slow/fast-tier options at a later point.
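The copy path described above — an RLM scheduler periodically shipping rolled-over segments, with tunable throttling — might look like this sketch. The names and the simple sleep-based throttle are illustrative assumptions, not the KIP's API:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Illustrative sketch only: an RLM-style copy loop that ships a rolled-over
// segment in fixed-size chunks and throttles to a configured bytes/sec rate,
// so tiering traffic does not starve foreground produce/consume traffic.
public class ThrottledCopySketch {

    // How long to pause after writing chunkBytes at the given rate limit.
    static long throttleMillis(int chunkBytes, long maxBytesPerSec) {
        return (chunkBytes * 1000L) / maxBytesPerSec;
    }

    // Copies the whole segment stream to the remote stream, pausing between
    // chunks to stay under maxBytesPerSec. Returns the bytes copied.
    public static long copy(InputStream segment, OutputStream remote,
                            int chunkBytes, long maxBytesPerSec)
            throws IOException, InterruptedException {
        byte[] buf = new byte[chunkBytes];
        long copied = 0;
        int n;
        while ((n = segment.read(buf)) != -1) {
            remote.write(buf, 0, n);
            copied += n;
            Thread.sleep(throttleMillis(n, maxBytesPerSec));
        }
        return copied;
    }
}
```

Because the local segment is only deleted after the copy completes (and reads are still served locally meanwhile), a slow, heavily throttled copy is safe here.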
"Topic configs
------------------
- related to performance but also availability, we need to discuss the replication mode for the remote tier. For example, if the Kafka topics used to have 3-way replication, will they continue to have 3-way replication on the remote tier? Will the user configure that replication? In S3 for example, one can choose from different S3 tiers like STD or SIA, but there is no direct control over the replication factor like in Kafka."

No. The remote tier is expected to be reliable storage with its own replication mechanisms.

"- how will security and ACLs be configured for the remote tier. E.g., if user A does not have access to a Kafka topic, when that topic is moved to S3 or HDFS there needs to be a way to prevent access to the S3 bucket for that user. This might be outside the scope of this KIP but would be good to discuss first."

As mentioned in the KIP's "Alternatives" section, we will keep Kafka as the owner of those files in S3 or HDFS and take advantage of the HDFS security model (file system permissions). So any user who wants to go directly and access files from HDFS will not be able to read them, and any client requests will go through Kafka, where its ACLs apply as they do for any other request.

Hi Ron,

Thanks for the comments.

"I'm excited about this potential feature. Did you consider storing the information about the remote segments in a Kafka topic as opposed to in the remote storage itself? The topic would need infinite retention (or it would need to be compacted) so as not to itself be sent to cold storage, but assuming that topic would fit on local disk for all time (an open question as to whether this is acceptable or not) it feels like the most natural way to communicate information among brokers -- more natural than having them poll the remote storage systems, at least."

With RemoteIndex we are extending the current index mechanism — which maps an offset to its message — to find the file in remote storage for a given offset. This will be a more optimal way of finding which remote segment might be serving a given offset, compared to storing all of this data in an internal topic.

"To add to Eric's question/confusion about where logic lives (RLM vs. RSM), I think it would be helpful to explicitly identify in the KIP that the RLM delegates to the RSM since the RSM is part of the public API and is the pluggable piece. For example, instead of saying "RLM will ship the log segment files that are older than a configurable time to remote storage" I think it would be better to say "RLM identifies log segment files that are older than a configurable time and delegates to the configured RSM to ship them to remote storage" (or something like that -- just make it clear that the RLM is delegating to the configured RSM)."

Thanks. I agree with you. I'll update the KIP.

Hi Ambud,

Thanks for the comments.

"1. Wouldn't implicit checking for old offsets in the remote location if not found locally on the leader suffice, i.e. do we really need remote index files? Since the storage path for a given topic would presumably be constant across all the brokers, the remote topic-partition path could simply be checked to see if there are any segment file names that would meet the offset requirements for a Consumer Fetch Request. RSM implementations could optionally cache this information."

By storing the remote index files locally, it will be faster for us to determine, for a requested offset, which file might contain the data. This will help us resolve the remote file quickly and return the response, instead of making a call to the remote tier for the index lookup. Given that index files are small, it won't be much of a hit to storage space.
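A locally cached index keyed by each remote segment's base offset makes that resolution a cheap floor lookup with no remote round-trip. Here is a minimal sketch, using a TreeMap as a stand-in for the file-based remote index (names and offsets are illustrative):

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch: resolve which remote segment object serves a fetch offset using a
// locally cached index keyed by each remote segment's base (start) offset.
// The real remote index in the KIP is file-based, like Kafka's existing
// offset index; this TreeMap is just an illustration of the lookup.
public class RemoteIndexSketch {
    private final TreeMap<Long, String> baseOffsetToObject = new TreeMap<>();

    public void add(long baseOffset, String remoteObjectName) {
        baseOffsetToObject.put(baseOffset, remoteObjectName);
    }

    // Floor lookup: the candidate segment is the one with the greatest base
    // offset <= fetchOffset; returns null if the offset precedes all segments.
    public String lookup(long fetchOffset) {
        Map.Entry<Long, String> e = baseOffsetToObject.floorEntry(fetchOffset);
        return e == null ? null : e.getValue();
    }
}
```

With segments named by their offset ranges (e.g. 1000-20200, 20201-45003, 45004-78008), a fetch at offset 52340 floors to the segment starting at 45004 — without listing or contacting the remote tier.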
"2. Would it make sense to create an internal compacted Kafka topic to publish & record remote segment information? This would enable the followers to get updates about new segments rather than running list() operations on remote storage to detect new segments, which may be expensive."

I think Ron was also alluding to this. We thought that shipping the remote index files to remote storage and letting the followers' RLM pick them up makes it easy to keep the current replication protocol without any changes, so we don't determine whether a follower is in the ISR based on another topic's replication. We will run small tests and determine whether use of a topic is better for this. Thanks for the suggestion.
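The follower-side pickup described above — each follower's RLM periodically listing the remote store and indexing segments it has not yet seen — reduces to a set difference per scan. A sketch under assumed names (the real RSM listing API is not specified in this thread):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the follower-side RLM loop: on each periodic scan of the remote
// topic-partition path, keep only the segment names not seen before, so their
// index entries can be fetched and applied. Names are illustrative.
public class FollowerScanSketch {
    private final Set<String> known = new HashSet<>();

    // Returns only the newly discovered segment names, in listing order, and
    // remembers them so the next scan skips them. Set.add returns true only
    // for a first occurrence, which is exactly the filter we need.
    public List<String> scan(List<String> remoteListing) {
        return remoteListing.stream().filter(known::add).toList();
    }
}
```

This also shows why the periodic list() cost matters: every scan re-lists everything, and only the set difference is new work — which is the overhead the compacted-topic alternative would avoid.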
For RLM to scan local segment rotations > > > > > >> > > > > > > > > > > > are > > > > you > > > > > >> > thinking > > > > > >> > > > > of > > > > > >> > > > > > > > > leveraging > > > > > >> > > > > > > > > > > > java.nio.file.WatchService or simply running > > > > > >> > listFiles() on a > > > > > >> > > > > > > > > periodic > > > > > >> > > > > > > > > > > > basis? Since WatchService implementation is > > > > heavily > > > > > >> OS > > > > > >> > > > > > dependent it > > > > > >> > > > > > > > > might > > > > > >> > > > > > > > > > > > create some complications around missing FS > > > > Events. > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > Ideally we want to introduce file events like > > > > you > > > > > >> > suggested. > > > > > >> > > > > > For POC > > > > > >> > > > > > > > > work > > > > > >> > > > > > > > > > > > we are using just listFiles(). Also copying > > > > these > > > > > >> > files to > > > > > >> > > > > > remote > > > > > >> > > > > > > > > can be > > > > > >> > > > > > > > > > > > slower and we will not delete the files from > > > > local > > > > > >> > disk until > > > > > >> > > > > > the > > > > > >> > > > > > > > > segment > > > > > >> > > > > > > > > > > > is copied and any requests to the data in > > > these > > > > > >> files > > > > > >> > will be > > > > > >> > > > > > served > > > > > >> > > > > > > > > from > > > > > >> > > > > > > > > > > > local disk. So I don't think we need to be > > > > > >> aggressive > > > > > >> > and > > > > > >> > > > > > optimize > > > > > >> > > > > > > > > the > > > > > >> > > > > > > > > > > this > > > > > >> > > > > > > > > > > > copy segment to remote path. > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > Hi Viktor, > > > > > >> > > > > > > > > > > > Thanks for the comments. 
> > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > "I have a rather technical question to this. > > > > How do > > > > > >> > you plan > > > > > >> > > > > to > > > > > >> > > > > > > > > package > > > > > >> > > > > > > > > > > > this > > > > > >> > > > > > > > > > > > extension? Does this mean that Kafka will > > > > depend on > > > > > >> > HDFS? > > > > > >> > > > > > > > > > > > I think it'd be nice to somehow separate this > > > > off > > > > > >> to a > > > > > >> > > > > > different > > > > > >> > > > > > > > > package > > > > > >> > > > > > > > > > > in > > > > > >> > > > > > > > > > > > the project so that it could be built and > > > > released > > > > > >> > separately > > > > > >> > > > > > from > > > > > >> > > > > > > > > the > > > > > >> > > > > > > > > > > main > > > > > >> > > > > > > > > > > > Kafka packages." > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > We would like all of this code to be part of > > > > Apache > > > > > >> > Kafka . > > > > > >> > > > > In > > > > > >> > > > > > early > > > > > >> > > > > > > > > days > > > > > >> > > > > > > > > > > > of Kafka, there is external module which used > > > to > > > > > >> > contain > > > > > >> > > > > kafka > > > > > >> > > > > > to > > > > > >> > > > > > > > > hdfs > > > > > >> > > > > > > > > > > copy > > > > > >> > > > > > > > > > > > tools and dependencies. 
We would like to > > > > > >> > > > > > > > > > > > have > > > > RLM > > > > > >> > (class > > > > > >> > > > > > > > > implementation) > > > > > >> > > > > > > > > > > > and RSM(interface) to be in core and as you > > > > > >> suggested, > > > > > >> > > > > > > > > implementation of > > > > > >> > > > > > > > > > > > RSM could be in another package so that the > > > > > >> > dependencies of > > > > > >> > > > > > RSM won't > > > > > >> > > > > > > > > > > come > > > > > >> > > > > > > > > > > > into Kafka's classpath unless someone > > > explicity > > > > > >> > configures > > > > > >> > > > > > them. > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > Thanks, > > > > > >> > > > > > > > > > > > Harsha > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > On Mon, Apr 1, 2019, at 1:02 AM, Viktor > > > > Somogyi-Vass > > > > > >> > wrote: > > > > > >> > > > > > > > > > > > > Hey Harsha, > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > I have a rather technical question to this. > > > > How do > > > > > >> > you plan > > > > > >> > > > > > to > > > > > >> > > > > > > > > package > > > > > >> > > > > > > > > > > > this > > > > > >> > > > > > > > > > > > > extension? Does this mean that Kafka will > > > > depend > > > > > >> on > > > > > >> > HDFS? 
> > > > > >> > > > > > > > > > > > > I think it'd be nice to somehow separate > > > this > > > > off > > > > > >> to > > > > > >> > a > > > > > >> > > > > > different > > > > > >> > > > > > > > > > > package > > > > > >> > > > > > > > > > > > in > > > > > >> > > > > > > > > > > > > the project so that it could be built and > > > > released > > > > > >> > > > > > separately from > > > > > >> > > > > > > > > the > > > > > >> > > > > > > > > > > > main > > > > > >> > > > > > > > > > > > > Kafka packages. > > > > > >> > > > > > > > > > > > > This decoupling would be useful when direct > > > > > >> > dependency on > > > > > >> > > > > > HDFS (or > > > > > >> > > > > > > > > > > other > > > > > >> > > > > > > > > > > > > implementations) is not needed and would > > > also > > > > > >> > encourage > > > > > >> > > > > > decoupling > > > > > >> > > > > > > > > for > > > > > >> > > > > > > > > > > > > other storage implementations. > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > Best, > > > > > >> > > > > > > > > > > > > Viktor > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > On Mon, Apr 1, 2019 at 3:44 AM Ambud Sharma > > > < > > > > > >> > > > > > > > > asharma52...@gmail.com> > > > > > >> > > > > > > > > > > > wrote: > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > Hi Harsha, > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > Thank you for proposing this KIP. We are > > > > looking > > > > > >> > forward > > > > > >> > > > > > to this > > > > > >> > > > > > > > > > > > feature as > > > > > >> > > > > > > > > > > > > > well. > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > A few questions around the design & > > > > > >> implementation: > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > 1. 
Wouldn't implicit checking for old > > > > offsets in > > > > > >> > remote > > > > > >> > > > > > location > > > > > >> > > > > > > > > if > > > > > >> > > > > > > > > > > not > > > > > >> > > > > > > > > > > > > > found locally on the leader i.e. do we > > > > really > > > > > >> need > > > > > >> > remote > > > > > >> > > > > > index > > > > > >> > > > > > > > > > > files? > > > > > >> > > > > > > > > > > > > > Since the storage path for a given topic > > > > would > > > > > >> > presumably > > > > > >> > > > > > be > > > > > >> > > > > > > > > constant > > > > > >> > > > > > > > > > > > > > across all the brokers, the remote > > > > > >> topic-partition > > > > > >> > path > > > > > >> > > > > > could > > > > > >> > > > > > > > > simply > > > > > >> > > > > > > > > > > be > > > > > >> > > > > > > > > > > > > > checked to see if there are any segment > > > file > > > > > >> names > > > > > >> > that > > > > > >> > > > > > would > > > > > >> > > > > > > > > meet > > > > > >> > > > > > > > > > > the > > > > > >> > > > > > > > > > > > > > offset requirements for a Consumer Fetch > > > > > >> Request. > > > > > >> > RSM > > > > > >> > > > > > > > > implementations > > > > > >> > > > > > > > > > > > could > > > > > >> > > > > > > > > > > > > > optionally cache this information. > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > 2. Would it make sense to create an > > > internal > > > > > >> > compacted > > > > > >> > > > > > Kafka > > > > > >> > > > > > > > > topic to > > > > > >> > > > > > > > > > > > > > publish & record remote segment > > > information? 
> > > > > >> This > > > > > >> > would > > > > > >> > > > > > enable > > > > > >> > > > > > > > > the > > > > > >> > > > > > > > > > > > > > followers to get updates about new > > > segments > > > > > >> rather > > > > > >> > than > > > > > >> > > > > > running > > > > > >> > > > > > > > > > > list() > > > > > >> > > > > > > > > > > > > > operations on remote storage to detect > > > > > >> > > > > > > > > > > > > > new > > > > > >> > segments which > > > > > >> > > > > > may be > > > > > >> > > > > > > > > > > > expensive. > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > 3. For RLM to scan local segment > > > > > >> > > > > > > > > > > > > > rotations > > > > are > > > > > >> you > > > > > >> > > > > > thinking of > > > > > >> > > > > > > > > > > > leveraging > > > > > >> > > > > > > > > > > > > > java.nio.file.WatchService or simply > > > running > > > > > >> > listFiles() > > > > > >> > > > > > on a > > > > > >> > > > > > > > > > > periodic > > > > > >> > > > > > > > > > > > > > basis? Since WatchService implementation > > > is > > > > > >> > heavily OS > > > > > >> > > > > > dependent > > > > > >> > > > > > > > > it > > > > > >> > > > > > > > > > > > might > > > > > >> > > > > > > > > > > > > > create some complications around missing > > > FS > > > > > >> Events. > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > Thanks. > > > > > >> > > > > > > > > > > > > > Ambud > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > On Thu, Mar 28, 2019 at 8:04 AM Ron > > > > Dagostino < > > > > > >> > > > > > rndg...@gmail.com > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > wrote: > > > > > >> > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > Hi Harsha. I'm excited about this > > > > potential > > > > > >> > feature. 
> > > > > >> > > > > > Did you > > > > > >> > > > > > > > > > > > consider > > > > > >> > > > > > > > > > > > > > > storing the information about the > > > > > >> > > > > > > > > > > > > > > remote > > > > > >> > segments in a > > > > > >> > > > > > Kafka > > > > > >> > > > > > > > > topic > > > > > >> > > > > > > > > > > as > > > > > >> > > > > > > > > > > > > > > opposed to in the remote storage > > > > > >> > > > > > > > > > > > > > > itself? > > > > The > > > > > >> > topic > > > > > >> > > > > > would need > > > > > >> > > > > > > > > > > > infinite > > > > > >> > > > > > > > > > > > > > > retention (or it would need to be > > > > compacted) > > > > > >> so > > > > > >> > as not > > > > > >> > > > > to > > > > > >> > > > > > > > > itself be > > > > > >> > > > > > > > > > > > sent > > > > > >> > > > > > > > > > > > > > to > > > > > >> > > > > > > > > > > > > > > cold storage, but assuming that topic > > > > would > > > > > >> fit > > > > > >> > on > > > > > >> > > > > local > > > > > >> > > > > > disk > > > > > >> > > > > > > > > for > > > > > >> > > > > > > > > > > all > > > > > >> > > > > > > > > > > > > > time > > > > > >> > > > > > > > > > > > > > > (an open question as to whether this is > > > > > >> > acceptable or > > > > > >> > > > > > not) it > > > > > >> > > > > > > > > feels > > > > > >> > > > > > > > > > > > like > > > > > >> > > > > > > > > > > > > > > the most natural way to communicate > > > > > >> information > > > > > >> > among > > > > > >> > > > > > brokers > > > > > >> > > > > > > > > -- > > > > > >> > > > > > > > > > > more > > > > > >> > > > > > > > > > > > > > > natural than having them poll the > > > > > >> > > > > > > > > > > > > > > remote > > > > > >> storage > > > > > >> > > > > > systems, at > > > > > >> > > > > > > > > least. 
To add to Eric's question/confusion about where logic lives (RLM vs. RSM), I think it would be helpful to explicitly identify in the KIP that the RLM delegates to the RSM, since the RSM is part of the public API and is the pluggable piece. For example, instead of saying "RLM will ship the log segment files that are older than a configurable time to remote storage", I think it would be better to say "RLM identifies log segment files that are older than a configurable time and delegates to the configured RSM to ship them to remote storage" (or something like that -- just make it clear that the RLM is delegating to the configured RSM).

Ron

On Thu, Mar 28, 2019 at 6:12 AM Eno Thereska <eno.there...@gmail.com> wrote:

Thanks Harsha,

A couple of comments:

Performance & durability
----------------------------------
- would be good to have more discussion on performance implications of tiering.
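The RLM/RSM split described above can be sketched as follows. This is an illustrative sketch only; the class and method names are hypothetical, not the KIP's actual API. The RLM decides *which* segments are old enough, and the pluggable RSM decides *how* to ship them.

```python
import time

class RemoteStorageManager:            # the pluggable, public-API piece
    def copy_segment(self, segment):   # e.g. an HDFS or S3 implementation
        raise NotImplementedError

class RemoteLogManager:
    """Identifies old segments and delegates shipping to the configured RSM."""

    def __init__(self, rsm, age_threshold_secs):
        self.rsm = rsm
        self.age_threshold_secs = age_threshold_secs

    def maybe_ship(self, segments, now=None):
        now = time.time() if now is None else now
        for seg in segments:
            if now - seg["rolled_at"] > self.age_threshold_secs:
                self.rsm.copy_segment(seg)   # delegate; RLM never ships directly
```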
Copying the data from the local storage to the remote storage is going to be expensive in terms of network bandwidth and will affect foreground traffic to Kafka, potentially reducing its throughput and latency.
- throttling the copying of the data above might be a solution; however, if you have a few TB of data to move to the slower remote tier, the risk is that the movement will never complete on time under high Kafka load. Do we need a scheduler to use idle time to do the copying?
- Have you considered having two options: 1) a slow tier only (e.g., all the data on HDFS) and 2) a fast tier only, like Kafka today.
This would avoid copying data between the tiers. Customers that can tolerate a slower tier with a better price/GB can just choose option (1). Would be good to put in Alternatives considered.

Topic configs
------------------
- related to performance but also availability, we need to discuss the replication mode for the remote tier. For example, if the Kafka topics used to have 3-way replication, will they continue to have 3-way replication on the remote tier? Will the user configure that replication?
In S3, for example, one can choose from different S3 tiers like STD or SIA, but there is no direct control over the replication factor like in Kafka.
- how will security and ACLs be configured for the remote tier. E.g., if user A does not have access to a Kafka topic, when that topic is moved to S3 or HDFS there needs to be a way to prevent access to the S3 bucket for that user. This might be outside the scope of this KIP but would be good to discuss first.
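One way to bound the copy bandwidth raised in the performance comments above is a token-bucket throttle on segment uploads, so tiering cannot starve foreground traffic. A minimal sketch; the class and rate here are illustrative, not from the KIP:

```python
class ByteRateThrottle:
    """Grants up to bytes_per_sec on average, with a one-second burst."""

    def __init__(self, bytes_per_sec, clock):
        self.rate = float(bytes_per_sec)
        self.clock = clock               # injectable for testing
        self.tokens = self.rate          # start with a full one-second bucket
        self.last = clock()

    def wait_time(self, nbytes):
        """Seconds the uploader should sleep before sending nbytes."""
        now = self.clock()
        self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
        self.last = now
        self.tokens -= nbytes            # may go negative: caller pays it back by waiting
        return 0.0 if self.tokens >= 0 else -self.tokens / self.rate
```

An idle-time scheduler, as suggested above, could simply raise `bytes_per_sec` when broker load is low.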
That's it for now, thanks
Eno

On Wed, Mar 27, 2019 at 4:40 PM Harsha <ka...@harsha.io> wrote:

Hi All,
Thanks for your initial feedback. We updated the KIP. Please take a look and let us know if you have any questions.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage

Thanks,
Harsha

On Wed, Feb 6, 2019, at 10:30 AM, Harsha wrote:

Thanks Eno, Adam & Satish for your review and questions. I'll address these in the KIP and update the thread here.

Thanks,
Harsha

On Wed, Feb 6, 2019, at 7:09 AM, Satish Duggana wrote:

Thanks, Harsha, for the KIP. It is a good start for tiered storage in Kafka. I have a few comments/questions.

It may be good to have a configuration to keep a number of local segments instead of keeping only the active segment. This config can be exposed at cluster and topic levels with a default value of 1.
In some use cases, a few consumers may lag by over one segment; it will be better to serve them from local storage instead of remote storage.

It may be better to keep "remote.log.storage.enable" and the respective configuration at the topic level along with the cluster level. It will be helpful in environments where a few topics are configured with local storage and other topics are configured with remote storage.
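The topic-over-cluster config resolution suggested above can be sketched as follows. "remote.log.storage.enable" is from the discussion; "local.log.retention.segments" is a hypothetical name for the proposed "number of local segments to keep" config, and the resolution helper itself is illustrative, not Kafka's actual config machinery.

```python
CLUSTER_DEFAULTS = {
    "remote.log.storage.enable": False,
    "local.log.retention.segments": 1,   # default: keep only the active segment
}

def resolve(name, topic_overrides):
    """Topic-level value wins; otherwise fall back to the cluster default."""
    return topic_overrides.get(name, CLUSTER_DEFAULTS[name])

tiered_topic = {"remote.log.storage.enable": True, "local.log.retention.segments": 4}
```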
Each topic-partition leader pushes its log segments with the respective index files to remote storage whenever the active log rolls over, and it updates the remote log index file for the respective remote log segment. The second option is to add offset index files also for each segment. It can then serve consumer fetch requests for old segments from a local log segment instead of serving directly from the remote log, which may cause high latencies. There can be different strategies for when a remote segment is copied to a local segment.
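The read path described above can be sketched as a simple three-way decision; the function and argument names are illustrative, not the KIP's API:

```python
def pick_read_source(fetch_offset, local_log_start, cached_remote_offsets):
    """Decide where a fetch at fetch_offset should be served from."""
    if fetch_offset >= local_log_start:
        return "local-log"                       # still in the local log
    if fetch_offset in cached_remote_offsets:
        return "local-cached-remote-segment"     # remote segment already copied back
    return "remote-storage"                      # slowest path: read from remote
```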
What is the "remote.log.manager.scheduler.interval.ms" config about?

How do followers sync RemoteLogSegmentIndex files? Do they request them from the leader replica? This looks to be important, as a failed-over leader should have its RemoteLogSegmentIndex updated and ready, to avoid high latencies in serving old data stored in remote logs.

Thanks,
Satish.
On Tue, Feb 5, 2019 at 10:42 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

Thanks Harsha, makes sense.

Ryanne

On Mon, Feb 4, 2019 at 5:53 PM Harsha Chintalapani <ka...@harsha.io> wrote:

"I think you are saying that this enables additional (potentially cheaper) storage options without *requiring* an existing ETL pipeline."
Yes.
"But it's not really a replacement for the sort of pipelines people build with Connect, Gobblin etc."

It is not. But assuming that everyone runs these pipelines for storing raw Kafka data into HDFS or S3 is also a wrong assumption. The aim of this KIP is to provide tiered storage as a whole package, not to ask users to ship the data on their own using existing ETL, which means running a consumer and maintaining those pipelines.
"My point was that, if you are already offloading records in an ETL pipeline, why do you need a new pipeline built into the broker to ship the same data to the same place?"

As you said, it's an ETL pipeline, which means users of these pipelines are reading the data from the broker, transforming its state, and storing it somewhere.
The point of this KIP is to store log segments as they are, without changing their structure, so that we can use the existing offset mechanisms to look data up when the consumer needs to read old data. When you load it via your existing pipelines, you are reading the topic as a whole, which doesn't guarantee that you'll produce this data back into HDFS or S3 in the same order -- and who is going to generate the index files again?
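The "existing offset mechanisms" argument is made concrete by the <start-offset>-<end-offset>-<base-position> chunk-naming scheme discussed elsewhere in this thread: a single local segment copied as multiple remote chunks stays addressable without remapping positions. A sketch of that lookup, using the thread's own example numbers:

```python
# Chunks of one local segment (offsets 1000-95000) copied to remote storage,
# named <start-offset>-<end-offset>-<base-position>, per the thread's example.
CHUNKS = [
    (1000, 20200, 0),
    (20201, 45003, 942346),
    (45004, 78008, 6001235),
    (78009, 95000, 20024761),
]

def locate(fetch_offset, fetch_position):
    """Find the remote chunk holding fetch_offset and the relative position in it."""
    for start, end, base in CHUNKS:
        if start <= fetch_offset <= end:
            return (start, end), fetch_position - base
    raise KeyError("offset not in this segment")

# A read at offset 52340, position 7321236, lands in chunk 45004-78008 at
# relative position 7321236 - 6001235 = 1320001, matching the thread.
```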
"So you'd end up with one of 1) cold segments are only useful to Kafka; 2) you have the same data written to HDFS/etc twice, once for Kafka and once for everything else, in two separate formats"

You are talking about two different use cases. If someone is storing raw data out of Kafka for long-term access, storing the data as-is in HDFS through Kafka will solve this issue.
They do not need to run another pipeline to ship these logs.

If they are running pipelines to store the data in HDFS in a different format, that's a different use case. Maybe they are transforming Kafka logs to ORC so that they can query them through Hive. Once you transform the log segment, it does lose its ability to use the existing offset index.
The main objective here is not to change the existing protocol and still be able to write and read logs from remote storage.
-Harsha

On Feb 4, 2019, 2:53 PM -0800, Ryanne Dolan <ryannedo...@gmail.com>, wrote:

Thanks Harsha, makes sense for the most part.

> tiered storage is to get away from this and make this transparent to the user

I think you are saying that this enables additional (potentially cheaper) storage options without *requiring* an existing ETL pipeline.
But it's not really a replacement for the sort of pipelines people build with Connect, Gobblin etc. My point was that, if you are already offloading records in an ETL pipeline, why do you need a new pipeline built into the broker to ship the same data to the same place? I think in most cases this will be an additional pipeline, not a replacement, because the segments written to cold storage won't be useful outside Kafka.
So you'd end up with one of 1) cold segments are only useful to Kafka; 2) you have the same data written to HDFS/etc twice, once for Kafka and once for everything else, in two separate formats; 3) you use your existing ETL pipeline and read cold data directly.
> > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > To me, an ideal solution > > > > would > > > > > >> let > > > > > >> > me > > > > > >> > > > > spool > > > > > >> > > > > > > > > segments > > > > > >> > > > > > > > > > > > from > > > > > >> > > > > > > > > > > > > > > Kafka > > > > > >> > > > > > > > > > > > > > > > > to any > > > > > >> > > > > > > > > > > > > > > > > > > > > sink > > > > > >> > > > > > > > > > > > > > > > > > > > > > I would like, and then > > > > > >> > > > > > > > > > > > > > > > > > > > > > let > > > > Kafka > > > > > >> > clients > > > > > >> > > > > > > > > seamlessly > > > > > >> > > > > > > > > > > > access > > > > > >> > > > > > > > > > > > > > > that > > > > > >> > > > > > > > > > > > > > > > > cold > > > > > >> > > > > > > > > > > > > > > > > > > > > data. > > > > > >> > > > > > > > > > > > > > > > > > > > > > Today I can do that in > > > > > >> > > > > > > > > > > > > > > > > > > > > > the > > > > > >> client, > > > > > >> > but > > > > > >> > > > > > ideally > > > > > >> > > > > > > > > the > > > > > >> > > > > > > > > > > > broker > > > > > >> > > > > > > > > > > > > > > would > > > > > >> > > > > > > > > > > > > > > > > do it for > > > > > >> > > > > > > > > > > > > > > > > > > > > > me via some HDFS/Hive/S3 > > > > plugin. > > > > > >> > The KIP > > > > > >> > > > > > seems to > > > > > >> > > > > > > > > > > > > > accomplish > > > > > >> > > > > > > > > > > > > > > > > that -- just > > > > > >> > > > > > > > > > > > > > > > > > > > > > without leveraging > > > anything > > > > I've > > > > > >> > > > > currently > > > > > >> > > > > > got in > > > > > >> > > > > > > > > > > > place. 
Ryanne

On Mon, Feb 4, 2019 at 3:34 PM Harsha <ka...@harsha.io> wrote:

Hi Eric,
Thanks for your questions. Answers are in-line.

"The high-level design seems to indicate that all of the logic for when and how to copy log segments to remote storage lives in the RLM class. The default implementation is then HDFS specific, with additional implementations being left to the community. This seems like it would require anyone implementing a new RLM to also re-implement the logic for when to ship data to remote storage."

RLM will be responsible for shipping log segments, and it will decide when a log segment is ready to be shipped over. Once log segments are identified as rolled over, RLM will delegate this responsibility to a pluggable remote storage implementation. Users who are looking to add their own implementation to enable other storages only need to implement the copy and read mechanisms, not re-implement RLM itself.

"Would it not be better for the Remote Log Manager implementation to be non-configurable, and instead have an interface for the remote storage layer? That way the 'when' of the logic is consistent across all implementations and it's only a matter of 'how,' similar to how the Streams StateStores are managed."

It's possible that we can make RLM non-configurable. But for the initial release, and to keep backward compatibility, we want to make this configurable; any users who are not interested in having log segments shipped to remote storage don't need to worry about this.

Hi Ryanne,
Thanks for your questions.

"How could this be used to leverage fast key-value stores, e.g. Couchbase, which can serve individual records but maybe not entire segments? Or is the idea to only support writing and fetching entire segments? Would it make sense to support both?"
Log segments, once rolled over, are immutable objects, and we want to keep the current structure of log segments and their corresponding index files. It will be easy to copy the whole segment as it is, instead of re-reading each file and using a key/value store.

"Instead of defining a new interface and/or mechanism to ETL segment files from brokers to cold storage, can we just leverage Kafka itself? In particular, we can already ETL records to HDFS via Kafka Connect, Gobblin etc -- we really just need a way for brokers to read these records back. I'm wondering whether the new API could be limited to the fetch, and then existing ETL pipelines could be more easily leveraged. For example, if you already have an ETL pipeline from Kafka to HDFS, you could leave that in place and just tell Kafka how to read these records/segments from cold storage when necessary."

This is pretty much what everyone does, and it has the additional overhead of keeping these pipelines operating and monitored. What's proposed in the KIP is not ETL. It's just looking at the logs that are written and rolled over, and copying the file as it is. With a traditional ETL pipeline, each new topic needs to be added (sure, we can do so via a wildcard or another mechanism), but new topics still need to be onboarded to ship their data into remote storage. Once the data lands somewhere like HDFS/Hive etc., users need to write another processing line to re-process this data, similar to how they do it in their stream processing pipelines. Tiered storage is about getting away from this and making it transparent to the user. They don't need to run another ETL process to ship the logs.

"I'm wondering if we could just add support for loading segments from remote URIs instead of from file, i.e. via plugins for s3://, hdfs:// etc. I suspect less broker logic would change in that case -- the broker wouldn't necessarily care if it reads from file:// or s3:// to load a given segment."

Yes, this is what we are discussing in the KIP.
We are leaving the details of loading segments to the RLM read path instead of directly exposing this in the broker. This way we can keep the current Kafka code as it is, without changing the assumptions around the local disk. Let the RLM handle the remote storage part.

Thanks,
Harsha

On Mon, Feb 4, 2019, at 12:54 PM, Ryanne Dolan wrote:

Harsha, Sriharsha, Suresh, a couple thoughts:

- How could this be used to leverage fast key-value stores, e.g. Couchbase, which can serve individual records but maybe not entire segments? Or is the idea to only support writing and fetching entire segments? Would it make sense to support both?

- Instead of defining a new interface and/or mechanism to ETL segment files from brokers to cold storage, can we just leverage Kafka itself? In particular, we can already ETL records to HDFS via Kafka Connect, Gobblin etc -- we really just need a way for brokers to read these records back. I'm wondering whether the new API could be limited to the fetch, and then existing ETL pipelines could be more easily leveraged. For example, if you already have an ETL pipeline from Kafka to HDFS, you could leave that in place and just tell Kafka how to read these records/segments from cold storage when necessary.

- I'm wondering if we could just add support for loading segments from remote URIs instead of from file, i.e. via plugins for s3://, hdfs:// etc. I suspect less broker logic would change in that case -- the broker wouldn't necessarily care if it reads from file:// or s3:// to load a given segment.

Combining the previous two comments, I can imagine a URI resolution chain for segments. For example, first try file:///logs/{topic}/{segment}.log, then s3://mybucket/{topic}/{date}/{segment}.log, etc, leveraging your existing ETL pipeline(s).
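Such a resolution chain might be sketched roughly as follows. This is purely illustrative and not part of the KIP: the `SegmentLocator` class, the template strings, and the `exists` probe are all hypothetical names standing in for whatever the broker-side plugins would actually do.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch of a URI resolution chain for segments:
// try each location template in order until one claims to hold the segment.
class SegmentLocator {

    // Ordered templates; {topic} and {segment} are filled in per lookup.
    private final List<String> templates;

    SegmentLocator(List<String> templates) {
        this.templates = templates;
    }

    // In a real broker this would probe the storage plugin (file://, s3://, ...);
    // here the caller supplies the probe so the sketch stays self-contained.
    Optional<String> locate(String topic, String segment, Predicate<String> exists) {
        return templates.stream()
                .map(t -> t.replace("{topic}", topic).replace("{segment}", segment))
                .filter(exists)
                .findFirst();
    }

    public static void main(String[] args) {
        SegmentLocator locator = new SegmentLocator(Arrays.asList(
                "file:///logs/{topic}/{segment}.log",
                "s3://mybucket/{topic}/{segment}.log"));
        // Pretend the segment has already been tiered off local disk to S3,
        // so only the s3:// location reports that it exists.
        Optional<String> uri = locator.locate("events", "00000000000000001000",
                u -> u.startsWith("s3://"));
        // prints s3://mybucket/events/00000000000000001000.log
        System.out.println(uri.orElse("not found"));
    }
}
```

The chain falls through to colder tiers only when warmer ones miss, which is why the local file:// template comes first.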
Ryanne

On Mon, Feb 4, 2019 at 12:01 PM Harsha <ka...@harsha.io> wrote:

Hi All,
We are interested in adding tiered storage to Kafka. More details about motivation and design are in the KIP. We are working towards an initial POC. Any feedback or questions on this KIP are welcome.

Thanks,
Harsha
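Taken together, the division of labor discussed in this thread (RLM deciding *when* a rolled-over segment ships, a pluggable backend supplying the *how* of copy and read) might look roughly like the sketch below. All names and signatures here are hypothetical, since the KIP's actual interface was still under discussion; the toy in-memory backend merely stands in for an HDFS or S3 plugin.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: the pluggable storage contract beneath RLM.
// A plugin implements copy and read; it does not decide when to ship.
interface RemoteStorageBackend {
    // Copy a finished (immutable) segment file as-is to remote storage,
    // returning an opaque identifier for later reads.
    String copySegment(String topic, int partition, Path segmentFile) throws IOException;

    // Read `length` bytes starting at `position` within the remote segment.
    byte[] read(String remoteSegmentId, long position, int length) throws IOException;
}

// Toy in-memory backend, standing in for an HDFS/S3 plugin.
class InMemoryBackend implements RemoteStorageBackend {
    private final Map<String, byte[]> store = new HashMap<>();

    @Override
    public String copySegment(String topic, int partition, Path segmentFile) throws IOException {
        // Whole-segment copy, no per-record rewriting -- matching the
        // thread's point that rolled-over segments are immutable.
        String id = topic + "-" + partition + "-" + segmentFile.getFileName();
        store.put(id, Files.readAllBytes(segmentFile));
        return id;
    }

    @Override
    public byte[] read(String remoteSegmentId, long position, int length) {
        byte[] data = store.get(remoteSegmentId);
        return Arrays.copyOfRange(data, (int) position, (int) position + length);
    }
}

class RemoteStorageSketch {
    public static void main(String[] args) throws IOException {
        Path seg = Files.createTempFile("00000000000000001000", ".log");
        Files.write(seg, "record-bytes".getBytes(StandardCharsets.UTF_8));
        RemoteStorageBackend backend = new InMemoryBackend();
        String id = backend.copySegment("events", 0, seg);
        // prints record
        System.out.println(new String(backend.read(id, 0, 6), StandardCharsets.UTF_8));
    }
}
```

Under this split, supporting a new storage system means writing one small class, while the shipping policy stays in RLM and remains identical across backends.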