Repository: samza
Updated Branches:
  refs/heads/master a5a69c4df -> 5e68d621a


SAMZA-1513; Doc updates for persistent windows, joins and serdes.

jmakes prateekm nickpan47 for review.

Author: Jagadish <jvenkatra...@linkedin.com>

Reviewers: Jake Maes<jm...@linkedin.com>

Closes #369 from vjagadish1989/doc-updates


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5e68d621
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5e68d621
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5e68d621

Branch: refs/heads/master
Commit: 5e68d621aa841feaf1650be64ac723c12b996df5
Parents: a5a69c4
Author: Jagadish <jvenkatra...@linkedin.com>
Authored: Tue Nov 28 12:30:35 2017 -0800
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Tue Nov 28 12:30:35 2017 -0800

----------------------------------------------------------------------
 .../versioned/hello-samza-high-level-code.md    | 87 ++++++++++----------
 docs/startup/preview/index.md                   | 67 ++++++++-------
 2 files changed, 80 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5e68d621/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/hello-samza-high-level-code.md b/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
index 6c0526e..2f6a4a6 100644
--- a/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
+++ b/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
@@ -108,26 +108,13 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
 systems.kafka.producer.bootstrap.servers=localhost:9092
 systems.kafka.default.stream.replication.factor=1
-systems.kafka.default.stream.samza.msg.serde=json
 {% endhighlight %}
 
 The above configuration defines 2 systems; one called _wikipedia_ and one 
called _kafka_.
 
 A factory is required for each system, so the 
_systems.system-name.samza.system.factory_ property is required for both 
systems. The other properties are system and use-case specific.
 
-For the _kafka_ system, we set the default replication factor to 1 for all 
streams because this application is intended for a demo deployment which 
utilizes a Kafka cluster with only 1 broker, so a replication factor larger 
than 1 is invalid. The default serde is JSON, which means by default any 
streams consumed or produced to the _kafka_ system will use a _json_ serde, 
which we will define in the next section.
-
-The _wikipedia_ system does not need a serde because the `WikipediaConsumer` 
already produces a usable type.
-
-#### Serdes
-Next, we need to configure the 
[serdes](/learn/documentation/{{site.version}}/container/serialization.html) we 
will use for streams and stores in the application.
-{% highlight bash %}
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
-{% endhighlight %}
-
-The _json_ serde was used for the _kafka_ system above. The _string_ and 
_integer_ serdes will be used later.
+For the _kafka_ system, we set the default replication factor to 1 for all 
streams because this application is intended for a demo deployment which 
utilizes a Kafka cluster with only 1 broker, so a replication factor larger 
than 1 is invalid.
 
 #### Configure Streams
 Samza identifies streams using a unique stream ID. In most cases, the stream 
ID is the same as the actual stream name. However, if a stream has a name that 
doesn't match the pattern `[A-Za-z0-9_-]+`, we need to configure a separate 
_physical.name_ to associate the actual stream name with a legal stream ID. The 
Wikipedia channels we will consume have a '#' character in the names. So for 
each of them we must pick a legal stream ID and then configure the physical 
name to match the channel.
@@ -208,16 +195,15 @@ Next, we will declare the input streams for the Wikipedia application.
 #### Inputs
 The Wikipedia application consumes events from three channels. Let's declare 
each of those channels as an input streams via the 
[StreamGraph](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html)
 in the 
[init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-)
 method.
 {% highlight java %}
-MessageStream<WikipediaFeedEvent> wikipediaEvents = 
streamGraph.getInputStream("en-wikipedia", (k, v) -> (WikipediaFeedEvent) v);
-MessageStream<WikipediaFeedEvent> wiktionaryEvents = 
streamGraph.getInputStream("en-wiktionary", (k, v) -> (WikipediaFeedEvent) v);
-MessageStream<WikipediaFeedEvent> wikiNewsEvents = 
streamGraph.getInputStream("en-wikinews", (k, v) -> (WikipediaFeedEvent) v);
+MessageStream<WikipediaFeedEvent> wikipediaEvents = 
streamGraph.getInputStream("en-wikipedia", new NoOpSerde<>());
+MessageStream<WikipediaFeedEvent> wiktionaryEvents = 
streamGraph.getInputStream("en-wiktionary", new NoOpSerde<>());
+MessageStream<WikipediaFeedEvent> wikiNewsEvents = 
streamGraph.getInputStream("en-wikinews", new NoOpSerde<>());
 {% endhighlight %}
 
-The first argument to the 
[getInputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html#getInputStream-java.lang.String-java.util.function.BiFunction-)
 method is the stream ID. Each ID must match the corresponding stream IDs we 
configured earlier.
-
-The second argument is the *message builder*. It converts the input key and 
message to the appropriate type. In this case, we don't have a key and want to 
sent the events as-is, so we have a very simple builder that just forwards the 
input value.
+The first argument to the 
[getInputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html#getInputStream-java.lang.String-org.apache.samza.serializers.Serde-)
 method is the stream ID. Each ID must match the corresponding stream IDs we 
configured earlier.
+The second argument is the `Serde` used to deserialize the message. We've set 
this to a `NoOpSerde` since our `wikipedia` system already returns 
`WikipediaFeedEvent`s and there is no need for further deserialization.
 
-Note the streams are all MessageStreams of type WikipediaFeedEvent. 
[MessageStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html)
 is the in-memory representation of a stream in Samza. It uses generics to 
ensure type safety across the streams and operations. We knew the 
WikipediaFeedEvent type by inspecting the WikipediaConsumer above and we made 
it explicit with the cast on the output of the MessageBuilder. If our inputs 
used a serde, we would know the type based on which serde is configured for the 
input streams.
+Note the streams are all MessageStreams of type WikipediaFeedEvent. 
[MessageStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html)
 is the in-memory representation of a stream in Samza. It uses generics to 
ensure type safety across the streams and operations.
 
 #### Merge
 We'd like to use the same processing logic for all three input streams, so we 
will use the 
[mergeAll](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-)
 operator to merge them together. Note: this is not the same as a 
[join](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#join-org.apache.samza.operators.MessageStream-org.apache.samza.operators.functions.JoinFunction-java.time.Duration-)
 because we are not associating events by key. We are simply combining three 
streams into one, like a union.
@@ -279,7 +265,7 @@ Note: the type parameters for [FoldLeftFunction](/learn/documentation/{{site.ver
 Finally, we can define our 
[window](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/Window.html)
 back in the 
[init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-)
 method by chaining the result of the parser:
 {% highlight java %}
 allWikipediaEvents.map(WikipediaParser::parseEvent)
-        .window(Windows.tumblingWindow(Duration.ofSeconds(10), 
WikipediaStats::new, new WikipediaStatsAggregator()));
+        .window(Windows.tumblingWindow(Duration.ofSeconds(10), 
WikipediaStats::new, new WikipediaStatsAggregator(), new 
JsonSerdeV2<>(WikipediaStats.class)));
 {% endhighlight %}
 
 This defines an unkeyed [tumbling 
window](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/Windows.html)
 that spans 10s, which instantiates a new `WikipediaStats` object at the 
beginning of each window and aggregates the stats using 
`WikipediaStatsAggregator`.
@@ -287,20 +273,32 @@ This defines an unkeyed [tumbling window](/learn/documentation/{{site.version}}/
 The output of the window is a 
[WindowPane](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/WindowPane.html)
 with a key and value. Since we used an unkeyed tumbling window, the key is 
`Void`. The value is our `WikipediaStats` object.
 
 #### Output
-We want to use a JSON serializer to output the window values to Kafka, so we 
will do one more 
[map](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-)
 to format the output.
+We will do a 
[map](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-)
 at the end to format our window output. Let's begin by defining a simple 
container class for our formatted output.
 
-First, let's define the method to format the stats as a `Map<String, String>` 
so the _json_ serde can handle it. Paste the following after the aggregator 
class:
 {% highlight java %}
-private Map<String, Integer> formatOutput(WindowPane<Void, WikipediaStats> 
statsWindowPane) {
-  WikipediaStats stats = statsWindowPane.getMessage();
-
-  Map<String, Integer> counts = new HashMap<String, Integer>(stats.counts);
-  counts.put("edits", stats.edits);
-  counts.put("bytes-added", stats.byteDiff);
-  counts.put("unique-titles", stats.titles.size());
+  static class WikipediaStatsOutput {
+    public int edits;
+    public int bytesAdded;
+    public int uniqueTitles;
+    public Map<String, Integer> counts;
+
+    public WikipediaStatsOutput(int edits, int bytesAdded, int uniqueTitles,
+        Map<String, Integer> counts) {
+      this.edits = edits;
+      this.bytesAdded = bytesAdded;
+      this.uniqueTitles = uniqueTitles;
+      this.counts = counts;
+    }
+  }
+{% endhighlight %}
 
-  return counts;
-}
+Paste the following after the aggregator class:
+{% highlight java %}
+  private WikipediaStatsOutput formatOutput(WindowPane<Void, WikipediaStats> 
statsWindowPane) {
+    WikipediaStats stats = statsWindowPane.getMessage();
+    return new WikipediaStatsOutput(
+        stats.edits, stats.byteDiff, stats.titles.size(), stats.counts);
+  }
 {% endhighlight %}
 
 Now, we can invoke the method by adding another 
[map](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-)
 operation to the chain in 
[init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-).
 The operator chain should now look like this:
@@ -312,15 +310,14 @@ allWikipediaEvents.map(WikipediaParser::parseEvent)
 
 Next we need to get the output stream to which we will send the stats. Insert 
the following line below the creation of the 3 input streams:
 {% highlight java %}
-OutputStream<Void, Map<String, Integer>, Map<String, Integer>>
-        wikipediaStats = streamGraph.getOutputStream("wikipedia-stats", m -> 
null, m -> m);
+  OutputStream<WikipediaStatsOutput> wikipediaStats =
+     streamGraph.getOutputStream("wikipedia-stats", new JsonSerdeV2<>(WikipediaStatsOutput.class));
 {% endhighlight %}
 
-The 
[OutputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/OutputStream.html)
 is parameterized by 3 types; the key type for the output, the value type for 
the output, and upstream type.
-
-The first parameter of 
[getOutputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html#getOutputStream-java.lang.String-java.util.function.Function-java.util.function.Function-)
 is the output stream ID. We will use _wikipedia-stats_ and since it contains 
no special characters, we won't bother configuring a physical name so Samza 
will use the stream ID as the topic name.
+The 
[OutputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/OutputStream.html)
 is parameterized by the type of the output.
 
-The second and third parameters are the *key extractor* and the *message 
extractor*, respectively. We have no key, so the *key extractor* simply 
produces null. The *message extractor* simply passes the message because it's 
already the correct type for the _json_ serde. Note: we could have skipped the 
previous 
[map](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-)
 operator and invoked our formatter here, but we kept them separate for 
pedagogical purposes.
+The first parameter of 
[getOutputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html#getOutputStream-java.lang.String-org.apache.samza.serializers.Serde-)
 is the output stream ID. We will use _wikipedia-stats_ and since it contains 
no special characters, we won't bother configuring a physical name so Samza 
will use the stream ID as the topic name.
+The second parameter is the 
[Serde](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/serializers/Serde.html)
 to serialize the outgoing message. We will set it to 
[JsonSerdeV2](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/serializers/JsonSerdeV2.html)
 to serialize our `WikipediaStatsOutput` as a JSON string.
 
 Finally, we can send our output to the output stream using the 
[sendTo](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#sendTo-org.apache.samza.operators.OutputStream-)
 operator:
 {% highlight java %}
@@ -339,15 +336,18 @@ We will do this by keeping a separate count outside the window and persisting it
 
 We start by defining the store in the config file:
 {% highlight bash %}
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
 
stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
 stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
 stores.wikipedia-stats.key.serde=string
 stores.wikipedia-stats.msg.serde=integer
 {% endhighlight %}
 
-These properties declare a [RocksDB](http://rocksdb.org/) key-value store 
named "wikipedia-stats". The store is replicated to a changelog stream called 
"wikipedia-stats-changelog" on the _kafka_ system for durability. It uses the 
_string_ and _integer_ serdes you defined earlier for keys and values 
respectively.
+These properties declare a [RocksDB](http://rocksdb.org/) key-value store 
named "wikipedia-stats". The store is replicated to a changelog stream called 
"wikipedia-stats-changelog" on the _kafka_ system for durability. It uses the 
_string_ and _integer_ serdes for keys and values respectively.
 
-Next, we add a total count member variable to the `WikipediaStats` class:
+Next, we add a total count member variable to the `WikipediaStats` class, and 
to the `WikipediaStatsOutput` class:
 {% highlight java %}
 int totalEdits = 0;
 {% endhighlight %}
@@ -374,10 +374,7 @@ store.put("count-edits-all-time", editsAllTime);
 stats.totalEdits = editsAllTime;
 {% endhighlight %}
 
-Finally, update the `MyWikipediaApplication#formatOutput` method to include 
the total counter.
-{% highlight java %}
-counts.put("edits-all-time", stats.totalEdits);
-{% endhighlight %}
+Finally, update the `MyWikipediaApplication#formatOutput` method to include 
the total counter in its `WikipediaStatsOutput`.
 
 #### Metrics
 Lastly, let's add a metric to the application which counts the number of 
repeat edits each topic within the window interval.
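
The unkeyed tumbling window in the tutorial diff above creates a fresh accumulator at the start of each fixed-size interval and folds every message in that interval into it. The following is a minimal plain-Java sketch of that idea, not Samza code: the class `TumblingWindowSketch`, the method `aggregate`, and the timestamp-keyed input map are hypothetical names chosen for illustration, and the sketch assumes every message has a unique millisecond timestamp.

```java
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Plain-Java illustration of an unkeyed tumbling window; not Samza code. */
class TumblingWindowSketch {

  /**
   * Folds each message into the accumulator of the fixed-size window that
   * contains its timestamp. Returns window start (ms) -> aggregated value.
   */
  static <M, A> TreeMap<Long, A> aggregate(
      Map<Long, M> timestampedMessages, // hypothetical input: timestamp (ms) -> message
      Duration windowSize,
      Supplier<A> initialValue,
      BiFunction<M, A, A> foldLeft) {
    long sizeMs = windowSize.toMillis();
    TreeMap<Long, A> windows = new TreeMap<>();
    for (Map.Entry<Long, M> entry : timestampedMessages.entrySet()) {
      // Round the timestamp down to the start of its window.
      long windowStart = Math.floorDiv(entry.getKey(), sizeMs) * sizeMs;
      // A new window starts from the supplied initial value.
      A current = windows.containsKey(windowStart)
          ? windows.get(windowStart)
          : initialValue.get();
      windows.put(windowStart, foldLeft.apply(entry.getValue(), current));
    }
    return windows;
  }

  public static void main(String[] args) {
    Map<Long, Integer> edits = new TreeMap<>();
    edits.put(1_000L, 5);
    edits.put(9_999L, 7);
    edits.put(10_000L, 3);
    TreeMap<Long, Integer> sums =
        aggregate(edits, Duration.ofSeconds(10), () -> 0, (msg, acc) -> acc + msg);
    System.out.println(sums); // prints {0=12, 10000=3}
  }
}
```

The supplier and fold function play the same roles as `WikipediaStats::new` and `WikipediaStatsAggregator` in the tutorial; the serde argument added by this commit has no counterpart here because the sketch keeps its state only in memory.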

http://git-wip-us.apache.org/repos/asf/samza/blob/5e68d621/docs/startup/preview/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/preview/index.md b/docs/startup/preview/index.md
index b1ca0ac..ceccc39 100644
--- a/docs/startup/preview/index.md
+++ b/docs/startup/preview/index.md
@@ -127,10 +127,10 @@ Since the 0.13.0 release, Samza provides a new high level API that simplifies yo
 
 Check out some examples to see the high-level API in action.
 
-1.  [Pageview AdClick 
Joiner](https://github.com/apache/samza-hello-samza/blob/c87ed565fbaebf2ac88376143c65e9f52f7a8801/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java)
 demonstrates joining a stream of PageViews with a stream of AdClicks, e.g. to 
analyze which pages get the most ad clicks.
-2.  [Pageview 
Repartitioner](https://github.com/apache/samza-hello-samza/blob/c87ed565fbaebf2ac88376143c65e9f52f7a8801/src/main/java/samza/examples/cookbook/PageViewFilterApp.java)
 illustrates re-partitioning the incoming stream of PageViews.
-3.  [Pageview 
Sessionizer](https://github.com/apache/samza-hello-samza/blob/c87ed565fbaebf2ac88376143c65e9f52f7a8801/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java)
 groups the incoming stream of events into sessions based on user activity.
-4.  [Pageview by 
Region](https://github.com/apache/samza-hello-samza/blob/c87ed565fbaebf2ac88376143c65e9f52f7a8801/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java)
 counts the number of views per-region over tumbling time intervals.
+1.  [Pageview AdClick 
Joiner](https://github.com/apache/samza-hello-samza/blob/e5943a000eef87e077c422e09dc20f09d4e876ca/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java)
 demonstrates joining a stream of PageViews with a stream of AdClicks, e.g. to 
analyze which pages get the most ad clicks.
+2.  [Pageview 
Repartitioner](https://github.com/apache/samza-hello-samza/blob/e5943a000eef87e077c422e09dc20f09d4e876ca/src/main/java/samza/examples/cookbook/PageViewFilterApp.java)
 illustrates re-partitioning the incoming stream of PageViews.
+3.  [Pageview 
Sessionizer](https://github.com/apache/samza-hello-samza/blob/e5943a000eef87e077c422e09dc20f09d4e876ca/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java)
 groups the incoming stream of events into sessions based on user activity.
+4.  [Pageview by 
Region](https://github.com/apache/samza-hello-samza/blob/e5943a000eef87e077c422e09dc20f09d4e876ca/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java)
 counts the number of views per-region over tumbling time intervals.
 
 
 ## Key Concepts
@@ -147,11 +147,11 @@ For example, here is a StreamApplication that validates and decorates page views
 public class BadPageViewFilter implements StreamApplication {
   @Override
 public void init(StreamGraph graph, Config config) {
-    MessageStream<PageView> pageViews = 
graph.getInputStream(“page-views”..);
+    MessageStream<PageView> pageViews = graph.getInputStream(“page-views”, 
new JsonSerdeV2<>(PageView.class));
 
     pageViews.filter(this::isValidPageView)
                       .map(this::addProfileInformation)
-                      
.sendTo(graph.getOutputStream(“decorated-page-views”..))
+                      
.sendTo(graph.getOutputStream(“decorated-page-views”, new 
JsonSerdeV2<>(DecoratedPageView.class)))
  }
 }
 {% endhighlight %}
@@ -169,7 +169,7 @@ There are 3 simple steps to write your stream processing applications using the
 ### Step 1: Obtain the input streams:
 You can obtain the MessageStream for your input stream ID (“page-views”) 
using StreamGraph.getInputStream.
     {% highlight java %}
-    MessageStream<PageView> pageViewInput = 
graph.getInputStream(“page-views”, (k,v) -> v);
+    MessageStream<PageView> pageViewInput = 
graph.getInputStream(“page-views”, new JsonSerdeV2<>(PageView.class));
     {% endhighlight %}
 
 The first parameter `page-views` is the logical stream ID. Each stream ID is 
associated with a *physical name* and a *system*. By default, Samza uses the 
stream ID as the physical stream name and accesses the stream on the default 
system which is specified with the property “job.default.system”. However, 
the *physical name* and *system* properties can be overridden in configuration. 
For example, the following configuration defines the stream ID "page-views" as 
an alias for the PageViewEvent topic in a local Kafka cluster.
@@ -182,7 +182,7 @@ systems.kafka.producer.bootstrap.servers=localhost:9092
 streams.page-views.samza.physical.name=PageViewEvent
 {% endhighlight %}
 
-The second parameter `(k,v) -> v` is the MessageBuilder function that is used 
to construct a message from the incoming key and value.
+The second parameter is a serde to de-serialize the incoming message.
 
 ### Step 2: Define your transformation logic:
 You are now ready to define your StreamApplication logic as a series of 
transformations on MessageStreams.
@@ -200,8 +200,7 @@ Finally, you can create an OutputStream using StreamGraph.getOutputStream and se
 // Send messages with userId as the key to “decorated-page-views”.
 decoratedPageViews.sendTo(
                           graph.getOutputStream(“decorated-page-views”,
-                                                dpv -> dpv.getUserId(),
-                                                dpv -> dpv));
+                                                new 
JsonSerdeV2<>(DecoratedPageView.class)));
 {% endhighlight %}
 
 The first parameter `decorated-page-views` is a logical stream ID. The 
properties for this stream ID can be overridden just like the stream IDs for 
input streams. For example:
@@ -210,7 +209,7 @@ streams.decorated-page-views.samza.system=kafka
 streams.decorated-page-views.samza.physical.name=DecoratedPageViewEvent
 {% endhighlight %}
 
-The second and third parameters define extractors to split the upstream data 
type into a separate key and value, respectively.
+The second parameter is a serde to serialize the outgoing message.
 
 ## Operators
 The high level API supports common operators like map, flatmap, filter, merge, 
joins, and windowing on streams. Most of these operators accept corresponding 
Functions and these functions are 
[Initable](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/InitableFunction.html).
@@ -247,12 +246,17 @@ MessageStream<String> shortWords = words.filter(word -> word.size() < 3);
 
 ### PartitionBy
 Re-partitions this MessageStream using the key returned by the provided 
keyExtractor and returns the transformed MessageStream. Messages are sent 
through an intermediate stream during repartitioning.
-
 {% highlight java %}
-// Repartition pageView by userId
 MessageStream<PageView> pageViews = ...
-MessageStream<PageView> partitionedPageViews =
-                                        pageViews.partitionBy(pageView -> 
pageView.getUserId())
+// Repartition pageView by userId.
+MessageStream<KV<String, PageView>> partitionedPageViews =
+                                        pageViews.partitionBy(pageView -> pageView.getUserId(), // key extractor
+                                        pageView -> pageView, // value extractor
+                                        KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)), // serdes
+                                        "partitioned-page-views") // operator ID
+
+// The operator ID should be unique for an operator within the application and is used to identify the streams and stores created by the operator.
+
 {% endhighlight %}
 
 ### Merge
@@ -275,11 +279,11 @@ Sends all messages from this MessageStream to the provided OutputStream. You can
 {% highlight java %}
 // Output a new message with userId as the key and region as the value to the 
“user-region” stream.
 MessageStream<PageView> pageViews = ...
-OutputStream<String, String, PageView> userRegions
+MessageStream<KV<String, String>> keyedPageViews = pageViews.map(pageView -> KV.of(pageView.getUserId(), pageView.getRegion()));
+OutputStream<KV<String, String>> userRegions
                            = graph.getOutputStream(“user-region”,
-                                                   pageView -> 
pageView.getUserId(),
-                                                   pageView -> 
pageView.getRegion())
-pageView.sendTo(userRegions);
+                                                   KVSerde.of(new 
StringSerde(), new StringSerde()));
+keyedPageViews.sendTo(userRegions);
 {% endhighlight %}
 
 
@@ -308,7 +312,11 @@ The Join operator joins messages from two MessageStreams using the provided pair
 MessageStream<OrderRecord> orders = …
 MessageStream<ShipmentRecord> shipments = …
 
-MessageStream<FulfilledOrderRecord> shippedOrders = orders.join(shipments, new 
OrderShipmentJoiner(), Duration.ofMinutes(20) )
+MessageStream<FulfilledOrderRecord> shippedOrders = orders.join(shipments, new OrderShipmentJoiner(),
+    new StringSerde(), // serde for the join key
+    new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class), // serde for both streams
+    Duration.ofMinutes(20), // join TTL
+    "shipped-order-stream") // operator ID
 
 // Constructs a new FulfilledOrderRecord by extracting the order timestamp 
from the OrderRecord and the shipment timestamp from the ShipmentRecord.
  class OrderShipmentJoiner implements JoinFunction<String, OrderRecord, 
ShipmentRecord, FulfilledOrderRecord> {
@@ -358,16 +366,17 @@ Examples:
 MessageStream<PageView> pageViews = ...
 MessageStream<WindowPane<String, Collection<PageView>>> =
                      pageViews.window(
-                         Windows.keyedTumblingWindow(pageView -> 
pageView.getUserId(),
-                           Duration.ofSeconds(30)))
+                         Windows.keyedTumblingWindow(pageView -> pageView.getUserId(), // key extractor
+                           Duration.ofSeconds(30), // window duration
+                           new StringSerde(), new JsonSerdeV2<>(PageView.class)));
 
 
 // Compute the maximum value over tumbling windows of 3 seconds.
 MessageStream<Integer> integers = …
-Supplier<Integer> initialValue = () -> Integer.MIN_VALUE
-FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> 
Math.max(msg, oldValue)
+Supplier<Integer> initialValue = () -> Integer.MIN_VALUE;
+FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> 
Math.max(msg, oldValue);
 MessageStream<WindowPane<Void, Integer>> windowedStream =
-         integers.window(Windows.tumblingWindow(Duration.ofSeconds(30), 
initialValue, aggregateFunction))
+         integers.window(Windows.tumblingWindow(Duration.ofSeconds(30), 
initialValue, aggregateFunction, new IntegerSerde()));
 
 {% endhighlight %}
 
@@ -383,7 +392,8 @@ Supplier<Integer> initialValue = () -> 0
 FoldLeftFunction<PageView, Integer> countAggregator = (pageView, oldCount) -> 
oldCount + 1;
 Duration sessionGap = Duration.ofMinutes(3);
 MessageStream<WindowPane<String, Integer> sessionCounts = 
pageViews.window(Windows.keyedSessionWindow(
-    pageView -> pageView.getUserId(), sessionGap, initialValue, 
countAggregator));
+    pageView -> pageView.getUserId(), sessionGap, initialValue, 
countAggregator,
+        new StringSerde(), new IntegerSerde()));
 
 // Compute the maximum value over tumbling windows of 3 seconds.
 MessageStream<Integer> integers = …
@@ -391,12 +401,11 @@ Supplier<Integer> initialValue = () -> Integer.MAX_INT
 
 FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> 
Math.max(msg, oldValue)
 MessageStream<WindowPane<Void, Integer>> windowedStream =
-     integers.window(Windows.tumblingWindow(Duration.ofSeconds(3), 
initialValue, aggregateFunction))
+     integers.window(Windows.tumblingWindow(Duration.ofSeconds(3), 
initialValue, aggregateFunction,
+         new IntegerSerde()))
 
 {% endhighlight %}
 
-### Known Issues
-Currently, both window and join operators buffer messages in-memory. So, 
messages could be lost on failures and re-starts.
 
 ---
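
The diff above removes the "Known Issues" note about in-memory buffering and adds serde arguments to the window and join operators. One way to read that pairing: once operator state is written to a byte-oriented store rather than held as live objects, each value has to be converted to bytes and back. The sketch below illustrates that round trip in plain Java under stated assumptions. It is not Samza code: `SimpleSerde`, `IntSerde`, the `HashMap` standing in for a durable store, and `updateMax` are all hypothetical names introduced for illustration.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java illustration of why persisted operator state needs a serde; not Samza code. */
class SerdeBackedStateSketch {

  /** Hypothetical stand-in for a serde: converts a value to bytes and back. */
  interface SimpleSerde<T> {
    byte[] toBytes(T value);
    T fromBytes(byte[] bytes);
  }

  /** Encodes an Integer as 4 big-endian bytes. */
  static final class IntSerde implements SimpleSerde<Integer> {
    @Override
    public byte[] toBytes(Integer value) {
      return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    @Override
    public Integer fromBytes(byte[] bytes) {
      return ByteBuffer.wrap(bytes).getInt();
    }
  }

  /** Hypothetical byte-oriented store, standing in for a durable key-value store. */
  private final Map<String, byte[]> store = new HashMap<>();
  private final SimpleSerde<Integer> serde;

  SerdeBackedStateSketch(SimpleSerde<Integer> serde) {
    this.serde = serde;
  }

  /** Reads the running maximum for a key, folds in the message, and writes it back as bytes. */
  int updateMax(String key, int message) {
    byte[] previous = store.get(key);
    int oldValue = previous == null ? Integer.MIN_VALUE : serde.fromBytes(previous);
    int newValue = Math.max(message, oldValue);
    store.put(key, serde.toBytes(newValue));
    return newValue;
  }

  public static void main(String[] args) {
    SerdeBackedStateSketch state = new SerdeBackedStateSketch(new IntSerde());
    state.updateMax("window-0", 4);
    state.updateMax("window-0", 9);
    System.out.println(state.updateMax("window-0", 2)); // prints 9
  }
}
```

The fold in `updateMax` mirrors the `Math.max(msg, oldValue)` aggregate from the tumbling-window example in the diff; only the storage of the intermediate value differs.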
 
