Hi Renato,

Thanks for your feedback, it is very helpful! Please see my comments inline ...
- About the consistency.

1. I guess when you are talking about consistency you mean versioning, and thus making all tasks use a specific version (most of the time the last visible one), right? So there is no notion of database consistency, only versioning?

[wei] Versioning mostly concerns file-based datasets, which I plan to address in a separate SEP. For unbounded datasets, e.g. when databases are the sources, consistency can be guaranteed by the CDC system (ordering + at-least-once delivery) and then by Samza. Please let me know if the text in the SEP needs to be revised to make this clearer.

2. I agree with Jagadish on the point of running a GC process to keep the last visible version for all tasks. Even in the case you mention of bounded input streams, compacting/GC'ing previous versions to keep only the latest one doesn't mean the result is wrong; it only means that our local store holds only the latest one. Say our final version is T and we have tasks that are still reading T-5: we would compact up to T-5, yet have much less data to keep.

[wei] Yes, this makes sense. I purposely avoided going into more detail in this SEP, as this will be addressed when we solve the bounded-dataset case. I will keep it in mind, but thanks much for bringing this up; this is an important part of the design.

- About recovery/failure scenarios

1. What would happen if the key-value store fails? Do we make the tasks wait until the key-value store comes back, or do we mark messages that arrived while it was down as messages that need to be reprocessed?

[wei] This is an excellent point! I had some thoughts around prioritization between the main stream and the adjunct data stream; it can quickly get complicated. My gut feeling is that the behavior is likely application dependent, and either solution could be viable. There are also alternatives, such as querying an external store directly in case the local store becomes unavailable.
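The fallback alternative just mentioned (query an external store directly when the local store is unavailable) could be pictured as a read-through lookup. This is only a sketch; every name here is invented for illustration and is not part of Samza's actual API.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/**
 * Hypothetical read-through lookup: prefer the local adjunct store, and
 * fall back to querying an external store when the local one is down.
 * All names are invented for illustration; this is not Samza's API.
 */
public class FallbackLookup<K, V> {
  private final Map<K, V> localStore;        // stands in for the local K-V store
  private final Function<K, V> remoteQuery;  // stands in for an external-store client
  private volatile boolean localAvailable = true;

  public FallbackLookup(Map<K, V> localStore, Function<K, V> remoteQuery) {
    this.localStore = localStore;
    this.remoteQuery = remoteQuery;
  }

  /** Simulates the local store becoming unavailable. */
  public void markLocalUnavailable() {
    localAvailable = false;
  }

  /** Serve from the local store when possible; otherwise query remotely. */
  public Optional<V> get(K key) {
    if (localAvailable) {
      V v = localStore.get(key);
      if (v != null) {
        return Optional.of(v);
      }
    }
    // Local store down (or key missed): query the external store directly.
    return Optional.ofNullable(remoteQuery.apply(key));
  }
}
```

As the thread notes, whether to block, reprocess, or fall back like this is likely application dependent; this only illustrates the fallback option.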
However, the current design takes a simpler approach, where the main and adjunct data streams stay relatively independent after bootstrap; as we see more use cases, we will decide on directions for the next step. I think a complete solution should probably be described in a separate design.

2. What if a task that reads from such a key-value store gets restarted? Should it just read from the last available version in the key-value store, or should it fetch the version where it stopped working and start from there?

[wei] For a database-based source, we would continue from the last checkpoint, which is just before it failed. For a file-based source, if the underlying connector provides such a capability, the behavior would be the same; otherwise, we may have to rebuild the version. Again, since file-based sources aren't the focus, there is not enough detail here, apologies!

- About querying the key-value stores.

1. If we have multiple key-value stores from multiple containers and they are not centralized ones, do you have any plans on how to handle multiple versions from them? Zookeeper?

[wei] I am not entirely clear about the scenario, could you elaborate? Did you mean multiple stores seeded from the same source, or from different ones?

Best,

Renato M.

On Sun, May 21, 2017 at 5:31 AM, Renato Marroquín Mogrovejo <renatoj.marroq...@gmail.com> wrote:

> Hi Wei,
>
> Thanks for the proposal, it looks good!
> I have some clarification questions though, hope you don't mind :)
>
> - About the consistency.
> 1. I guess when you are talking about consistency you mean versioning and thus making all tasks use a specific version (most of the time the last visible one), right? So there is no notion of database consistency but versioning?
> 2. I agree with Jagadish in the point of doing a GC process for keeping the last visible version for all tasks. Because even for the case you mention of having bounded input streams, compacting/GC previous version to keep only the latest one, doesn't mean that it is wrong, it only means that our local store only has the latest one. Let's say that our final version is T, and we have tasks that are still reading T-5, then we would compact until T-5, but we have much less data to keep.
>
> - About recovery/failure scenarios
> 1. What would happen if the key-value store fails? Do we make the tasks wait until the key-value stores come back? or we mark messages that arrived while the key-value failed as messages that need to be reprocessed?
> 2. What if a task gets restarted that reads from such key-value store? Should it just read from the last available version on the key-value store? or should it fetch the version where it stopped working and start from there?
>
> - About querying the key-value stores.
> 1. If we have multiple key-value stores from multiple containers and they are not centralized ones. Do you have any plans on how to handle multiple versions from them? Zookeeper?
>
> Best,
>
> Renato M.
>
> 2017-05-20 8:01 GMT+02:00 Wei Song <weison...@gmail.com>:
>
> > Thanks much for your comment, Jagadish!
> >
> > Please see inline for my response …
> >
> > -Wei
> >
> > On 5/19/17, 5:47 PM, "Jagadish Venkatraman" <jagadish1...@gmail.com> wrote:
> >
> > Thanks for writing up this proposal, Wei. This will go a long way in satisfying a number of Samza use-cases. I'm +1 to this idea.
> >
> > >> Section on proposed changes: Provide hooks to transform an incoming message to desired types (this is useful to store a subset of the incoming message).
> >
> > 1. I believe you mean store a "projection" of the incoming message? Might be clearer to re-word it IMHO.
> > [wei] Good point, I've called out projection as one of the purposes.
> >
> > 2. Does Adjunct Data store support change-logging?
> > If it does not, wondering if it might be worth calling it out.
> > [wei] No. Since the store is read-only and every bit of information can be recovered from the original stream, it doesn't require change logs. I've added a section "Recovery" to discuss recovery and to explicitly call this out.
> >
> > >> Section on Consistency
> >
> > 3. IMO, adding a discussion on what causes a potential inconsistency, and how we determine what is a consistent snapshot will probably be useful (in bounded, and unbounded datasets).
> > [wei] Yes, it makes sense to explain in more detail. I've expanded the section with more details.
> >
> > >> Section on Bootstrapping: A bootstrap is forced if the store is not available, or not valid (too old)
> >
> > 4. How do we determine if a store is invalid / old? One way would be to store the recent offsets somewhere, and compare the offsets upon startup.
> > [wei] This is addressed in the added "Recovery" section. My proposal is to provide a timestamp-based approach in the initial implementation; if we discover this to be insufficient, we would add an offset-based solution. You can find more details in this new section.
> >
> > >> We may provide a default serde for POJO.
> >
> > 5. +1 for adding default serdes. Using Java serialization is probably the simplest (mandating that keys and values contain serializable fields).
> > [wei] A potential issue with this approach is that it requires implementing Serializable; maybe it's worth looking at other possibilities as well.
> >
> > 6. This will perhaps be clearer as we get to implementing it. Currently, there are 3 storage managers in the proposal - "TaskStorageManager", "ContainerStorageManager" and "AdjunctDataStorageManager" (different from AdjunctDataStoreManager). Not entirely sure we need all 3. Maybe we do.
> > [wei] I've added a section "Implementation notes" to explain their responsibilities, please review.
> >
> > >> After bootstrap, for change capture data, we keep updating its AD store when new updates arrive
> >
> > 7. If used with a streaming source like Kafka, wouldn't the data storage size grow unbounded? Do we need to handle garbage collection of really stale data? What do you think about adding a section on how GC works? (both for bounded, and unbounded sources)
> > [wei] There is a possibility, but it is remote. The intent of an adjunct data store is querying, especially in change data capture scenarios. The underlying dataset is bounded; it's just that the number of updates could be unbounded. Adding trimming would break consistency without an external backup store. On the other hand, users should also be mindful to use the adjunct store in the intended way.
> >
> > >> Configuration: stores.adstore.manager.factory
> >
> > 8. If the user implements their own AdjunctDataStoreManagerFactory, what is the lifecycle of the returned `AdjunctDataStoreManager`? AFAICS, there is no easy way for an implementor to obtain an instance of a K-V store inside the AdjunctDataStoreManagerFactory interface. Should the API take in a Map<String, KVStore> stores instead of a Map<String, StorageEngine>?
> > [wei] The reason StorageEngine is used here is that samza-core doesn't have a dependency on samza-kv, which hosts KeyValueStorageEngine. This is actually the reason to make this customizable, so that users can implement this interface to support non-K/V stores. I know this is probably not the main use case for now. For K/V based stores, I don't see a lot of need for customization.
> >
> > Thanks for your review, Jagadish!
> > Best,
> > Jagadish
> >
> > On Fri, May 19, 2017 at 3:59 PM, Wei Song <weison...@gmail.com> wrote:
> >
> > > Thanks Xinyu for your feedback. With regard to your question: when a new version of a file becomes available, we would already be in normal processing mode, and either the connector or an external system would need to inject an indication to signal the end of the current version and continue sending the new version. The adjunct data store would recognize the indication and build a new version in the background. While the new version is being built, we continue processing incoming events from the main stream using the existing version. Once the new version is built, we switch to it, and the old version can be discarded if desired. It should be seamless from processing's perspective.
> > >
> > > On Fri, May 19, 2017 at 2:19 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
> > >
> > > > Hi, Wei,
> > > >
> > > > +1 on the proposed design. This is going to reduce a lot of the heavy-lifting work that needs to be done by user code today to bootstrap a data stream into a local store. The configs look quite straightforward and easy to set up. Overall the design looks great to me.
> > > >
> > > > I have one question: in the proposal you mentioned "When Samza is running in 24x7 mode, the stream for a bounded dataset may deliver multiple versions.". So after the bootstrap of the initial version is done, what will happen when the new version comes? Right now, by default, a bootstrap stream is set up with priority INT_MAX, meaning it will preempt other streams while the bootstrap is going on. Are we expecting pauses when the new version of adjunct data comes in? Please let me know what the plan is to handle this scenario.
> > > > Thanks,
> > > > Xinyu
> > > >
> > > > On Tue, May 16, 2017 at 2:15 PM, Navina Ramesh (Apache) <nav...@apache.org> wrote:
> > > >
> > > > > Thanks for trying 3 times, Wei. Sorry about the trouble. Not sure where the problem lies. Looking forward to reviewing your design.
> > > > >
> > > > > Navina
> > > > >
> > > > > On Tue, May 16, 2017 at 8:56 AM, Wei Song <ws...@linkedin.com> wrote:
> > > > >
> > > > > > Hey everyone,
> > > > > >
> > > > > > I created a proposal for SAMZA-1278 <https://issues.apache.org/jira/browse/SAMZA-1278>, Adjunct Data Store for Unbounded DataSets, which introduces an automatic mechanism to store adjunct data for stream tasks.
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/SAMZA/Adjunct+Data+Store+for+Unbounded+DataSets
> > > > > >
> > > > > > Please review; comments are welcome!
> > > > > >
> > > > > > For those who are not actively following the master branch, you may have more questions than others. Feel free to ask them here.
> > > > > >
> > > > > > P.S. This is the 3rd try; I sent this last week, but apparently no one at LinkedIn received it, so I'm including samza-dev here just to be sure.
> > > > > >
> > > > > > --
> > > > > > Thanks,
> > > > > > -Wei
> >
> > --
> > Jagadish V,
> > Graduate Student,
> > Department of Computer Science,
> > Stanford University
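The version-switch behavior Wei describes in his reply to Xinyu earlier in the thread (serve the existing version while the new one is built in the background, then swap and discard the old one) can be pictured with this small sketch. Class and method names are invented for illustration and are not actual Samza code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch of the background version switch described in the
 * thread: reads keep going to the current adjunct-data version while the
 * next version is built; once complete, readers switch over atomically.
 * All names are invented for illustration; this is not Samza's API.
 */
public class VersionedAdjunctStore<K, V> {
  private final AtomicReference<Map<K, V>> current =
      new AtomicReference<>(new HashMap<>());
  private Map<K, V> pending; // next version, built in the background

  /** Serving path: always reads the current version. */
  public V get(K key) {
    return current.get().get(key);
  }

  /** Called when the connector signals the start of a new version. */
  public void beginNewVersion() {
    pending = new HashMap<>();
  }

  /** Incoming adjunct-data updates go to the pending version. */
  public void put(K key, V value) {
    pending.put(key, value);
  }

  /** Called at end-of-version: switch atomically; the old version is dropped. */
  public void endOfVersion() {
    current.set(pending);
    pending = null;
  }
}
```

The point of the sketch is that `get` never pauses on the new version being built, matching the "seamless from processing's perspective" behavior described above.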