Repository: samza
Updated Branches:
  refs/heads/master 5ba543faf -> 880d2f0c2


Adding Samza 1.0 API docs

jagadish-v0 prateekm please take a look. Thanks!

Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>

Reviewers: Jagadish<jagad...@apache.org>

Closes #746 from nickpan47/api-docs


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/880d2f0c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/880d2f0c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/880d2f0c

Branch: refs/heads/master
Commit: 880d2f0c2dfe1d1005a32e56ecb0b0e76fb80881
Parents: 5ba543f
Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Authored: Fri Oct 19 14:13:04 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Fri Oct 19 14:13:04 2018 -0700

----------------------------------------------------------------------
 .../versioned/api/high-level-api.md             | 396 ++++++++++++++++++-
 .../versioned/api/low-level-api.md              | 308 ++++++++++++++-
 .../documentation/versioned/api/overview.md     | 160 --------
 .../versioned/api/programming-model.md          | 158 ++++++++
 4 files changed, 841 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/880d2f0c/docs/learn/documentation/versioned/api/high-level-api.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/high-level-api.md 
b/docs/learn/documentation/versioned/api/high-level-api.md
index 2a54215..9c13f24 100644
--- a/docs/learn/documentation/versioned/api/high-level-api.md
+++ b/docs/learn/documentation/versioned/api/high-level-api.md
@@ -1,6 +1,6 @@
 ---
 layout: page
-title: Streams DSL
+title: High-level API
 ---
 <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
@@ -19,6 +19,398 @@ title: Streams DSL
    limitations under the License.
 -->
 
+# Introduction
 
-# High level API section 1
+The high level API provides the libraries to define your application logic. 
The StreamApplication is the central abstraction which your application must 
implement. You start by declaring your inputs as instances of MessageStream. 
Then you can apply operators on each MessageStream like map, filter, window, 
and join to define the whole end-to-end data processing in a single program.
 
+Since the 0.13.0 release, Samza provides a new high level API that simplifies your applications. This API supports operations like re-partitioning, windowing, and joining on streams. You can now express your application logic concisely in a few lines of code and accomplish what previously required multiple jobs.
+# Code Examples
+
+Check out some examples to see the high-level API in action.
+1. PageView AdClick Joiner demonstrates joining a stream of PageViews with a 
stream of AdClicks, e.g. to analyze which pages get the most ad clicks.
+2. PageView Repartitioner illustrates re-partitioning the incoming stream of 
PageViews.
+3. PageView Sessionizer groups the incoming stream of events into sessions 
based on user activity.
+4. PageView by Region counts the number of views per region over tumbling time intervals.
+
+# Key Concepts
+## StreamApplication
+When writing your stream processing application using the Samza high-level 
API, you implement a StreamApplication and define your processing logic in the 
describe method.
+
+{% highlight java %}
+
+    public void describe(StreamApplicationDescriptor appDesc) { … }
+
+{% endhighlight %}
+
+For example, here is a StreamApplication that validates and decorates page 
views with viewer’s profile information.
+
+{% highlight java %}
+
+    public class BadPageViewFilter implements StreamApplication {
+      @Override
+      public void describe(StreamApplicationDescriptor appDesc) {
+        KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka");
+        KafkaInputDescriptor<PageView> pageViewInput =
+            kafka.getInputDescriptor("page-views", new JsonSerdeV2<>(PageView.class));
+        KafkaOutputDescriptor<DecoratedPageView> outputPageViews =
+            kafka.getOutputDescriptor("decorated-page-views", new JsonSerdeV2<>(DecoratedPageView.class));
+        MessageStream<PageView> pageViews = appDesc.getInputStream(pageViewInput);
+        pageViews.filter(this::isValidPageView)
+            .map(this::addProfileInformation)
+            .sendTo(appDesc.getOutputStream(outputPageViews));
+      }
+    }
+    
+{% endhighlight %}
+
+## MessageStream
+A MessageStream, as the name implies, represents a stream of messages. A 
StreamApplication is described as a series of transformations on 
MessageStreams. You can get a MessageStream in two ways:
+1. Using StreamApplicationDescriptor.getInputStream to get the MessageStream 
for a given input stream (e.g., a Kafka topic).
+2. By transforming an existing MessageStream using operations like map, 
filter, window, join etc.
+## Table
+A Table represents a dataset that can be accessed by keys, and is one of the 
building blocks of the Samza high level API; the main motivation behind it is 
to support stream-table joins. The current K/V store is leveraged to provide 
backing store for local tables. More variations such as direct access and 
composite tables will be supported in the future. The usage of a table 
typically follows three steps:
+1. Create a table
+2. Populate the table using the sendTo() operator
+3. Join a stream with the table using the join() operator
+
+{% highlight java %}
+
+    final StreamApplication app = (streamAppDesc) -> {
+      Table<KV<Integer, Profile>> table = streamAppDesc.getTable(new 
InMemoryTableDescriptor("t1")
+          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+      ...
+    };
+
+{% endhighlight %}
+
+The example above creates a TableDescriptor object, which contains all the information about a table. The currently supported table types are InMemoryTableDescriptor and RocksDbTableDescriptor. Notice that the type of records in a table is KV, and that Serdes for both the key and the value of the records need to be defined (here via withSerde). Additional parameters can be added based on individual table types.
+
+More details about steps 2 and 3 can be found in the [operators section](#operators).
+
+# Anatomy of a typical StreamApplication
+There are 3 simple steps to write your stream processing logic using the Samza 
high-level API.
+## Step 1: Obtain the input streams
+You can obtain the MessageStream for your input stream ID (“page-views”) 
using StreamApplicationDescriptor.getInputStream.
+
+{% highlight java %}
+
+    KafkaSystemDescriptor sd = new KafkaSystemDescriptor("kafka")
+        .withConsumerZkConnect(ImmutableList.of("localhost:2181"))
+        .withProducerBootstrapServers(ImmutableList.of("localhost:9092"));
+
+    KafkaInputDescriptor<PageView> pageViewInput =
+        sd.getInputDescriptor("page-views", new JsonSerdeV2<>(PageView.class));
+    
+    MessageStream<PageView> pageViews = streamAppDesc.getInputStream(pageViewInput);
+
+{% endhighlight %}
+
+The parameter {% highlight java %}pageViewInput{% endhighlight %} is an [InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor.html). Each InputDescriptor fully describes an input stream: the stream ID, the serde used to deserialize the input messages, and the system. By default, Samza uses the stream ID as the physical stream name and accesses the stream on the default system, which is specified with the property “job.default.system”. However, the physical name and system properties can be overridden in configuration. For example, the following configuration defines the stream ID “page-views” as an alias for the PageViewEvent topic in a local Kafka cluster.
+
+{% highlight jproperties %}
+
+    streams.page-views.samza.physical.name=PageViewEvent
+
+{% endhighlight %}
+
+## Step 2: Define your transformation logic
+You are now ready to define your StreamApplication logic as a series of 
transformations on MessageStreams.
+
+{% highlight java %}
+
+    MessageStream<DecoratedPageView> decoratedPageViews = pageViews.filter(this::isValidPageView)
+        .map(this::addProfileInformation);
+
+{% endhighlight %}
+
+## Step 3: Write to output streams
+
+Finally, you can create an OutputStream using 
StreamApplicationDescriptor.getOutputStream and send the transformed messages 
through it.
+
+{% highlight java %}
+
+    KafkaOutputDescriptor<DecoratedPageView> outputPageViews =
+        sd.getOutputDescriptor("decorated-page-views", new JsonSerdeV2<>(DecoratedPageView.class));
+  
+    // Send the decorated page views to the "decorated-page-views" stream.
+    decoratedPageViews.sendTo(streamAppDesc.getOutputStream(outputPageViews));
+
+{% endhighlight %}
+
+The parameter {% highlight java %}outputPageViews{% endhighlight %} is an [OutputDescriptor](javadocs/org/apache/samza/system/descriptors/OutputDescriptor.html), which includes the stream ID, the serde used to serialize the outgoing messages, the physical name and the system. As with input streams, the properties of this stream can be overridden in configuration. For example:
+
+{% highlight jproperties %}
+
+    streams.decorated-page-views.samza.physical.name=DecoratedPageViewEvent
+
+{% endhighlight %}
+
+# Operators
+The high level API supports common operators like map, flatMap, filter, merge, joins, and windowing on streams. Most of these operators accept corresponding Functions, which are Initable and Closable.
+## Map
+Applies the provided 1:1 MapFunction to each element in the MessageStream and 
returns the transformed MessageStream. The MapFunction takes in a single 
message and returns a single message (potentially of a different type).
+
+{% highlight java %}
+    
+    MessageStream<Integer> numbers = ...
+    MessageStream<Integer> tripled = numbers.map(m -> m * 3);
+    MessageStream<String> stringified = numbers.map(m -> String.valueOf(m));
+
+{% endhighlight %}
+## FlatMap
+Applies the provided 1:n FlatMapFunction to each element in the MessageStream 
and returns the transformed MessageStream. The FlatMapFunction takes in a 
single message and returns zero or more messages.
+
+{% highlight java %}
+    
+    MessageStream<String> sentences = ...
+    // Parse each sentence into its individual words, splitting by space
+    MessageStream<String> words = sentences.flatMap(sentence ->
+        Arrays.asList(sentence.split(" ")));
+
+{% endhighlight %}
+## Filter
+Applies the provided FilterFunction to the MessageStream and returns the 
filtered MessageStream. The FilterFunction is a predicate that specifies 
whether a message should be retained in the filtered stream. Messages for which 
the FilterFunction returns false are filtered out.
+
+{% highlight java %}
+    
+    MessageStream<String> words = ...
+    // Extract only the long words
+    MessageStream<String> longWords = words.filter(word -> word.length() > 15);
+    // Extract only the short words
+    MessageStream<String> shortWords = words.filter(word -> word.length() < 3);
+    
+{% endhighlight %}
+## PartitionBy
+Re-partitions this MessageStream using the key returned by the provided 
keyExtractor and returns the transformed MessageStream. Messages are sent 
through an intermediate stream during repartitioning.
+{% highlight java %}
+    
+    MessageStream<PageView> pageViews = ...
+    // Repartition pageView by userId.
+    MessageStream<KV<String, PageView>> partitionedPageViews = 
pageViews.partitionBy(
+        pageView -> pageView.getUserId(), // key extractor
+        pageView -> pageView, // value extractor
+        KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)), // 
serdes
+        "partitioned-page-views"); // operator ID    
+        
+{% endhighlight %}
+
+The operator ID should be unique for an operator within the application and is 
used to identify the streams and stores created by the operator.
+
+## Merge
+Merges the MessageStream with all the provided MessageStreams and returns the 
merged stream.
+{% highlight java %}
+    
+    MessageStream<ServiceCall> serviceCall1 = ...
+    MessageStream<ServiceCall> serviceCall2 = ...
+    // Merge individual “ServiceCall” streams and create a new merged 
MessageStream
+    MessageStream<ServiceCall> serviceCallMerged = 
serviceCall1.merge(serviceCall2);
+    
+{% endhighlight %}
+
+The merge transform preserves the order of each MessageStream, so if message {% highlight java %}m1{% endhighlight %} appears before {% highlight java %}m2{% endhighlight %} in any provided stream, then {% highlight java %}m1{% endhighlight %} also appears before {% highlight java %}m2{% endhighlight %} in the merged stream.
+
+As an alternative to the merge instance method, you can also use the [MessageStream#mergeAll](javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-) static method to merge MessageStreams without operating on an initial stream.
+
+## Broadcast
+Broadcasts the MessageStream to all instances of downstream transformation operators via an intermediate stream.
+{% highlight java %}
+
+    MessageStream<VersionChange> verChanges = ...
+    // Broadcast input data version change event to all operator instances.
+    MessageStream<VersionChange> broadcastVersionChanges = 
+        verChanges.broadcast(new JsonSerdeV2<>(VersionChange.class), // serde
+                             "broadcast-version-changes"); // operator ID
+{% endhighlight %}
+
+## SendTo(Stream)
+Sends all messages from this MessageStream to the provided OutputStream. You 
can specify the key and the value to be used for the outgoing message.
+
+{% highlight java %}
+    
+    // Output a new message with userId as the key and region as the value to the "user-region" stream.
+    KafkaOutputDescriptor<KV<String, String>> outputRegions =
+        kafka.getOutputDescriptor("user-region", KVSerde.of(new StringSerde(), new StringSerde()));
+    MessageStream<PageView> pageViews = ...
+    MessageStream<KV<String, String>> keyedPageViews =
+        pageViews.map(pageView -> KV.of(pageView.getUserId(), pageView.getRegion()));
+    keyedPageViews.sendTo(appDesc.getOutputStream(outputRegions));
+
+{% endhighlight %}
+## SendTo(Table)
+Sends all messages from this MessageStream to the provided table. The expected message type is KV.
+
+{% highlight java %}
+    
+    // Write a new message with memberId as the key and profile as the value 
to a table.
+    appDesc.getInputStream(kafka.getInputDescriptor("Profile", new 
NoOpSerde<Profile>()))
+        .map(m -> KV.of(m.getMemberId(), m))
+        .sendTo(table);
+        
+{% endhighlight %}
+
+## Sink
+Allows sending messages from this MessageStream to an output system using the 
provided 
[SinkFunction](javadocs/org/apache/samza/operators/functions/SinkFunction.html).
+
+This offers more control than {% highlight java %}sendTo{% endhighlight %} since the SinkFunction has access to the MessageCollector and the TaskCoordinator. For instance, you can choose to manually commit offsets, or shut down the job using the TaskCoordinator APIs. This operator can also be used to send messages to non-Samza systems (e.g. remote databases, REST services, etc.).
+
+{% highlight java %}
+    
+    // Send each pageView to an output system using a custom SinkFunction.
+    MessageStream<PageView> pageViews = ...
+    pageViews.sink( (msg, collector, coordinator) -> {
+        // Construct a new outgoing message, and send it to a kafka topic named TransformedPageViewEvent.
+        collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka",
+                       "TransformedPageViewEvent"), msg));
+    } );
+        
+{% endhighlight %}
+
+## Join(Stream-Stream)
+The stream-stream Join operator joins messages from two MessageStreams using 
the provided pairwise 
[JoinFunction](javadocs/org/apache/samza/operators/functions/JoinFunction.html).
 Messages are joined when the keys extracted from messages from the first 
stream match keys extracted from messages in the second stream. Messages in 
each stream are retained for the provided ttl duration and join results are 
emitted as matches are found.
+
+{% highlight java %}
+    
+    // Joins a stream of OrderRecord with a stream of ShipmentRecord by 
orderId with a TTL of 20 minutes.
+    // Results are produced to a new stream of FulfilledOrderRecord.
+    MessageStream<OrderRecord> orders = ...
+    MessageStream<ShipmentRecord> shipments = ...
+
+    MessageStream<FulfilledOrderRecord> shippedOrders = orders.join(shipments, new OrderShipmentJoiner(),
+        new StringSerde(), // serde for the join key
+        new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class), // serde for both streams
+        Duration.ofMinutes(20), // join TTL
+        "shipped-order-stream"); // operator ID
+
+    // Constructs a new FulfilledOrderRecord by extracting the order timestamp 
from the OrderRecord and the shipment timestamp from the ShipmentRecord.
+    class OrderShipmentJoiner implements JoinFunction<String, OrderRecord, 
ShipmentRecord, FulfilledOrderRecord> {
+      @Override
+      public FulfilledOrderRecord apply(OrderRecord message, ShipmentRecord 
otherMessage) {
+        return new FulfilledOrderRecord(message.orderId, 
message.orderTimestamp, otherMessage.shipTimestamp);
+      }
+
+      @Override
+      public String getFirstKey(OrderRecord message) {
+        return message.orderId;
+      }
+
+      @Override
+      public String getSecondKey(ShipmentRecord message) {
+        return message.orderId;
+      }
+    }
+    
+{% endhighlight %}
+
+## Join(Stream-Table)
+The stream-table Join operator joins messages from a MessageStream with records in a Table using the provided [StreamTableJoinFunction](javadocs/org/apache/samza/operators/functions/StreamTableJoinFunction.html). Messages from the input stream are joined with records in the table using the key extracted from each input message. The join function is invoked with both the message and the record. If a record is not found in the table, a null value is provided; the join function can choose to return null (inner join) or an output message (left outer join). For the join to function properly, it is important to ensure that the input stream and the table are partitioned using the same key, as this impacts the physical placement of data.
+
+{% highlight java %}
+   
+    streamAppDesc.getInputStream(kafka.getInputDescriptor("PageView", new NoOpSerde<PageView>()))
+        .partitionBy(PageView::getMemberId, v -> v, "p1")
+        .join(table, new PageViewToProfileJoinFunction())
+        ...
+    
+    public class PageViewToProfileJoinFunction implements 
StreamTableJoinFunction
+        <Integer, KV<Integer, PageView>, KV<Integer, Profile>, 
EnrichedPageView> {
+      
+      @Override
+      public EnrichedPageView apply(KV<Integer, PageView> m, KV<Integer, 
Profile> r) {
+        return r != null ? new EnrichedPageView(...) : null;
+      }
+       
+      @Override
+      public Integer getMessageKey(KV<Integer, PageView> message) {
+        return message.getKey();
+      }
+
+      @Override
+      public Integer getRecordKey(KV<Integer, Profile> record) {
+        return record.getKey();
+      }
+    }
+{% endhighlight %}
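
The inner versus left outer choice described above comes down to how the join function treats a null record. The following framework-independent sketch illustrates that behavior with a plain map standing in for the table. `StreamTableJoinSketch` and its `join` method are hypothetical names, not Samza APIs, and dropping a message when the join function returns null is an assumption taken from the description above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for a stream-table join; not part of Samza.
class StreamTableJoinSketch {
  // Looks up each message key in the table and applies the join function.
  // The record passed to the join function is null when the key is absent.
  static List<String> join(List<Integer> messageKeys,
                           Map<Integer, String> table,
                           BiFunction<Integer, String, String> joinFn) {
    List<String> out = new ArrayList<>();
    for (Integer key : messageKeys) {
      String result = joinFn.apply(key, table.get(key));
      if (result != null) {
        out.add(result); // assumed behavior: a null result emits nothing
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<Integer, String> profiles = new HashMap<>();
    profiles.put(1, "alice");
    List<Integer> pageViewKeys = Arrays.asList(1, 2);

    // Inner join: return null when no profile is found.
    System.out.println(join(pageViewKeys, profiles,
        (k, r) -> r != null ? k + ":" + r : null));
    // Left outer join: always return an output, even without a profile.
    System.out.println(join(pageViewKeys, profiles,
        (k, r) -> k + ":" + (r != null ? r : "unknown")));
  }
}
```

In this sketch the only difference between the two join flavors is the null check inside the join function, which matches the way PageViewToProfileJoinFunction above returns null for a missing profile.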
+
+## Window
+### Windowing Concepts
+**Windows, Triggers, and WindowPanes**: The window operator groups incoming 
messages in the MessageStream into finite windows. Each emitted result contains 
one or more messages in the window and is called a WindowPane.
+
+A window can have one or more associated triggers which determine when results 
from the window are emitted. Triggers can be either [early 
triggers](javadocs/org/apache/samza/operators/windows/Window.html#setEarlyTrigger-org.apache.samza.operators.triggers.Trigger-)
 that allow emitting results speculatively before all data for the window has 
arrived, or late triggers that allow handling late messages for the window.
+
+**Aggregator Function**: By default, the emitted WindowPane will contain all 
the messages for the window. Instead of retaining all messages, you typically 
define a more compact data structure for the WindowPane and update it 
incrementally as new messages arrive, e.g. for keeping a count of messages in 
the window. To do this, you can provide an aggregating 
[FoldLeftFunction](javadocs/org/apache/samza/operators/functions/FoldLeftFunction.html)
 which is invoked for each incoming message added to the window and defines how 
to update the WindowPane for that message.
+
+**Accumulation Mode**: A window’s accumulation mode determines how results 
emitted from a window relate to previously emitted results for the same window. 
This is particularly useful when the window is configured with early or late 
triggers. The accumulation mode can either be discarding or accumulating.
+
+A discarding window clears all state for the window at every emission. Each 
emission will only correspond to new messages that arrived since the previous 
emission for the window.
+
+An accumulating window retains window results from previous emissions. Each 
emission will contain all messages that arrived since the beginning of the 
window.
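
The difference between the two modes can be sketched without any Samza classes. In the example below, `AccumulationModeSketch` is a hypothetical toy window that holds messages and emits them on demand; it only mirrors the semantics described above and says nothing about how Samza stores window state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy window; not part of Samza.
class AccumulationModeSketch {
  enum Mode { DISCARDING, ACCUMULATING }

  private final Mode mode;
  private final List<String> state = new ArrayList<>();

  AccumulationModeSketch(Mode mode) {
    this.mode = mode;
  }

  void add(String message) {
    state.add(message);
  }

  // Emits the current window contents. A discarding window clears its
  // state after each emission; an accumulating window keeps it.
  List<String> emit() {
    List<String> pane = new ArrayList<>(state);
    if (mode == Mode.DISCARDING) {
      state.clear();
    }
    return pane;
  }

  public static void main(String[] args) {
    for (Mode mode : Mode.values()) {
      AccumulationModeSketch window = new AccumulationModeSketch(mode);
      window.add("m1");
      System.out.println(mode + " early emission: " + window.emit());
      window.add("m2");
      System.out.println(mode + " final emission: " + window.emit());
    }
  }
}
```

With an early trigger after the first message, the discarding window's second emission contains only the newer message, while the accumulating window's second emission contains both.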
+
+### Window Types
+The Samza high-level API currently supports tumbling and session windows.
+
+**Tumbling Window**: A tumbling window defines a series of contiguous, fixed-size time intervals in the stream.
+
+Examples:
+
+{% highlight java %}
+    
+    // Group the pageView stream into 30 second tumbling windows keyed by the userId.
+    MessageStream<PageView> pageViews = ...
+    MessageStream<WindowPane<String, Collection<PageView>>> windowedPageViews =
+        pageViews.window(
+            Windows.keyedTumblingWindow(pageView -> pageView.getUserId(), // key extractor
+            Duration.ofSeconds(30), // window duration
+            new StringSerde(), new JsonSerdeV2<>(PageView.class)));
+
+    // Compute the maximum value over tumbling windows of 30 seconds.
+    MessageStream<Integer> integers = ...
+    Supplier<Integer> initialValue = () -> Integer.MIN_VALUE;
+    FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue);
+    MessageStream<WindowPane<Void, Integer>> windowedStream =
+        integers.window(Windows.tumblingWindow(Duration.ofSeconds(30), initialValue, aggregateFunction, new IntegerSerde()));
+   
+{% endhighlight %}
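
To make the notion of contiguous, fixed-size intervals concrete, here is a framework-independent sketch of how a timestamp could be mapped to the tumbling window that contains it. `TumblingWindowSketch` and `windowStart` are illustrative names, and aligning windows to the epoch is an assumption of this sketch rather than a statement about how Samza assigns windows internally.

```java
import java.time.Duration;

// Hypothetical helper; not part of Samza.
class TumblingWindowSketch {
  // Returns the start (in epoch milliseconds) of the fixed-size window
  // that contains the given timestamp, assuming epoch-aligned windows.
  static long windowStart(long timestampMillis, Duration windowSize) {
    long size = windowSize.toMillis();
    return Math.floorDiv(timestampMillis, size) * size;
  }

  public static void main(String[] args) {
    Duration size = Duration.ofSeconds(30);
    // Timestamps 61s and 89s fall in the same window; 90s starts the next one.
    System.out.println(windowStart(61_000L, size));
    System.out.println(windowStart(89_000L, size));
    System.out.println(windowStart(90_000L, size));
  }
}
```

Because every timestamp maps to exactly one window start, the windows never overlap and leave no gaps, which is what distinguishes tumbling windows from session windows.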
+
+**Session Window**: A session window groups a MessageStream into sessions. A 
session captures a period of activity over a MessageStream and is defined by a 
gap. A session is closed and results are emitted if no new messages arrive for 
the window for the gap duration.
+
+Examples:
+
+{% highlight java %}
+
+    // Sessionize a stream of page views, and count the number of page-views in a session for every user.
+    MessageStream<PageView> pageViews = ...
+    Supplier<Integer> initialValue = () -> 0;
+    FoldLeftFunction<PageView, Integer> countAggregator = (pageView, oldCount) -> oldCount + 1;
+    Duration sessionGap = Duration.ofMinutes(3);
+    MessageStream<WindowPane<String, Integer>> sessionCounts = pageViews.window(Windows.keyedSessionWindow(
+        pageView -> pageView.getUserId(), sessionGap, initialValue, countAggregator,
+            new StringSerde(), new IntegerSerde()));
+
+    // Compute the maximum value over tumbling windows of 3 seconds.
+    MessageStream<Integer> integers = ...
+    Supplier<Integer> initialMax = () -> Integer.MIN_VALUE;
+
+    FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue);
+    MessageStream<WindowPane<Void, Integer>> windowedStream =
+       integers.window(Windows.tumblingWindow(Duration.ofSeconds(3), initialMax, aggregateFunction,
+           new IntegerSerde()));
+         
+{% endhighlight %}
+
+# Operator IDs
+Each operator in your application is associated with a globally unique identifier. By default, each operator is assigned an ID based on its usage in the application. Some operators that create and use external resources (e.g., intermediate streams for partitionBy and broadcast, stores and changelogs for joins and windows, etc.) require you to provide an explicit ID for them. It's highly recommended to provide meaningful IDs for such operators. These IDs help you control the underlying resources when a change to the application logic moves the operator to a different position within the DAG. There are two cases:
+1. You wish to retain the previous state for the operator, since the changes to the DAG don't affect the operator semantics. For example, you added a map operator before a partitionBy operator to log the incoming message. In this case, you can retain the previous operator ID.
+2. You wish to discard the previous state for the operator, since the changes to the DAG change the operator semantics. For example, you added a filter operator before a partitionBy operator that discards some of the messages. In this case, you should change the operator ID. Note that by doing so you will lose any previously checkpointed messages that haven't been completely processed by the downstream operators yet.
+
+An operator ID is of the format: **jobName-jobId-opCode-opId**
+- **jobName** is the name of your job, as specified using the configuration 
"job.name"
+- **jobId** is the ID of your job, as specified using the configuration "job.id"
+- **opCode** is an identifier for the type of the operator, e.g. 
map/filter/join
+- **opId** is either auto-generated by the framework based on the position of 
the operator within the DAG, or can be provided by you for operators that 
manage external resources.
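
As a rough illustration of the format above, the following sketch assembles an ID from its four parts. The class and method names (`OperatorIdSketch`, `format`) are hypothetical and not part of Samza; the framework builds these IDs internally, and this only mirrors the documented jobName-jobId-opCode-opId layout. The job name and job ID values are made up for the example.

```java
// Hypothetical helper; it only mirrors the documented ID layout.
class OperatorIdSketch {
  // Joins the four documented parts with '-' in the order
  // jobName-jobId-opCode-opId.
  static String format(String jobName, String jobId, String opCode, String opId) {
    return String.join("-", jobName, jobId, opCode, opId);
  }

  public static void main(String[] args) {
    // Assumed configuration: job.name=page-view-job, job.id=1, and a join
    // operator with the user-provided ID "shipped-order-stream".
    System.out.println(format("page-view-job", "1", "join", "shipped-order-stream"));
  }
}
```

Since the job name and job ID are part of the full ID, the same user-provided opId can be reused across different jobs without the underlying resources colliding.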
+
+# Application Serialization
+Samza relies on Java Serialization to distribute your application logic to the processors. For this to work, all of your custom application logic needs to be Serializable. For example, all the Function interfaces extend Serializable, and your implementations need to be serializable as well. It's recommended to use the Context APIs to set up any non-serializable context that your application needs at runtime.
+
+# Data Serialization
+Producing to and consuming from streams and tables requires serializing and deserializing data. In addition, some operators like joins and windows store data in a local store for durability across restarts. Such operations require you to provide a Serde implementation when using them. This also helps Samza infer the type of the data in your application, thus allowing the operator transforms to be type safe. Samza provides the following Serde implementations that you can use out of the box:
+
+- KVSerde: A pair of Serdes, first for the keys, and the second for the values 
in the incoming/outgoing message or a table record.
+- NoOpSerde: A serde implementation that indicates that the framework should 
not attempt any serialization/deserialization of the data. Useful in some cases 
when the SystemProducer/SystemConsumer handle serialization/deserialization 
themselves.
+- JsonSerdeV2: A type-specific JSON serde that allows directly deserializing the JSON bytes into a specific POJO type.
+- Serdes for primitive types: Serdes for common types such as ByteBuffer, Double, Long, Integer, Byte, String, etc.

http://git-wip-us.apache.org/repos/asf/samza/blob/880d2f0c/docs/learn/documentation/versioned/api/low-level-api.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/low-level-api.md 
b/docs/learn/documentation/versioned/api/low-level-api.md
index c162ca2..e91f74e 100644
--- a/docs/learn/documentation/versioned/api/low-level-api.md
+++ b/docs/learn/documentation/versioned/api/low-level-api.md
@@ -1,6 +1,6 @@
 ---
 layout: page
-title: Low level API
+title: Low level Task API
 ---
 <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
@@ -20,33 +20,303 @@ title: Low level API
 -->
 
 
-# Section 1
+# Introduction
+Task APIs (i.e. [StreamTask](javadocs/org/apache/samza/task/StreamTask.html) or [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html)) are bare-metal interfaces that expose the system implementation details in Samza. When using Task APIs, you will implement your application as a [TaskApplication](javadocs/org/apache/samza/application/TaskApplication.html). The main difference between a TaskApplication and a StreamApplication is the APIs used to describe the processing logic. In TaskApplication, the processing logic is defined via StreamTask and AsyncStreamTask.
 
-# Sample Applications
+# Key Concepts
 
+## TaskApplication
+Here is an example of a user implemented TaskApplication:
 
-# Section 2
+{% highlight java %}
+    
+    package com.example.samza;
 
-# Section 3
+    public class BadPageViewFilter implements TaskApplication {
+      @Override
+      public void describe(TaskApplicationDescriptor appDesc) {
+        // Define the Kafka system and the serde for the page view events
+        KafkaSystemDescriptor kafkaSystem =
+            new KafkaSystemDescriptor("kafka")
+              .withConsumerZkConnect(myZkServers)
+              .withProducerBootstrapServers(myBrokers);
+        KVSerde<String, PageViewEvent> serde =
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<PageViewEvent>());
+        // Add input, output streams and tables
+        appDesc.withInputStream(kafkaSystem.getInputDescriptor("pageViewEvent", serde))
+            .withOutputStream(kafkaSystem.getOutputDescriptor("goodPageViewEvent", serde))
+            .withTable(new RocksDbTableDescriptor(
+                "badPageUrlTable", KVSerde.of(new StringSerde(), new IntegerSerde())))
+            .withTaskFactory(new BadPageViewTaskFactory());
+      }
+    }
 
+{% endhighlight %}
 
-# Section 4
+In the above example, the user defines the input stream, the output stream, and a RocksDB table for the application, and then provides the processing logic defined in BadPageViewTaskFactory. All descriptors (i.e. input/output streams and tables) and the [TaskFactory](javadocs/org/apache/samza/task/TaskFactory.html) are registered with the [TaskApplicationDescriptor](javadocs/org/apache/samza/application/descriptors/TaskApplicationDescriptor.html).
 
-The table below summarizes table metrics:
+## TaskFactory
+You will need to implement a [TaskFactory](javadocs/org/apache/samza/task/TaskFactory.html) to create the task instances that execute your processing logic. StreamTaskFactory and AsyncStreamTaskFactory create StreamTask and AsyncStreamTask instances, respectively. The [StreamTaskFactory](javadocs/org/apache/samza/task/StreamTaskFactory.html) for the above example is shown below:
 
+{% highlight java %}
 
-| Metrics | Class | Description |
-|---------|-------|-------------|
-|`get-ns`|`ReadableTable`|Average latency of `get/getAsync()` operations|
-|`getAll-ns`|`ReadableTable`|Average latency of `getAll/getAllAsync()` 
operations|
-|`num-gets`|`ReadableTable`|Count of `get/getAsync()` operations
-|`num-getAlls`|`ReadableTable`|Count of `getAll/getAllAsync()` operations
+    package com.example.samza;
 
+    public class BadPageViewTaskFactory implements StreamTaskFactory {
+      @Override
+      public StreamTask createInstance() {
+        // Create a new task instance that runs the processing logic
+        return new BadPageViewFilterTask();
+      }
+    }
+    
+{% endhighlight %}
 
-### Section 5 example
+Similarly, here is an example of 
[AsyncStreamTaskFactory](javadocs/org/apache/samza/task/AsyncStreamTaskFactory.html):
 
-It is up to the developer whether to implement both `TableReadFunction` and 
-`TableWriteFunction` in one class or two separate classes. Defining them in 
-separate classes can be cleaner if their implementations are elaborate and 
-extended, whereas keeping them in a single class may be more practical if 
-they share a considerable amount of code or are relatively short.
+{% highlight java %}
+    
+    package com.example.samza;
+
+    public class BadPageViewAsyncTaskFactory implements AsyncStreamTaskFactory {
+      @Override
+      public AsyncStreamTask createInstance() {
+        // Add input, output streams and tables
+        return new BadPageViewAsyncFilterTask();
+      }
+    }
+{% endhighlight %}
+
+## Task classes
+
+The actual processing logic is implemented in 
[StreamTask](javadocs/org/apache/samza/task/StreamTask.html) or 
[AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html) classes.
+
+### StreamTask
+You should implement [StreamTask](javadocs/org/apache/samza/task/StreamTask.html) for synchronous processing, where the message processing is complete after the process method returns. An example of a StreamTask is a computation that does not involve remote calls:
+
+{% highlight java %}
+    
+    package com.example.samza;
+
+    public class BadPageViewFilterTask implements StreamTask {
+      @Override
+      public void process(IncomingMessageEnvelope envelope,
+                          MessageCollector collector,
+                          TaskCoordinator coordinator) {
+        // process message synchronously
+      }
+    }
+{% endhighlight %}
+
+### AsyncStreamTask
+The [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html) interface, on the other hand, supports asynchronous processing, where the message processing may not be complete after the processAsync method returns. Various concurrency libraries like Java NIO, ParSeq and Akka can be used here to make asynchronous calls, and completion is signaled by invoking the [TaskCallback](javadocs/org/apache/samza/task/TaskCallback.html). Samza will continue to process the next message or shut down the container based on the callback status. An example of an AsyncStreamTask is a computation that makes remote calls but doesn’t block on their completion:
+
+{% highlight java %}
+    
+    package com.example.samza;
+
+    public class BadPageViewAsyncFilterTask implements AsyncStreamTask {
+      @Override
+      public void processAsync(IncomingMessageEnvelope envelope,
+                               MessageCollector collector,
+                               TaskCoordinator coordinator,
+                               TaskCallback callback) {
+        // process message with asynchronous calls
+        // fire callback upon completion, e.g. invoking callback from asynchronous call completion thread
+      }
+    }
+{% endhighlight %}
+
+# Runtime Objects
+
+## Task Instances in Runtime
+When you run your job, Samza will create many instances of your task class 
(potentially on multiple machines). These task instances process the messages 
from the input streams.
+
+## Messages from Input Streams
+
+For each message that Samza receives from the task’s input streams, the 
[process](javadocs/org/apache/samza/task/StreamTask.html#process-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-)
 or 
[processAsync](javadocs/org/apache/samza/task/AsyncStreamTask.html#processAsync-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-org.apache.samza.task.TaskCallback-)
 method is called. The 
[envelope](javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) 
contains three things of importance: the message, the key, and the stream that 
the message came from.
+
+{% highlight java %}
+    
+    /** Every message that is delivered to a StreamTask is wrapped
+     * in an IncomingMessageEnvelope, which contains metadata about
+     * the origin of the message. */
+    public class IncomingMessageEnvelope {
+      /** A deserialized message. */
+      Object getMessage() { ... }
+
+      /** A deserialized key. */
+      Object getKey() { ... }
+
+      /** The stream and partition that this message came from. */
+      SystemStreamPartition getSystemStreamPartition() { ... }
+    }
+{% endhighlight %}
+
+The key and message are declared as Object, and need to be cast to the correct type. The serializer/deserializer are defined via InputDescriptor, as described [here](high-level-api.md#data-serialization). A deserializer converts the raw bytes of a message into another type; for example, the JSON deserializer mentioned above parses the byte array into java.util.Map, java.util.List and String objects.
+
+The 
[getSystemStreamPartition()](javadocs/org/apache/samza/system/IncomingMessageEnvelope.html#getSystemStreamPartition--)
 method returns a 
[SystemStreamPartition](javadocs/org/apache/samza/system/SystemStreamPartition.html)
 object, which tells you where the message came from. It consists of three 
parts:
+1. The *system*: the name of the system from which the message came, as defined by the SystemDescriptor in your TaskApplication. You can have multiple systems for input and/or output, each with a different name.
+2. The *stream name*: the name of the stream (topic, queue) within the source system. This is also defined by the InputDescriptor in the TaskApplication.
+3. The [*partition*](javadocs/org/apache/samza/Partition.html): a stream is 
normally split into several partitions, and each partition is assigned to one 
task instance by Samza.
+
+The API looks like this:
+
+{% highlight java %}
+    
+    /** A triple of system name, stream name and partition. */
+    public class SystemStreamPartition extends SystemStream {
+
+      /** The name of the system which provides this stream. It is
+          defined in the Samza job's configuration. */
+      public String getSystem() { ... }
+
+      /** The name of the stream/topic/queue within the system. */
+      public String getStream() { ... }
+
+      /** The partition within the stream. */
+      public Partition getPartition() { ... }
+    }
+{% endhighlight %}
+
+In the example user-implemented TaskApplication above, the system name is 
“kafka”, the stream name is “pageViewEvent”. (The name “kafka” 
isn’t special — you can give your system any name you want.) If you have 
several input streams feeding into your StreamTask or AsyncStreamTask, you can 
use the SystemStreamPartition to determine what kind of message you’ve 
received.
+
+## Messages to Output Streams
+What about sending messages? If you take a look at the 
[process()](javadocs/org/apache/samza/task/StreamTask.html#process-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-)
 method in StreamTask, you’ll see that you get a 
[MessageCollector](javadocs/org/apache/samza/task/MessageCollector.html). 
Similarly, you will get it in 
[processAsync()](javadocs/org/apache/samza/task/AsyncStreamTask.html#processAsync-org.apache.samza.system.IncomingMessageEnvelope-org.apache.samza.task.MessageCollector-org.apache.samza.task.TaskCoordinator-org.apache.samza.task.TaskCallback-)
 method in AsyncStreamTask as well.
+
+{% highlight java %}
+    
+    /** When a task wishes to send a message, it uses this interface. */
+    public interface MessageCollector {
+      void send(OutgoingMessageEnvelope envelope);
+    }
+    
+{% endhighlight %}
+
+To send a message, you create an 
[OutgoingMessageEnvelope](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html)
 object and pass it to the MessageCollector. At a minimum, the envelope 
specifies the message you want to send, and the system and stream name to send 
it to. Optionally you can specify the partitioning key and other parameters. 
See the 
[javadoc](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) for 
details.
+
+**NOTE**: Please only use the MessageCollector object within the process() or 
processAsync() method. If you hold on to a MessageCollector instance and use it 
again later, your messages may not be sent correctly.
+
+Here’s a simple example that sends out “Good PageViewEvents” from the BadPageViewFilterTask:
+
+{% highlight java %}
+    
+    public class BadPageViewFilterTask implements StreamTask {
+
+      // Send outgoing messages to a stream called "goodPageViewEvent"
+      // in the "kafka" system.
+      private final SystemStream OUTPUT_STREAM =
+        new SystemStream("kafka", "goodPageViewEvent");
+      @Override
+      public void process(IncomingMessageEnvelope envelope,
+                          MessageCollector collector,
+                          TaskCoordinator coordinator) {
+        if (isBadPageView(envelope)) {
+          // skip the message, increment the counter, do not send it
+          return;
+        }
+        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, envelope.getKey(), envelope.getMessage()));
+      }
+    }
+    
+{% endhighlight %}
+
+## Accessing Tables
+There are many cases where you will need to look up a table when processing an incoming message. Samza allows access to tables by a unique name through the [TaskContext.getTable()](javadocs/org/apache/samza/task/TaskContext.html#getTable-java.lang.String-) method. [TaskContext](javadocs/org/apache/samza/task/TaskContext.html) is accessed via [Context.getTaskContext()](javadocs/org/apache/samza/context/Context.html#getTaskContext--) in the [InitableTask’s init()](javadocs/org/apache/samza/task/InitableTask.html#init-org.apache.samza.context.Context-) method. A user code example that accesses a table in the above TaskApplication example is shown here:
+
+{% highlight java %}
+
+    public class BadPageViewFilter implements StreamTask, InitableTask {
+      private final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "goodPageViewEvent");
+      private ReadWriteTable<String, Integer> badPageUrlTable;
+      @Override
+      public void init(Context context) {
+        badPageUrlTable = (ReadWriteTable<String, Integer>) context.getTaskContext().getTable("badPageUrlTable");
+      }
+
+      @Override
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        String key = (String) envelope.getKey();
+        if (badPageUrlTable.containsKey(key)) {
+          // skip the message, increment the counter, do not send it
+          badPageUrlTable.put(key, badPageUrlTable.get(key) + 1);
+          return;
+        }
+        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, key, envelope.getMessage()));
+      }
+    }
+
+{% endhighlight %}
+
+For a more detailed AsyncStreamTask example, follow the tutorial in [Samza Async API and Multithreading User Guide](../../../tutorials/{{site.version}}/samza-async-user-guide.html). For more details on the APIs, please refer to [Configuration](../jobs/configuration.md) and [Javadocs](javadocs).
+
+# Other Task Interfaces
+
+There are other task interfaces to allow additional processing logic to be 
applied, besides the main per-message processing logic defined in StreamTask 
and AsyncStreamTask. You will need to implement those task interfaces in 
addition to StreamTask or AsyncStreamTask.
+
+## InitableTask
+This task interface allows users to initialize objects that are accessed 
within a task instance.
+
+{% highlight java %}
+    
+    public interface InitableTask {
+      void init(Context context) throws Exception;
+    }
+{% endhighlight %}
+
+## WindowableTask
+This task interface allows users to define processing logic that is invoked periodically within a task instance.
+
+{% highlight java %}
+    
+    public interface WindowableTask {
+      void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception;
+    }
+
+{% endhighlight %}
+
+## ClosableTask
+This task interface defines the additional logic to run when closing a task. Usually, it is paired with InitableTask to release system resources allocated for this task instance.
+
+{% highlight java %}
+
+    public interface ClosableTask {
+      void close() throws Exception;
+    }
+    
+{% endhighlight %}
+
+## EndOfStreamListenerTask
+This task interface defines the additional logic when a task instance has 
reached the end of all input SystemStreamPartitions (see Samza as a batch job).
+
+{% highlight java %}
+
+    public interface EndOfStreamListenerTask {
+      void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) throws Exception;
+    }
+    
+{% endhighlight %}
+
+# Legacy Task Application
+
+For legacy task applications that do not implement the TaskApplication interface, you may specify the system, stream, and local stores in your job’s configuration, in addition to task.class. An incomplete example of the configuration for a legacy task application could look like this (see the [configuration](../jobs/configuration.md) documentation for more detail):
+
+{% highlight jproperties %}
+
+    # This is the class above, which Samza will instantiate when the job is run
+    task.class=com.example.samza.PageViewFilterTask
+
+    # Define a system called "kafka" (you can give it any name, and you can define
+    # multiple systems if you want to process messages from different sources)
+    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+
+    # The job consumes a topic called "PageViewEvent" from the "kafka" system
+    task.inputs=kafka.PageViewEvent
+
+    # Define a serializer/deserializer called "json" which parses JSON messages
+    serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+    # Use the "json" serializer for messages in the "PageViewEvent" topic
+    systems.kafka.streams.PageViewEvent.samza.msg.serde=json
+    
+{% endhighlight %}
\ No newline at end of file
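
As a companion to the AsyncStreamTask section of low-level-api.md above, the callback contract can be sketched in plain Java without a Samza dependency. This is a minimal sketch under stated assumptions: `Callback` is a hypothetical stand-in for TaskCallback that models only completion and failure, and `isBadPageViewAsync` is an invented lookup standing in for a remote call. It shows processAsync-style code returning immediately while the callback fires from the thread that finishes the call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class AsyncCallbackSketch {

  /** Hypothetical stand-in for Samza's TaskCallback; not the real interface. */
  interface Callback {
    void complete();
    void failure(Throwable t);
  }

  /** Hypothetical remote lookup that finishes on another thread. */
  static CompletableFuture<Boolean> isBadPageViewAsync(String url) {
    return CompletableFuture.supplyAsync(() -> url.contains("/bad/"));
  }

  /**
   * Mirrors the shape of processAsync(): it returns right away and signals
   * completion or failure later, from the thread that finishes the lookup.
   * Sending output messages is elided in this sketch.
   */
  static CompletableFuture<Void> processAsync(String url, Callback callback) {
    return isBadPageViewAsync(url).<Void>handle((bad, error) -> {
      if (error != null) {
        callback.failure(error);
      } else {
        callback.complete();
      }
      return null;
    });
  }

  /** Runs one message through the sketch and reports which callback method fired. */
  static String run(String url) {
    AtomicReference<String> status = new AtomicReference<>("pending");
    Callback callback = new Callback() {
      @Override public void complete() { status.set("complete"); }
      @Override public void failure(Throwable t) { status.set("failure"); }
    };
    processAsync(url, callback).join();
    return status.get();
  }
}
```

Calling `AsyncCallbackSketch.run("/good/page")` takes the completion path; passing `null` makes the lookup throw, so the failure path fires instead.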

http://git-wip-us.apache.org/repos/asf/samza/blob/880d2f0c/docs/learn/documentation/versioned/api/overview.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/overview.md 
b/docs/learn/documentation/versioned/api/overview.md
deleted file mode 100644
index d589bc6..0000000
--- a/docs/learn/documentation/versioned/api/overview.md
+++ /dev/null
@@ -1,160 +0,0 @@
----
-layout: page
-title: API Overview
----
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-
-When writing a stream processor for Samza, you must implement either 
[StreamTask](javadocs/org/apache/samza/task/StreamTask.html) or 
[AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html) 
interface. You should implement StreamTask for synchronous process, where the 
message processing is complete after the *process* method returns. An example 
of StreamTask is a computation that does not involve remote calls:
-
-{% highlight java %}
-package com.example.samza;
-
-public class MyTaskClass implements StreamTask {
-
-  public void process(IncomingMessageEnvelope envelope,
-                      MessageCollector collector,
-                      TaskCoordinator coordinator) {
-    // process message
-  }
-}
-{% endhighlight %}
-
-The AsyncSteamTask interface, on the other hand, supports asynchronous 
process, where the message processing may not be complete after the 
*processAsync* method returns. Various concurrent libraries like Java NIO, 
ParSeq and Akka can be used here to make asynchronous calls, and the completion 
is marked by invoking the 
[TaskCallback](javadocs/org/apache/samza/task/TaskCallback.html). Samza will 
continue to process next message or shut down the container based on the 
callback status. An example of AsyncStreamTask is a computation that make 
remote calls but don't block on the call completion:
-
-{% highlight java %}
-package com.example.samza;
-
-public class MyAsyncTaskClass implements AsyncStreamTask {
-
-  public void processAsync(IncomingMessageEnvelope envelope,
-                           MessageCollector collector,
-                           TaskCoordinator coordinator,
-                           TaskCallback callback) {
-    // process message with asynchronous calls
-    // fire callback upon completion, e.g. invoking callback from asynchronous 
call completion thread
-  }
-}
-{% endhighlight %}
-
-When you run your job, Samza will create several instances of your class 
(potentially on multiple machines). These task instances process the messages 
in the input streams.
-
-In your job's configuration you can tell Samza which streams you want to 
consume. An incomplete example could look like this (see the [configuration 
documentation](../jobs/configuration.html) for more detail):
-
-{% highlight jproperties %}
-# This is the class above, which Samza will instantiate when the job is run
-task.class=com.example.samza.MyTaskClass
-
-# Define a system called "kafka" (you can give it any name, and you can define
-# multiple systems if you want to process messages from different sources)
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-
-# The job consumes a topic called "PageViewEvent" from the "kafka" system
-task.inputs=kafka.PageViewEvent
-
-# Define a serializer/deserializer called "json" which parses JSON messages
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-
-# Use the "json" serializer for messages in the "PageViewEvent" topic
-systems.kafka.streams.PageViewEvent.samza.msg.serde=json
-{% endhighlight %}
-
-For each message that Samza receives from the task's input streams, the 
*process* method is called. The 
[envelope](javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) 
contains three things of importance: the message, the key, and the stream that 
the message came from.
-
-{% highlight java %}
-/** Every message that is delivered to a StreamTask is wrapped
- * in an IncomingMessageEnvelope, which contains metadata about
- * the origin of the message. */
-public class IncomingMessageEnvelope {
-  /** A deserialized message. */
-  Object getMessage() { ... }
-
-  /** A deserialized key. */
-  Object getKey() { ... }
-
-  /** The stream and partition that this message came from. */
-  SystemStreamPartition getSystemStreamPartition() { ... }
-}
-{% endhighlight %}
-
-The key and value are declared as Object, and need to be cast to the correct 
type. If you don't configure a 
[serializer/deserializer](../container/serialization.html), they are typically 
Java byte arrays. A deserializer can convert these bytes into any other type, 
for example the JSON deserializer mentioned above parses the byte array into 
java.util.Map, java.util.List and String objects.
-
-The `getSystemStreamPartition()` method returns a 
[SystemStreamPartition](javadocs/org/apache/samza/system/SystemStreamPartition.html)
 object, which tells you where the message came from. It consists of three 
parts:
-
-1. The *system*: the name of the system from which the message came, as 
defined in your job configuration. You can have multiple systems for input 
and/or output, each with a different name.
-2. The *stream name*: the name of the stream (topic, queue) within the source 
system. This is also defined in the job configuration.
-3. The [*partition*](javadocs/org/apache/samza/Partition.html): a stream is 
normally split into several partitions, and each partition is assigned to one 
StreamTask instance by Samza.
-
-The API looks like this:
-
-{% highlight java %}
-/** A triple of system name, stream name and partition. */
-public class SystemStreamPartition extends SystemStream {
-
-  /** The name of the system which provides this stream. It is
-      defined in the Samza job's configuration. */
-  public String getSystem() { ... }
-
-  /** The name of the stream/topic/queue within the system. */
-  public String getStream() { ... }
-
-  /** The partition within the stream. */
-  public Partition getPartition() { ... }
-}
-{% endhighlight %}
-
-In the example job configuration above, the system name is "kafka", the stream 
name is "PageViewEvent". (The name "kafka" isn't special &mdash; you can give 
your system any name you want.) If you have several input streams feeding into 
your StreamTask, you can use the SystemStreamPartition to determine what kind 
of message you've received.
-
-What about sending messages? If you take a look at the process() method in 
StreamTask, you'll see that you get a 
[MessageCollector](javadocs/org/apache/samza/task/MessageCollector.html).
-
-{% highlight java %}
-/** When a task wishes to send a message, it uses this interface. */
-public interface MessageCollector {
-  void send(OutgoingMessageEnvelope envelope);
-}
-{% endhighlight %}
-
-To send a message, you create an 
[OutgoingMessageEnvelope](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html)
 object and pass it to the message collector. At a minimum, the envelope 
specifies the message you want to send, and the system and stream name to send 
it to. Optionally you can specify the partitioning key and other parameters. 
See the 
[javadoc](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) for 
details.
-
-**NOTE:** Please only use the MessageCollector object within the `process()` 
method. If you hold on to a MessageCollector instance and use it again later, 
your messages may not be sent correctly.
-
-For example, here's a simple task that splits each input message into words, 
and emits each word as a separate message:
-
-{% highlight java %}
-public class SplitStringIntoWords implements StreamTask {
-
-  // Send outgoing messages to a stream called "words"
-  // in the "kafka" system.
-  private final SystemStream OUTPUT_STREAM =
-    new SystemStream("kafka", "words");
-
-  public void process(IncomingMessageEnvelope envelope,
-                      MessageCollector collector,
-                      TaskCoordinator coordinator) {
-    String message = (String) envelope.getMessage();
-
-    for (String word : message.split(" ")) {
-      // Use the word as the key, and 1 as the value.
-      // A second task can add the 1's to get the word count.
-      collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
-    }
-  }
-}
-{% endhighlight %}
-
-For AsyncStreamTask example, follow the tutorial in [Samza Async API and 
Multithreading User 
Guide](../../../tutorials/{{site.version}}/samza-async-user-guide.html). For 
more details on APIs, please refer to 
[Configuration](../jobs/configuration-table.html) and [Javadocs](javadocs).
-## [SamzaContainer &raquo;](../container/samza-container.html)

http://git-wip-us.apache.org/repos/asf/samza/blob/880d2f0c/docs/learn/documentation/versioned/api/programming-model.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/programming-model.md 
b/docs/learn/documentation/versioned/api/programming-model.md
new file mode 100644
index 0000000..1c9bd1c
--- /dev/null
+++ b/docs/learn/documentation/versioned/api/programming-model.md
@@ -0,0 +1,158 @@
+---
+layout: page
+title: Programming Model
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+# Introduction
+Samza provides different sets of programming APIs to meet requirements from 
different sets of users. The APIs are listed below:
+
+1. Java programming APIs: Samza provides Java programming APIs for users who 
are familiar with imperative programming languages. The overall programming 
model to create a Samza application in Java will be described here. Samza also 
provides two sets of APIs to describe user processing logic:
+    1. [High-level API](high-level-api.md): this API allows users to describe the end-to-end stream processing pipeline in a connected DAG (Directed Acyclic Graph). It also provides a rich set of built-in operators to help users implement common transformation logic, such as filter, map, join, and window.
+    2. [Task API](low-level-api.md): this is a low-level Java API which provides “bare-metal” programming interfaces to the users. The Task API allows users to explicitly access physical implementation details in the system, such as accessing the physical system stream partition of an incoming message and explicitly controlling the thread pool that executes asynchronous processing methods.
+2. [Samza SQL](samza-sql.md): Samza provides SQL for users who are familiar 
with declarative query languages, which allows the users to focus on data 
manipulation via SQL predicates and UDFs, not the physical implementation 
details.
+3. Beam API: Samza also provides a [Beam runner](https://beam.apache.org/documentation/runners/capability-matrix/) to run applications written in the Beam API. This is considered an extension to the existing operators supported by the high-level API in Samza.
+
+The following sections will be focused on Java programming APIs.
+
+# Key Concepts for a Samza Java Application
+To write a Samza Java application, you will typically follow the steps below:
+1. Define your input and output streams and tables
+2. Define your main processing logic
+
+The following sections will talk about key concepts in writing your Samza 
applications in Java.
+
+## Samza Applications
+When writing your stream processing application using Java API in Samza, you 
implement either a 
[StreamApplication](javadocs/org/apache/samza/application/StreamApplication.html)
 or 
[TaskApplication](javadocs/org/apache/samza/application/TaskApplication.html) 
and define your processing logic in the describe method.
+- For StreamApplication:
+
+{% highlight java %}
+    
+    public void describe(StreamApplicationDescriptor appDesc) { … }
+
+{% endhighlight %}
+- For TaskApplication:
+
+{% highlight java %}
+    
+    public void describe(TaskApplicationDescriptor appDesc) { … }
+
+{% endhighlight %}
+
+## Descriptors for Data Streams and Tables
+There are three different types of descriptors in Samza: 
[InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor.html),
 
[OutputDescriptor](javadocs/org/apache/samza/system/descriptors/OutputDescriptor.html),
 and 
[TableDescriptor](javadocs/org/apache/samza/table/descriptors/TableDescriptor.html).
 The InputDescriptor and OutputDescriptor are used to describe the physical 
sources and destinations of a stream, while a TableDescriptor is used to 
describe the physical dataset and IO functions for a table.
+Usually, you will obtain InputDescriptor and OutputDescriptor from a [SystemDescriptor](javadocs/org/apache/samza/system/descriptors/SystemDescriptor.html), which includes all information about the producers and consumers of a physical system. The following code snippet illustrates how you will obtain InputDescriptor and OutputDescriptor from a SystemDescriptor.
+
+{% highlight java %}
+    
+    public class BadPageViewFilter implements StreamApplication {
+      @Override
+      public void describe(StreamApplicationDescriptor appDesc) {
+        KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka");
+        InputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor("page-views", new JsonSerdeV2<>(PageView.class));
+        OutputDescriptor<DecoratedPageView> pageViewOutput = kafka.getOutputDescriptor("decorated-page-views", new JsonSerdeV2<>(DecoratedPageView.class));
+
+        // Now, implement your main processing logic
+      }
+    }
+    
+{% endhighlight %}
+
+You can also add a TableDescriptor to your application.
+
+{% highlight java %}
+     
+    public class BadPageViewFilter implements StreamApplication {
+      @Override
+      public void describe(StreamApplicationDescriptor appDesc) {
+        KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka");
+        InputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor("page-views", new JsonSerdeV2<>(PageView.class));
+        OutputDescriptor<DecoratedPageView> pageViewOutput = kafka.getOutputDescriptor("decorated-page-views", new JsonSerdeV2<>(DecoratedPageView.class));
+        TableDescriptor<String, Integer> viewCountTable = new RocksDBTableDescriptor(
+            "pageViewCountTable", KVSerde.of(new StringSerde(), new IntegerSerde()));
+
+        // Now, implement your main processing logic
+      }
+    }
+    
+{% endhighlight %}
+
+The same code in the above describe method applies to TaskApplication as well.
+
+## Stream Processing Logic
+
+Samza provides two sets of APIs to define the main stream processing logic, 
high-level API and Task API, via StreamApplication and TaskApplication, 
respectively. 
+
+High-level API allows you to describe the processing logic in a connected DAG 
of transformation operators, like the example below:
+
+{% highlight java %}
+
+    public class BadPageViewFilter implements StreamApplication {
+      @Override
+      public void describe(StreamApplicationDescriptor appDesc) {
+        KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka");
+        InputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor("page-views", new JsonSerdeV2<>(PageView.class));
+        OutputDescriptor<DecoratedPageView> pageViewOutput = kafka.getOutputDescriptor("decorated-page-views", new JsonSerdeV2<>(DecoratedPageView.class));
+        TableDescriptor<String, Integer> viewCountTable = new RocksDBTableDescriptor(
+            "pageViewCountTable", KVSerde.of(new StringSerde(), new IntegerSerde()));
+
+        // Now, implement your main processing logic
+        MessageStream<PageView> pageViews = appDesc.getInputStream(pageViewInput);
+        pageViews.filter(this::isValidPageView)
+             .map(this::addProfileInformation)
+             .sendTo(pageViewOutput);
+      }
+    }
+    
+{% endhighlight %}
+
+Task API allows you to describe the processing logic in a customized 
StreamTaskFactory or AsyncStreamTaskFactory, like the example below:
+
+{% highlight java %}
+
+    public class BadPageViewFilter implements TaskApplication {
+      @Override
+      public void describe(TaskApplicationDescriptor appDesc) {
+        KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka");
+        InputDescriptor<PageView> pageViewInput = kafka.getInputDescriptor("page-views", new JsonSerdeV2<>(PageView.class));
+        OutputDescriptor<DecoratedPageView> pageViewOutput = kafka.getOutputDescriptor("decorated-page-views", new JsonSerdeV2<>(DecoratedPageView.class));
+        TableDescriptor<String, Integer> viewCountTable = new RocksDBTableDescriptor(
+            "pageViewCountTable", KVSerde.of(new StringSerde(), new IntegerSerde()));
+
+        // Now, implement your main processing logic
+        appDesc.withInputStream(pageViewInput)
+           .withOutputStream(pageViewOutput)
+           .withTaskFactory(new PageViewFilterTaskFactory());
+      }
+    }
+    
+{% endhighlight %}
+
+Details for [high-level API](high-level-api.md) and [Task 
API](low-level-api.md) are explained later.
+
+## Configuration for a Samza Application
+
+To deploy a Samza application, you will need to specify the implementation class for your application and the ApplicationRunner to launch your application. The following is an incomplete example of the minimum required configuration to set up the Samza application and the runner:
+{% highlight jproperties %}
+    
+    # This is the class implementing StreamApplication
+    app.class=com.example.samza.PageViewFilter
+
+    # This is defining the ApplicationRunner class to launch the application
+    app.runner.class=org.apache.samza.runtime.RemoteApplicationRunner
+    
+{% endhighlight %}
\ No newline at end of file
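
The two keys in the configuration section of programming-model.md above can be checked before a job is submitted. The following is a minimal sketch, not part of Samza: `AppConfigCheck` and `missingKeys` are hypothetical names, and the only assumption taken from the document is that `app.class` and `app.runner.class` must both be set. It parses the properties text with the standard `java.util.Properties` loader and reports which of the two keys are absent or blank.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class AppConfigCheck {

  /** The two keys the configuration section lists as the minimum. */
  static final String[] REQUIRED_KEYS = {"app.class", "app.runner.class"};

  /** Returns the required keys that are absent or blank in the given properties text. */
  static List<String> missingKeys(String propertiesText) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(propertiesText));
    } catch (IOException e) {
      // Reading from an in-memory string should not fail; rethrow unchecked if it does.
      throw new UncheckedIOException(e);
    }
    List<String> missing = new ArrayList<>();
    for (String key : REQUIRED_KEYS) {
      String value = props.getProperty(key);
      if (value == null || value.trim().isEmpty()) {
        missing.add(key);
      }
    }
    return missing;
  }
}
```

Given only `app.class=com.example.samza.PageViewFilter`, `missingKeys` returns a list containing `app.runner.class`; with both keys present it returns an empty list.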
