Thanks for the feedback, comments inline!
On Mon, Sep 9, 2013 at 6:46 PM, Chris Riccomini <[email protected]> wrote:

> Hey Jay,
>
> Here are my notes.
>
> 1. I don't follow this:
>
> "However this means we have to recover up to a full window on fail-over. But this will not be very slow for large windows because of the amount of reprocessing. For larger windows or for effectively infinite windows it is better to make the in-process aggregation fault-tolerant rather than try to recompute it."
>
> Was this supposed to be, "But this will not be very slow for small windows..."?

Yeah, this has one too many negations; fixed. (See also the rough windowed-count sketch at the bottom of this mail.)

> 2. I'm not sure I understand what the example (user settings - user profiles) would be used for in the table-table join section.

Tried to improve, take a look and let me know what you think.

> 3. Recommend reversing the order of the phrasing in the table-stream join section to, "Example: Join page view data to user region information." For some reason my brain grasps it better when it's posed as stream->table join in the example.

Good point, clarified what is being "joined on".

> 4. "systems is to simply to periodically" .. one to too many? (ho ho :)

Fixed.

> 5. "mobile ui" -> "mobile UI"

Fixed.

> 6. "general feature: in general" .. Eliminate the second "in general".

Fixed.

> 7. I think this statement needs to be expanded upon: "In cases where we were querying the external database on each input message to join on additional data for our output stream we would now instead create an input stream coming from the remote database that captures the changes to the database."
>
> This is kind of key for table-table and table-stream joins, and it's not something that most people think about, or have. An example would suffice.

Agreed, I moved up the "Databases as input streams" section to right after this example. (There's also a rough join sketch at the bottom of this mail.)

> 8. "isolation issues goes away" -> "isolation issue goes away"

Fixed.

> 9. Should have a drawbacks section for the Samza approach. Namely lack of isolation within the machine (disk usage, iops, etc), and potentially slow restart time when recovering large state.

Added.

> 10. "The store can abide by the same delivery and fault-tolerance guarantees that the Samza task itself does." .. Since its change log is modeled as a stream.

Added that clarification.

> 11. "out-of-the-box and gives" -> "out-of-the-box that gives"

Fixed.

> 12. "let's us" -> "lets us"

Fixed.

> 13. "because is it an aggregation" -> "because it is an aggregation"

Fixed.

> 14. "stores.my-store.changelog=my-stream-name" Was there anything special about the change log stream? I recall there being some strangeness with serdes, but I don't remember exactly what it was. Something like, if no serde is defined and the msg is a byte array, we don't double serialize?

Nothing that is relevant to the user, right?

> 15. Call out that the LevelDB-Java implementation that we're using is running with JNI. Just good to be aware, in case people see weird off-heap issues, or segfaults in their tasks.

Will do.

> Cheers,
> Chris
>
> On 9/9/13 12:58 PM, "Jay Kreps" <[email protected]> wrote:
>
> >1. Yeah I think maybe the confusion is that we give the examples of stateful processing without saying why. I tried to make it a little more clear.
> >2. Tried to make the transition a little more clear. Samza kind of assumes at least passing familiarity with Kafka so to some extent this is unavoidable, but I think the problem is that it isn't clear why we are talking about kafka (a stream implementation).
> >3. Not sure how to clarify the diagram (any suggestions?), but hopefully improved the text.
> >
> >Let me know if you feel this helps:
> >
> >diff --git a/docs/learn/documentation/0.7.0/container/state-management.md b/docs/learn/documentation/0.7.0/container/state-management.md
> >index c23c7c3..2f9b740 100644
> >--- a/docs/learn/documentation/0.7.0/container/state-management.md
> >+++ b/docs/learn/documentation/0.7.0/container/state-management.md
> >@@ -5,25 +5,25 @@ title: State Management
> >
> > One of the more interesting aspects of Samza is the ability for tasks to store data locally and execute rich queries against it.
> >
> >-Of course simple filtering or single-row transformations can be done without any need for collecting state. A simple analogy to SQL may make make this more obvious. The select- and where-clauses of a SQL query don't usually require state: these can be executed a row at a time on input data and maintain state between rows. The rest of SQL, multi-row aggregations and joins, require more support to execute correctly in a streaming fashion. Samza doesn't provide a high-level language like SQL but it does provide lower-level primitives that make streaming aggregation and joins and other stateful processing easy to implement.
> >+Of course simple filtering or single-row transformations can be done without any need for collecting state. A simple analogy to SQL may make this more obvious. The select- and where-clauses of a SQL query don't usually require state: these can be executed a row at a time on input data and maintain state between rows. The rest of SQL, multi-row aggregations and joins, require more support to execute correctly in a streaming fashion. Samza doesn't provide a high-level language like SQL but it does provide lower-level primitives that make streaming aggregation and joins and other stateful processing easy to implement.
> >
> > Let's dive into how this works and why it is useful.
> >
> > ### Common use cases for stateful processing
> >
> >-First, let's look at some simplistic examples of stateful stream processing that might be seen on a consumer website.
> >+First, let's look at some simplistic examples of stateful stream processing that might be seen on a consumer website. Later in this document we'll go through specific details of using Samza's built-in key-value storage capabilities to implement each of these applications.
> >
> > ##### Windowed aggregation
> >
> > Example: Counting the number of page views for each user per hour
> >
> >-This kind of windowed processing is common for ranking and relevance, "trending topics", as well as simple real-time reporting and monitoring.
> >+This kind of windowed processing is common for ranking and relevance, "trending topics", as well as simple real-time reporting and monitoring. For small windows one can just maintain the aggregate in memory and manually commit the task position only at window boundaries. However this means we have to recover up to a full window on fail-over. But this will not be very slow for large windows because of the amount of reprocessing. For larger windows or for effectively infinite windows it is better to make the in-process aggregation fault-tolerant rather than try to recompute it.
> >
> > ##### Table-table join
> >
> > Example: Join a table of user profiles to a table of user\_settings by user\_id and emit the joined stream
> >
> >-This example is somewhat simplistic: one might wonder why you would want to join two tables in a stream processing system. However consider a more realistic example: real-time data normalization. E-commerce companies need to handle product imports, web-crawlers need to update their [database of the web](labs.yahoo.com/files/YahooWebmap.pdf), and social networks need to normalize and index social data for search. Each of these processing flows are emensely complex and contain many complex processing stages that effectively join together and normalize many data sources into a single clean feed.
> >+This example is somewhat simplistic: one might wonder why you would want to join two tables in a stream processing system. However consider a more realistic example: real-time data normalization. E-commerce companies need to handle product imports, web-crawlers need to update their [database of the web](http://labs.yahoo.com/files/YahooWebmap.pdf), and social networks need to normalize and index social data for search. Each of these processing flows are immensely complex and contain many complex processing stages that effectively join together and normalize many data sources into a single clean feed.
> >
> > ##### Table-stream join
> >
> >@@ -53,7 +53,7 @@ This approach works well enough if the in-memory state consists of only a few va
> >
> > #### Using an external store
> >
> >-In the absence of built-in support a common pattern for stateful processing is to push any state that would be accumulated between rows into an external database or key-value store. You get something that looks like this:
> >+In the absence of built-in support a common pattern for stateful processing is to push any state that would be accumulated between rows into an external database or key-value store. The database holds aggregates or the dataset being queried to enrich the incoming stream. You get something that looks like this:
> >
> >
> >
> >
> >@@ -63,7 +63,7 @@ Samza allows this style of processing (nothing will stop you from querying a rem
> >
> > To understand why this is useful let's first understand some of the drawbacks of making remote queries in a stream processing job:
> >
> >-1. **Performance**: The first major drawback of making remote queries is that they are slow and expensive. A Kafka stream can deliver hundreds of thousands or even millions of messages per second per CPU core because it transfers large chunks of data at a time. But a remote database query is a more expensive proposition. Though the database may be partitioned and scalable this partitioning doesn't match the partitioning of the job into tasks so batching becomes much less effective. As a result you would expect to get a few thousand queries per second per core for remote requests. This means that adding a processing stage that uses an external database will often reduce the throughput by several orders of magnitude.
> >+1. **Performance**: The first major drawback of making remote queries is that they are slow and expensive. For example, a Kafka stream can deliver hundreds of thousands or even millions of messages per second per CPU core because it transfers large chunks of data at a time. But a remote database query is a more expensive proposition. Though the database may be partitioned and scalable this partitioning doesn't match the partitioning of the job into tasks so batching becomes much less effective. As a result you would expect to get a few thousand queries per second per core for remote requests. This means that adding a processing stage that uses an external database will often reduce the throughput by several orders of magnitude.
> > 1. **Isolation**: If your database or service is also running live processing, mixing in asynchronous processing can be quite dangerous. A scalable stream processing system can run with very high parallelism. If such a job comes down (say for a code push) it queues up data for processing, when it restarts it will potentially have a large backlog of data to process. Since the job may actually have very high parallelism this can result in huge load spikes, many orders of magnitude higher than steady state load. If this load is mixed with live queries (i.e. the queries used to build web pages or render mobile ui or anything else that has a user waiting on the other end) then you may end up causing a denial-of-service attack on your live service.
> > 1. **Query Capabilities**: Many scalable databases expose very limited query interfaces--only supporting simple key-value lookups. Doing the equivalent of a "full table scan" or rich traversal may not be practical in this model.
> > 1. **Correctness**: If your task keeps counts or otherwise modifies state in a remote store how is this rolled back if the task fails?
> >@@ -80,6 +80,8 @@ You can think of this as taking the remote table out of the remote database and
> >
> > Note that now the state is physically on the same machine as the tasks, and each task has access only to its local partition. However the combination of stateful tasks with the normal partitioning capabilities Samza offers makes this a very general feature: in general you just repartition on the key by which you want to split your processing and then you have full local access to the data within storage in that partition.
> >
> >+In cases where we were querying the external database on each input message to join on additional data for our output stream we would now instead create an input stream coming from the remote database that captures the changes to the database.
> >+
> > Let's look at how this addresses the problems of the remote store:
> >
> > 1. This fixes the performance issues of remote queries because the data is now local, what would otherwise be a remote query may now just be a lookup against local memory or disk (we ship a [LevelDB](https://code.google.com/p/leveldb)-based store which is described in detail below).
> >
> >
> >
> >
> >
> >On Sun, Sep 8, 2013 at 12:21 PM, Tejas Patil <[email protected]> wrote:
> >
> >> I am probably the dumbest person on this list in terms of technical know-how, but hey,.. if I can understand the doc, then most people would understand easily :)
> >>
> >> Some comments:
> >>
> >> (1) What does a typical task state consist of? An explicit example of "task state" would be helpful. There are a couple of good examples in the doc but none of them say "hey, for this use case the task state is .."
> >>
> >> (2) "The problems of remote stores" -> "Performance":
> >> Before this point, there was no reference of Kafka at all in the doc and you suddenly start comparing things with Kafka stream.
> >> People w/o any Kafka background would not get that part.
> >>
> >> (3) "Approaches to managing task state" -> "Using an external store"
> >> The figure gave me an impression that tasks' o/p goes to 2 places: o/p stream and external store. However reading further made me realize that we just push the task state to the external DB, which is different from the o/p stream... right?
> >>
> >> Trivial things:
> >> - "A simple analogy to SQL may make make this more obvious." : Word "make" occurs twice
> >> - The hyperlink for "database of the web" is not working
> >>
> >> Thanks,
> >> Tejas
> >>
> >>
> >> On Thu, Sep 5, 2013 at 5:14 PM, Jay Kreps <[email protected]> wrote:
> >>
> >> > I took a pass at improving the state management documentation (talking to people, I don't think anyone understood what we were saying):
> >> >
> >> > http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
> >> >
> >> > I would love to get some feedback on this, especially from anyone who doesn't already know Samza. Does this make any sense? Does it tell you what you need to know in the right order?
> >> >
> >> > -Jay
> >> >
> >>
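P.S. To make the windowed-aggregation example a little more concrete, here is a rough, untested sketch of what the hourly per-user page-view count might look like as a task that keeps its running counts in a local key-value store. The class name, store name, stream names, and message layout are invented for illustration; only the task and store interfaces are the ones the doc describes.

// Rough sketch only: names here are invented; the task and key-value store
// interfaces are the ones described in the state-management doc.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

public class HourlyPageViewCounterTask implements StreamTask, WindowableTask, InitableTask {
  // Output stream for the per-user counts (invented name).
  private static final SystemStream OUTPUT = new SystemStream("kafka", "page-view-counts-hourly");

  // Running counts live in a local store; its changelog is what makes the
  // aggregate recoverable without re-reading the whole window on fail-over.
  private KeyValueStore<String, Integer> counts;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    counts = (KeyValueStore<String, Integer>) context.getStore("page-view-counts");
  }

  @Override
  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // Assumes the page-view stream is partitioned by user id and each message is a map of fields.
    String userId = (String) ((Map<String, Object>) envelope.getMessage()).get("user-id");
    Integer current = counts.get(userId);
    counts.put(userId, current == null ? 1 : current + 1);
  }

  @Override
  public void window(MessageCollector collector, TaskCoordinator coordinator) {
    // Invoked on the configured window interval (hourly here): emit every count, then reset.
    List<String> emitted = new ArrayList<String>();
    KeyValueIterator<String, Integer> iter = counts.all();
    try {
      while (iter.hasNext()) {
        Entry<String, Integer> entry = iter.next();
        collector.send(new OutgoingMessageEnvelope(OUTPUT, entry.getKey(), entry.getValue()));
        emitted.add(entry.getKey());
      }
    } finally {
      iter.close();
    }
    for (String userId : emitted) {
      counts.delete(userId);
    }
  }
}

The point is just that because the counts live in the store (and therefore in its changelog), a restarted task can restore them rather than re-reading a full window of input.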
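And an equally rough sketch of the table-stream join case (joining page views to user region info), where the "table" side is a local store fed by a change-capture stream from the user database instead of something we query remotely per message. Again the stream names, store name, and message layout are invented; both input streams are assumed to be partitioned by user id, and the store would be configured with a changelog (the stores.my-store.changelog setting mentioned above) so it can be restored on failure.

// Rough sketch only: stream names, store name, and message layout are invented,
// and both inputs are assumed to be partitioned by user id.
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class PageViewRegionJoinTask implements StreamTask, InitableTask {
  private static final SystemStream OUTPUT = new SystemStream("kafka", "page-views-with-region");

  // Local copy of user-id -> region, populated from the database's change-capture stream.
  private KeyValueStore<String, String> regions;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    regions = (KeyValueStore<String, String>) context.getStore("user-regions");
  }

  @Override
  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    Map<String, Object> message = (Map<String, Object>) envelope.getMessage();
    String stream = envelope.getSystemStreamPartition().getStream();

    if ("user-profile-changes".equals(stream)) {
      // Change-capture feed from the user database: keep the local table up to date.
      regions.put((String) message.get("user-id"), (String) message.get("region"));
    } else {
      // Page-view stream: enrich the view with the region we have locally, no remote query needed.
      String region = regions.get((String) message.get("user-id"));
      message.put("region", region); // may be null if the profile change hasn't arrived yet
      collector.send(new OutgoingMessageEnvelope(OUTPUT, message.get("user-id"), message));
    }
  }
}

If the join key isn't already the partition key of both streams you would have to repartition first, as the doc describes.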
