Hi Renato,
Thanks for your feedback, it is very helpful! Please see my comments
inline ...

- About the consistency.
1. I guess when you are talking about consistency you mean versioning and
thus making all tasks use a specific version (most of the time the last
visible one), right? So there is no notion of database consistency but
versioning?
[wei] Versioning is more of a concern for file-based datasets, which I plan to
address in a separate SEP. For unbounded datasets, e.g. when databases are the
sources, consistency can be guaranteed by the CDC system (ordering +
at-least-once delivery) and then by Samza. Please let me know if the text in
the SEP needs to be revised to make this clearer.

2. I agree with Jagadish in the point of doing a GC process for keeping the
last visible version for all tasks. Because even for the case you mention
of having bounded input streams, compacting/GC previous version to keep
only the latest one, doesn't mean that it is wrong, it only means that our
local store only has the latest one. Let's say that our final version is T,
and we have tasks that are still reading T-5, then we would compact until
T-5, but we have much less data to keep.
[wei] Yes, this makes sense. I purposely left further details out of this SEP,
as these will be addressed when we solve the bounded dataset case. I will keep
this in mind, and thanks very much for bringing it up; this is an important
part of the design.
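
As a rough illustration of the compaction rule discussed above (a sketch only,
not part of the SEP; the function name and the integer version numbers are
assumptions made for the example), the store keeps every version from the
oldest one still being read up to the latest:

```python
def versions_to_keep(stored_versions, versions_in_use):
    """Pick the versions a local store must retain after compaction.

    stored_versions: iterable of integer version numbers held locally.
    versions_in_use: iterable of versions that tasks are still reading.
    Both the name and the integer versions are hypothetical.
    """
    stored = sorted(set(stored_versions))
    if not stored:
        return []
    in_use = list(versions_in_use)
    # With no active readers, only the latest version is needed.
    oldest_needed = min(in_use) if in_use else stored[-1]
    return [v for v in stored if v >= oldest_needed]


latest = 10  # plays the role of version T
kept = versions_to_keep(range(latest + 1), [latest - 5, latest])
print(kept)
```

With the latest version T = 10 and a task still reading T-5, everything older
than version 5 can be dropped, which is the "much less data to keep" case.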

- About recovery/failure scenarios
1. What would happen if the key-value store fails? Do we make the tasks
wait until the key-value stores come back? or we mark messages that arrived
while the key-value failed as messages that need to be reprocessed?
[wei] This is an excellent point! I had some thoughts about priority between
the main stream and the adjunct data stream, but it can quickly get
complicated. My gut feeling is that the behavior is likely application
dependent, and either solution could be viable. There are also alternatives,
such as querying an external store directly in case the local store becomes
unavailable. However, the current design aims for a simpler approach, where
the main and adjunct data streams stay relatively independent after bootstrap.
As we see more use cases, we will decide on the direction for the next step. I
think a complete solution should probably be described in a separate design.
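
For illustration only, the "query an external store directly" alternative
mentioned above could look roughly like the sketch below. It is not what the
current design does, and StoreUnavailable, lookup and both getter callables
are hypothetical names invented for this example:

```python
class StoreUnavailable(Exception):
    """Hypothetical error signalling that the local store cannot serve reads."""


def lookup(key, local_get, external_get):
    """Read from the local store, falling back to the external store."""
    try:
        return local_get(key)
    except StoreUnavailable:
        # Local store is down: query the external system directly.
        return external_get(key)


def failing_local_get(key):
    # Stand-in for a local store that is currently unavailable.
    raise StoreUnavailable(key)


external = {"member-1": "profile-from-external-store"}
print(lookup("member-1", failing_local_get, external.get))
```

Whether to fall back, wait, or mark messages for reprocessing would still be
an application-level decision, as noted above.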

2. What if a task gets restarted that reads from such key-value store?
Should it just read from the last available version on the key-value store?
or should it fetch the version where it stopped working and start from
there?
[wei] For a database-based source, we would continue from the last checkpoint,
which is just before the failure. For a file-based source, if the underlying
connector can provide such a capability, the behavior would be the same.
Otherwise, we may have to rebuild the version. Again, since file-based sources
aren't the focus, there are not enough details here, apologies!
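
A minimal sketch of the "continue from the last checkpoint" behavior for a
change-capture source, assuming an ordered list of (key, value) events indexed
by offset and a plain dict as the store (both are simplifications for this
example, and replay is a hypothetical helper, not a Samza API):

```python
def replay(change_events, store, checkpoint):
    """Apply events after `checkpoint` to `store`; return the new checkpoint.

    change_events: ordered list of (key, value) pairs, indexed by offset.
    checkpoint: offset of the last event known to be applied, or -1.
    """
    for offset in range(checkpoint + 1, len(change_events)):
        key, value = change_events[offset]
        store[key] = value  # last write wins, so re-applying is harmless
        checkpoint = offset
    return checkpoint


events = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
store = {"a": 1, "b": 2}  # state recovered after a failure
checkpoint = replay(events, store, checkpoint=1)
print(store, checkpoint)
```

Because events are ordered and each update overwrites the previous value for
its key, resuming from a slightly older checkpoint (at-least-once delivery)
ends in the same store contents.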

- About querying the key-value stores.
1. If we have multiple key-value stores from multiple containers and they
are not centralized ones. Do you have any plans on how to handle multiple
versions from them? Zookeeper?
[wei] I am not entirely clear about the scenario. Could you elaborate: did you
mean multiple stores seeded from the same source, or from different sources?

Best,

Renato M.

On Sun, May 21, 2017 at 5:31 AM, Renato Marroquín Mogrovejo <
renatoj.marroq...@gmail.com> wrote:

> Hi Wei,
>
> Thanks for the proposal, it looks good!
> I have some clarification questions though, hope you don't mind :)
>
> - About the consistency.
> 1. I guess when you are talking about consistency you mean versioning and
> thus making all tasks use a specific version (most of the time the last
> visible one), right? So there is no notion of database consistency but
> versioning?
> 2. I agree with Jagadish in the point of doing a GC process for keeping the
> last visible version for all tasks. Because even for the case you mention
> of having bounded input streams, compacting/GC previous version to keep
> only the latest one, doesn't mean that it is wrong, it only means that our
> local store only has the latest one. Let's say that our final version is T,
> and we have tasks that are still reading T-5, then we would compact until
> T-5, but we have much less data to keep.
>
> - About recovery/failure scenarios
> 1. What would happen if the key-value store fails? Do we make the tasks
> wait until the key-value stores come back? or we mark messages that arrived
> while the key-value failed as messages that need to be reprocessed?
> 2. What if a task gets restarted that reads from such key-value store?
> Should it just read from the last available version on the key-value store?
> or should it fetch the version where it stopped working and start from
> there?
>
> - About querying the key-value stores.
> 1. If we have multiple key-value stores from multiple containers and they
> are not centralized ones. Do you have any plans on how to handle multiple
> versions from them? Zookeeper?
>
>
> Best,
>
> Renato M.
>
>
> 2017-05-20 8:01 GMT+02:00 Wei Song <weison...@gmail.com>:
>
> > Thanks much for your comment, Jagadish!
> >
> > Please see inline for my response …
> >
> > -Wei
> >
> >
> > On 5/19/17, 5:47 PM, "Jagadish Venkatraman" <jagadish1...@gmail.com>
> > wrote:
> >
> >     Thanks for writing up this proposal, Wei. This will go a long way in
> >     satisfying a number of Samza use-cases. I'm +1 to this idea.
> >
> >     >> Section on proposed changes: Provide hooks to transform an
> incoming
> >     message to desired types (this is useful to store a subset of the
> > incoming
> >     message).
> >
> >     1. I believe you mean store a "projection" of the incoming message?
> > Might
> >   be clear to re-word it IMHO.
> >   [wei] Good point, I’ve called out projection as the purpose.
> >
> >     2. Does Adjunct Data store support change-logging?  If it does not,
> >   wondering if it might be worth calling it out.
> >   [wei] No, since the store is read-only and every bit of information can
> >   be recovered from the original stream, it doesn’t require change logs.
> >   I’ve added a “Recovery” section to discuss recovery and explicitly call
> >   this out.
> >
> >     >> Section on Consistency
> >
> >     3. IMO, adding a discussion on what causes a potential inconsistency,
> > and
> >     how we determine what is a consistent snapshot will probably be
> useful
> > (in
> >   bounded, and unbounded datasets).
> >   [wei] Yes, it makes sense to explain this in more detail. I’ve expanded
> >   the section accordingly.
> >
> >     >> Section on Bootstrapping: A bootstrap is forced if the store is
> not
> >     available, or not valid (too old)
> >
> >     4. How do we determine if a store is invalid / old? One way would be
> to
> >   store the recent offsets somewhere, and compare the offsets upon
> startup.
> >   [wei] This is addressed in the added “Recovery” section. My proposal is
> >   to provide a timestamp-based approach in the initial implementation; if
> >   we discover this to be insufficient, we would add offset-based
> >   solutions. You can find more details in this new section.
> >
> >     >> We may provide a default serde for POJO.
> >
> >     5. +1 for adding default serdes. Using java serialization is probably
> > the
> >   simplest (mandating that keys and values contain serializable fields)
> >   [wei] A potential issue with this approach is that it requires
> >   implementing Serializable; maybe it’s worth looking at other
> >   possibilities as well.
> >
> >     6. This will perhaps be clearer as we get to implementing it.
> > Currently,
> >     there are 3 storage managers in the proposal - "TaskStorageManager",
> >     "ContainerStorageManager" and "AdjunctDataStorageManager" (different
> > from
> >     AdjunctDataStoreManager) . Not entirely sure we need all 3. Maybe, we
> > do.
> >   [wei] I’ve added a section “implementation notes” to explain their
> >   responsibilities, please review.
> >
> >     >> After bootstrap, for change capture data, we keep updating its AD
> > store
> >     when new updates arrives
> >
> >     7. If used with a streaming source like Kafka, wouldn't the data
> > storage
> >     size grow unbounded in size? Do we need to handle garbage collection
> of
> >     really stale data? What do you think about adding a section on how GC
> >   works? (both for bounded, and unbounded sources)
> >   [wei] There is a possibility, but it is remote. The intent of an adjunct
> >   data store is querying, especially in change data capture scenarios. The
> >   underlying dataset is bounded; it’s just that the number of updates could
> >   be unbounded. Adding trimming would break consistency without an external
> >   backup store. On the other hand, users should also be mindful to use the
> >   adjunct store in the intended way.
> >
> >     >> Configuration: stores.adstore.manager.factory
> >
> >     8. If the user implements their own AdjunctDataStoreManagerFactory,
> > What is
> >     the lifecycle of  the returned `AdjunctDataStoreManager`?
> >     AFAICS, there is no easy way for an implementor to obtain an instance
> > of a
> >     K-V store inside AdjunctDataStoreManagerFactory interface?
> >     Should the API take in a Map<String, KVStore> stores instead of a
> >   Map<String, StorageEngine> ?
> >   [wei] The reason StorageEngine is used here is that samza-core doesn’t
> >   have a dependency on samza-kv, which hosts KeyValueStorageEngine. This is
> >   actually the reason to make this customizable, so that users can
> >   implement this interface to support non-K/V stores. I know this is
> >   probably not the main use case for now. For K/V-based stores, I don’t see
> >   a lot of need for customization.
> >
> >
> > Thanks for your review, Jagadish!
> >
> >
> >     Best,
> >     Jagadish
> >
> >
> >
> >     On Fri, May 19, 2017 at 3:59 PM, Wei Song <weison...@gmail.com>
> wrote:
> >
> >     > Thanks Xinyu for your feedback. With regard to your question: when a
> >     > new version of a file becomes available, we would already be in the
> >     > normal processing mode. Either the connector or the external system
> >     > would need to inject an indication to signal the end of the current
> >     > version and then continue sending the new version. The adjunct data
> >     > store would recognize the indication and build a new version in the
> >     > background. While the new version is being built, we continue to
> >     > process incoming events from the main stream using the existing
> >     > version. Once the new version is built, we switch to it, and the old
> >     > version can be discarded if desired. It should be seamless from the
> >     > processing perspective.
> >     >
> >     > On Fri, May 19, 2017 at 2:19 PM, xinyu liu <xinyuliu...@gmail.com>
> > wrote:
> >     >
> >     > > Hi, Wei,
> >     > >
> >     > > +1 on the proposed design. This is going to reduce a lot of the
> >     > > heavy-lifting work that user code has to do today to bootstrap a
> >     > > data stream into a local store. The configs look quite
> >     > > straightforward and easy to set up. Overall the design looks great
> >     > > to me.
> >     > >
> >     > > I have one question: in the proposal you mentioned "When Samza is
> >     > > running in 24x7 mode, the stream for a bounded dataset may deliver
> >     > > multiple versions.". So after the bootstrap of the initial version
> >     > > is done, what will happen when the new version comes? Right now, by
> >     > > default, the bootstrap stream is set up with priority INT_MAX,
> >     > > meaning it will preempt other streams while the bootstrap is going
> >     > > on. Are we expecting pauses when a new version of the adjunct data
> >     > > arrives? Please let me know what the plan is to handle this
> >     > > scenario.
> >     > >
> >     > > Thanks,
> >     > > Xinyu
> >     > >
> >     > > On Tue, May 16, 2017 at 2:15 PM, Navina Ramesh (Apache) <
> >     > nav...@apache.org
> >     > > >
> >     > > wrote:
> >     > >
> >     > > > Thanks for trying 3 times, Wei. Sorry about the trouble. Not sure
> >     > > > where the problem lies. Looking forward to reviewing your design.
> >     > > >
> >     > > > Navina
> >     > > >
> >     > > > On Tue, May 16, 2017 at 8:56 AM, Wei Song <ws...@linkedin.com>
> > wrote:
> >     > > >
> >     > > > > Hey everyone,
> >     > > > >
> >     > > > > I created a proposal for SAMZA-1278
> >     > > > > <https://issues.apache.org/jira/browse/SAMZA-1278>, Adjunct
> >     > > > > Data Store for Unbounded DataSets, which introduces an
> >     > > > > automatic mechanism to store adjunct data for stream tasks.
> >     > > > >
> >     > > > > https://cwiki.apache.org/confluence/display/SAMZA/Adjunct+Data+Store+for+Unbounded+DataSets
> >     > > > >
> >     > > > > Please review; comments are welcome!
> >     > > > >
> >     > > > > For those who are not actively following the master branch,
> >     > > > > you may have more questions than others. Feel free to ask them
> >     > > > > here.
> >     > > > >
> >     > > > > P.S. This is the 3rd try. I sent this last week, but apparently
> >     > > > > no one at LinkedIn received it, so I am including samza-dev
> >     > > > > here just to be sure.
> >     > > > >
> >     > > > > --
> >     > > > > Thanks,
> >     > > > > -Wei
> >     > > > >
> >     > > >
> >     > >
> >     >
> >
> >
> >
> >     --
> >     Jagadish V,
> >     Graduate Student,
> >     Department of Computer Science,
> >     Stanford University
> >
> >
> >
> >
>
