http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/api/overview.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/api/overview.md b/docs/learn/documentation/0.7.0/api/overview.md
index 2d03ec3..489ba6d 100644
--- a/docs/learn/documentation/0.7.0/api/overview.md
+++ b/docs/learn/documentation/0.7.0/api/overview.md
@@ -3,100 +3,112 @@ layout: page
 title: API Overview
 ---
 
-When writing a stream processor for Samza, you must implement the StreamTask 
interface:
-
-```
-/** User processing tasks implement this. */
-public interface StreamTask {
-  void process(IncomingMessageEnvelope envelope, MessageCollector collector, 
TaskCoordinator coordinator) throws Exception;
-}
-```
-
-When Samza runs your task, the process method will be called once for each 
message that Samza receives from your task's input streams. The envelope 
contains three things of importance: the message, the key, and the stream that 
the message came from:
-
-```
-/** This class is given to a StreamTask once for each message that it 
receives. */
-public class IncomingMessageEnvelope {
-  /** A deserialized message. */
-  Object getMessage() { ... }
-
-  /** A deserialized key. */
-  Object getKey() { ... }
-
-  /** The stream and partition that this message came from. */
-  SystemStreamPartition getSystemStreamPartition() { ... }
-}
-```
-<!-- TODO This description and example needs to be updated to match 
SystemStreamPartition. -->
-Note that the getSystemStreamPartition() method returns a 
SystemStreamPartition object, not a String, as you might expect. This is 
because a Samza Stream actually consists of a name, a system, and a stream. The 
name is what you call the stream in your Samza configuration file. The system 
is the name of the cluster that the stream came from (e.g. 
kafka-aggreate-tracking, databus, etc). The system name is also defined in your 
Samza configuration file. Lastly, the actual stream is available. For Kafka, 
this would be the Kafka topic's name.
-
-```
-/** A name/system/stream tuple that represents a Samza stream. */
-public class SystemStreamPartition extends SystemStream {
-
-  /** The system name that this stream is associated with. This is also
-      defined in a Samza job's configuration. */
-  public String getSystem() { ... }
-
-  /** The stream name for the system. */
-  public String getStream() { ... }
-
-  /** The partition within the stream. */
-    public Partition getPartition() { ... }
-}
-```
-
-To make this a bit clearer, let me show you an example. A Samza job's 
configuration might have:
-
-```
-# the stream
-streams.page-view-event.stream=PageViewEvent
-streams.page-view-event.system=kafka
-streams.page-view-event.serde=json
-
-# the system
-systems.kafka.samza.partition.manager=samza.stream.kafka.KafkaPartitionManager
-systems.kafka.samza.consumer.factory=samza.stream.kafka.KafkaConsumerFactory
-systems.kafka.samza.producer.factory=samza.stream.kafka.KafkaProducerFactory
-...
-```
-
-In this example, getName would return page-view-event, getSystem would return 
kafka, and getStream would return PageViewEvent. If you've got more than one 
input stream feeding into your StreamTask, you can use the getStream() object 
to determine what kind of message you've received.
-
-What about sending messages? If you take a look at the process() method in 
StreamTask, you'll see that you get a MessageCollector.
-
-```
-/** When a task wishes to send a message, it uses this class. */
-public interface MessageCollector {
-  void send(OutgoingMessageEnvelope envelope);
-}
-```
-
-The collector takes OutgoingMessageEnvelope, which allows tasks to supply a 
partition key when sending the message. The partition key, if supplied, is used 
to determine which partition of a stream a message is destined for.
-
-Please only use the MessageCollector object within the process() method. If 
you hold onto a MessageCollector instance and use it again later, your messages 
may not be sent correctly.
-
-```
-/** A message envelope that has a key. */
-public class OutgoingMessageEnvelope {
-  ...
-  Object getKey() { ... }
-}
-```
-
-And, putting it all together:
-
-<!-- TODO Verify that this example actually works. -->
-
-```
-class MyStreamerTask extends StreamTask {
-  def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, 
coordinator: TaskCoordinator) {
-    val msg = envelope.getMessage.asInstanceOf[GenericRecord]
-    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", 
"SomeTopicPartitionedByUserId"), msg.get("user_id"), msg))
-  }
-}
-```
-
-This is a simplistic example that just reads from a stream, and sends the 
messages to SomeTopicPartitionedByUserId, partitioned by the message's user ID.
-
-## [TaskRunner &raquo;](../container/task-runner.html)
+When writing a stream processor for Samza, you must implement the 
[StreamTask](javadocs/org/apache/samza/task/StreamTask.html) interface:
+
+    package com.example.samza;
+
+    public class MyTaskClass implements StreamTask {
+
+      public void process(IncomingMessageEnvelope envelope,
+                          MessageCollector collector,
+                          TaskCoordinator coordinator) {
+        // process message
+      }
+    }
+
+When you run your job, Samza will create several instances of your class 
(potentially on multiple machines). These task instances process the messages 
in the input streams.
+
+In your job's configuration you can tell Samza which streams you want to 
consume. An incomplete example could look like this (see the [configuration 
documentation](../jobs/configuration.html) for more detail):
+
+    # This is the class above, which Samza will instantiate when the job is run
+    task.class=com.example.samza.MyTaskClass
+
+    # Define a system called "kafka" (you can give it any name, and you can define
+    # multiple systems if you want to process messages from different sources)
+    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+
+    # The job consumes a topic called "PageViewEvent" from the "kafka" system
+    task.inputs=kafka.PageViewEvent
+
+    # Define a serializer/deserializer called "json" which parses JSON messages
+    serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+    # Use the "json" serializer for messages in the "PageViewEvent" topic
+    systems.kafka.streams.PageViewEvent.samza.msg.serde=json
+
+For each message that Samza receives from the task's input streams, the 
*process* method is called. The 
[envelope](javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) 
contains three things of importance: the message, the key, and the stream that 
the message came from.
+
+    /** Every message that is delivered to a StreamTask is wrapped
+     * in an IncomingMessageEnvelope, which contains metadata about
+     * the origin of the message. */
+    public class IncomingMessageEnvelope {
+      /** A deserialized message. */
+      Object getMessage() { ... }
+
+      /** A deserialized key. */
+      Object getKey() { ... }
+
+      /** The stream and partition that this message came from. */
+      SystemStreamPartition getSystemStreamPartition() { ... }
+    }
+
+The key and message are declared as Object and need to be cast to the correct type. If you don't configure a [serializer/deserializer](../container/serialization.html), they are typically Java byte arrays. A deserializer can convert these bytes into any other type; for example, the JSON deserializer mentioned above parses the byte array into java.util.Map, java.util.List and String objects.
+
+The getSystemStreamPartition() method returns a 
[SystemStreamPartition](javadocs/org/apache/samza/system/SystemStreamPartition.html)
 object, which tells you where the message came from. It consists of three 
parts:
+
+1. The *system*: the name of the system from which the message came, as 
defined in your job configuration. You can have multiple systems for input 
and/or output, each with a different name.
+2. The *stream name*: the name of the stream (topic, queue) within the source 
system. This is also defined in the job configuration.
+3. The [*partition*](javadocs/org/apache/samza/Partition.html): a stream is 
normally split into several partitions, and each partition is assigned to one 
StreamTask instance by Samza.
+
+The API looks like this:
+
+    /** A triple of system name, stream name and partition. */
+    public class SystemStreamPartition extends SystemStream {
+
+      /** The name of the system which provides this stream. It is
+          defined in the Samza job's configuration. */
+      public String getSystem() { ... }
+
+      /** The name of the stream/topic/queue within the system. */
+      public String getStream() { ... }
+
+      /** The partition within the stream. */
+      public Partition getPartition() { ... }
+    }
+
+In the example job configuration above, the system name is "kafka" and the stream name is "PageViewEvent". (The name "kafka" isn't special &mdash; you can give your system any name you want.) If you have several input streams feeding into your StreamTask, you can use the SystemStreamPartition to determine what kind of message you've received.
+
+What about sending messages? If you take a look at the process() method in 
StreamTask, you'll see that you get a 
[MessageCollector](javadocs/org/apache/samza/task/MessageCollector.html).
+
+    /** When a task wishes to send a message, it uses this interface. */
+    public interface MessageCollector {
+      void send(OutgoingMessageEnvelope envelope);
+    }
+
+To send a message, you create an 
[OutgoingMessageEnvelope](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html)
 object and pass it to the message collector. At a minimum, the envelope 
specifies the message you want to send, and the system and stream name to send 
it to. Optionally you can specify the partitioning key and other parameters. 
See the 
[javadoc](javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) for 
details.
+
+**NOTE:** Please only use the MessageCollector object within the process() 
method. If you hold on to a MessageCollector instance and use it again later, 
your messages may not be sent correctly.
+
+For example, here's a simple task that splits each input message into words, 
and emits each word as a separate message:
+
+    public class SplitStringIntoWords implements StreamTask {
+
+      // Send outgoing messages to a stream called "words"
+      // in the "kafka" system.
+      private static final SystemStream OUTPUT_STREAM =
+        new SystemStream("kafka", "words");
+
+      public void process(IncomingMessageEnvelope envelope,
+                          MessageCollector collector,
+                          TaskCoordinator coordinator) {
+        String message = (String) envelope.getMessage();
+
+        for (String word : message.split(" ")) {
+          // Use the word as the key, and 1 as the value.
+          // A second task can add the 1's to get the word count.
+          collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
+        }
+      }
+    }
+
+## [SamzaContainer &raquo;](../container/samza-container.html)
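
Note on the SplitStringIntoWords example in the overview.md hunk above: its comment says a second task can add up the 1's to obtain a word count. The following is a minimal stand-alone sketch of that two-stage idea in plain Java. It does not use the Samza API; `WordCountSketch`, `KeyedMessage`, `split` and `count` are hypothetical names chosen for illustration, and unlike the example in the diff, this sketch skips empty strings produced by consecutive spaces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone sketch; none of these types are Samza classes. */
final class WordCountSketch {

  /** Hypothetical stand-in for an outgoing (key, message) pair. */
  static final class KeyedMessage {
    final String key;
    final int value;

    KeyedMessage(String key, int value) {
      this.key = key;
      this.value = value;
    }
  }

  /** First stage: emit (word, 1) for every non-empty word in the message. */
  static List<KeyedMessage> split(String message) {
    List<KeyedMessage> out = new ArrayList<>();
    for (String word : message.split(" ")) {
      if (!word.isEmpty()) {
        out.add(new KeyedMessage(word, 1));
      }
    }
    return out;
  }

  /** Second stage: add up the values per key to get a count per word. */
  static Map<String, Integer> count(List<KeyedMessage> messages) {
    Map<String, Integer> counts = new HashMap<>();
    for (KeyedMessage m : messages) {
      counts.merge(m.key, m.value, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    System.out.println(count(split("to be or not to be")));
  }
}
```

In a real job the second stage would be a separate task consuming the "words" stream; because the word is used as the key, all occurrences of a word would typically arrive at the same task instance when the stream is partitioned by key.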

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/comparisons/introduction.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/comparisons/introduction.md b/docs/learn/documentation/0.7.0/comparisons/introduction.md
index 77c04ca..6ecffcf 100644
--- a/docs/learn/documentation/0.7.0/comparisons/introduction.md
+++ b/docs/learn/documentation/0.7.0/comparisons/introduction.md
@@ -43,7 +43,7 @@ example above, where you have a stream of page-view events including the ID of t
 
 Now you can write a Samza job that takes both the page-view event and the 
changelog as inputs. You make sure that they are partitioned on the same key 
(e.g. user ID). Every time a changelog event comes in, you write the updated 
user information to the task's local storage. Every time a page-view event 
comes in, you read the current information about that user from local storage. 
That way, you can keep all the state local to a task, and never need to query a 
remote database.
 
-![Stateful Processing](/img/0.7.0/learn/documentation/introduction/samza_state.png)
+<img src="/img/0.7.0/learn/documentation/introduction/samza_state.png" alt="Stateful Processing" class="diagram-large">
 
 In effect, you now have a replica of the main database, broken into small 
partitions that are on the same machines as the Samza tasks. Database writes 
still need to go to the main database, but when you need to read from the 
database in order to process a message from the input stream, you can just 
consult the task's local state.
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/comparisons/mupd8.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/comparisons/mupd8.md b/docs/learn/documentation/0.7.0/comparisons/mupd8.md
index 78e7b64..2cc8ee6 100644
--- a/docs/learn/documentation/0.7.0/comparisons/mupd8.md
+++ b/docs/learn/documentation/0.7.0/comparisons/mupd8.md
@@ -57,7 +57,7 @@ This was motivated by our experience with Hadoop, where the data flow between jo
 
 MUPD8 executes all of its map/update processors inside a single JVM, using 
threads. This is memory-efficient, as the JVM memory overhead is shared across 
the threads.
 
-Samza uses a separate JVM for each stream processor container 
([TaskRunner](../container/task-runner.html)). This has the disadvantage of 
using more memory compared to running multiple stream processing threads within 
a single JVM. However, the advantage is improved isolation between tasks, which 
can make them more reliable.
+Samza uses a separate JVM for each [stream processor 
container](../container/samza-container.html). This has the disadvantage of 
using more memory compared to running multiple stream processing threads within 
a single JVM. However, the advantage is improved isolation between tasks, which 
can make them more reliable.
 
 ### Isolation
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/checkpointing.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/checkpointing.md b/docs/learn/documentation/0.7.0/container/checkpointing.md
index 42b2e8d..6a93d84 100644
--- a/docs/learn/documentation/0.7.0/container/checkpointing.md
+++ b/docs/learn/documentation/0.7.0/container/checkpointing.md
@@ -3,47 +3,79 @@ layout: page
 title: Checkpointing
 ---
 
-On the [Streams](streams.html) page, on important detail was glossed over. 
When a TaskRunner instantiates a SystemConsumer for an input stream/partition 
pair, how does the TaskRunner know where in the stream to start reading 
messages. If you recall, Kafka has the concept of an offset, which defines a 
specific location in a topic/partition pair. The idea is that an offset can be 
used to reference a specific point in a stream/partition pair. When you read 
messages from Kafka, you can supply an offset to specify at which point you'd 
like to read from. After you read, you increment your offset, and get the next 
message.
+Samza provides fault-tolerant processing of streams: Samza guarantees that 
messages won't be lost, even if your job crashes, if a machine dies, if there 
is a network fault, or something else goes wrong. In order to provide this 
guarantee, Samza expects the [input system](streams.html) to meet the following 
requirements:
 
-![diagram](/img/0.7.0/learn/documentation/container/checkpointing.png)
+* The stream may be sharded into one or more *partitions*. Each partition is 
independent from the others, and is replicated across multiple machines (the 
stream continues to be available, even if a machine fails).
+* Each partition consists of a sequence of messages in a fixed order. Each 
message has an *offset*, which indicates its position in that sequence. 
Messages are always consumed sequentially within each partition.
+* A Samza job can start consuming the sequence of messages from any starting 
offset.
 
-This diagram looks the same as on the [Streams](streams.html) page, except 
that there are black lines at different points in each input stream/partition 
pair. These lines represent the current offset for each stream consumer. As the 
stream consumer reads, the offset increases, and moves closer to the "head" of 
the stream. The diagram also illustrates that the offsets might be staggered, 
such that some offsets are farther along in their stream/partition than others.
+Kafka meets these requirements, but they can also be implemented with other 
message broker systems.
 
-If a SystemConsumer is reading messages for a TaskRunner, and the TaskRunner 
stops for some reason (due to hardware failure, re-deployment, or whatever), 
the SystemConsumer should start where it left off when the TaskRunner starts 
back up again. We're able to do this because the Kafka broker is buffering 
messages on a remote server (the broker). Since the messages are available when 
we come back, we can just start from our last offset, and continue moving 
forward, without losing data.
+As described in the [section on SamzaContainer](samza-container.html), each 
task instance of your job consumes one partition of an input stream. Each task 
has a *current offset* for each input stream: the offset of the next message to 
be read from that stream partition. Every time a message is read from the 
stream, the current offset moves forwards.
 
-The TaskRunner supports this ability using something called a 
CheckpointManager.
+If a Samza container fails, it needs to be restarted (potentially on another 
machine) and resume processing where the failed container left off. In order to 
enable this, a container periodically checkpoints the current offset for each 
task instance.
 
-```
-public interface CheckpointManager {
-  void start();
+<img src="/img/0.7.0/learn/documentation/container/checkpointing.svg" alt="Illustration of checkpointing" class="diagram-large">
 
-  void register(Partition partition);
+When a Samza container starts up, it looks for the most recent checkpoint and 
starts consuming messages from the checkpointed offsets. If the previous 
container failed unexpectedly, the most recent checkpoint may be slightly 
behind the current offsets (i.e. the job may have consumed some more messages 
since the last checkpoint was written), but we can't know for sure. In that 
case, the job may process a few messages again.
 
-  void writeCheckpoint(Partition partition, Checkpoint checkpoint);
+This guarantee is called *at-least-once processing*: Samza ensures that your 
job doesn't miss any messages, even if containers need to be restarted. 
However, it is possible for your job to see the same message more than once 
when a container is restarted. We are planning to address this in a future 
version of Samza, but for now it is just something to be aware of: for example, 
if you are counting page views, a forcefully killed container could cause 
events to be slightly over-counted. You can reduce duplication by checkpointing 
more frequently, at a slight performance cost.
 
-  Checkpoint readLastCheckpoint(Partition partition);
+For checkpoints to be effective, they need to be written somewhere where they 
will survive faults. Samza allows you to write checkpoints to the file system 
(using FileSystemCheckpointManager), but that doesn't help if the machine fails 
and the container needs to be restarted on another machine. The most common 
configuration is to use Kafka for checkpointing. You can enable this with the 
following job configuration:
 
-  void stop();
-}
+    # The name of your job determines the name under which checkpoints will be stored
+    job.name=example-job
 
-public class Checkpoint {
-  private final Map<SystemStream, String> offsets;
-  ...
-}
-```
+    # Define a system called "kafka" for consuming and producing to a Kafka cluster
+    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 
-As you can see, the checkpoint manager provides a way to write out checkpoints 
for a given partition. Right now, the checkpoint contains a map. The map's keys 
are input stream names, and the map's values are each input stream's offset. 
Each checkpoint is managed per-partition. For example, if you have 
page-view-event and service-metric-event defined as streams in your Samza job's 
configuration file, the TaskRunner would supply a checkpoint with two keys in 
each checkpoint offset map (one for page-view-event and the other for 
service-metric-event).
+    # Declare that we want our job's checkpoints to be written to Kafka
+    task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
+    task.checkpoint.system=kafka
 
-Samza provides two checkpoint managers: FileSystemCheckpointManager and 
KafkaCheckpointManager. The KafkaCheckpointManager is what you generally want 
to use. The way that KafkaCheckpointManager works is as follows: it writes 
checkpoint messages for your Samza job to a special Kafka topic. This topic's 
name is \_\_samza\_checkpoint\_your-job-name. For example, if you had a Samza 
job called "my-first-job", the Kafka topic would be called 
\_\_samza\_checkpoint\_my-first-job. This Kafka topic is partitioned 
identically to your Samza job's partition count. If your Samza job has 10 
partitions, the checkpoint topic for your Samza job will also have 10 
partitions. Every time that the TaskRunner calls writeCheckpoint, a checkpoint 
message will be sent to the partition that corresponds with the partition for 
the checkpoint that the TaskRunner wishes to write.
+    # By default, a checkpoint is written every 60 seconds. You can change this if you like.
+    task.commit.ms=60000
 
-![diagram](/img/0.7.0/learn/documentation/container/checkpointing-2.png)
+In this configuration, Samza writes checkpoints to a separate Kafka topic 
called \_\_samza\_checkpoint\_&lt;job-name&gt;\_&lt;job-id&gt; (in the example 
configuration above, the topic would be called 
\_\_samza\_checkpoint\_example-job\_1). Once per minute, Samza automatically 
sends a message to this topic, in which the current offsets of the input 
streams are encoded. When a Samza container starts up, it looks for the most 
recent offset message in this topic, and loads that checkpoint.
 
-When the TaskRunner starts for the first time, the offset behavior of the 
SystemConsumers is undefined. If the system for the SystemConsumer is Kafka, we 
fall back to the auto.offset.reset setting. If the auto.offset.reset is set to 
"largest", we start reading messages from the head of the stream; if it's set 
to "smallest", we read from the tail. If it's undefined, the TaskRunner will 
fail.
+Sometimes it can be useful to use checkpoints only for some input streams, but 
not for others. In this case, you can tell Samza to ignore any checkpointed 
offsets for a particular stream name:
 
-The TaskRunner calls writeCheckpoint at a windowed interval (e.g. every 10 
seconds). If the TaskRunner fails, and restarts, it simply calls 
readLastCheckpoint for each partition. In the case of the 
KafkaCheckpointManager, this readLastCheckpoint method will read the last 
message that was written to the checkpoint topic for each partition in the job. 
One edge case to consider is that SystemConsumers might have read messages from 
an offset that hasn't yet been checkpointed. In such a case, when the 
TaskRunner reads the last checkpoint for each partition, the offsets might be 
farther back in the stream. When this happens, your StreamTask could get 
duplicate messages (i.e. it saw message X, failed, restarted at an offset prior 
to message X, and then reads message X again). Thus, Samza currently provides 
at least once messaging. You might get duplicates. Caveat emptor.
+    # Ignore any checkpoints for the topic "my-special-topic"
+    systems.kafka.streams.my-special-topic.samza.reset.offset=true
 
-<!-- TODO Add a link to the fault tolerance SEP when one exists -->
+    # Always start consuming "my-special-topic" at the oldest available offset
+    systems.kafka.streams.my-special-topic.samza.offset.default=oldest
 
-*Note that there are design proposals in the works to give exactly once 
messaging.*
+The following table explains the meaning of these configuration parameters:
+
+<table class="documentation">
+  <tr>
+    <th>Parameter name</th>
+    <th>Value</th>
+    <th>Meaning</th>
+  </tr>
+  <tr>
+    <td rowspan="2" class="nowrap">systems.&lt;system&gt;.<br>streams.&lt;stream&gt;.<br>samza.reset.offset</td>
+    <td>false (default)</td>
+    <td>When the container starts up, resume processing from the last checkpoint</td>
+  </tr>
+  <tr>
+    <td>true</td>
+    <td>Ignore checkpoint (pretend that no checkpoint is present)</td>
+  </tr>
+  <tr>
+    <td rowspan="2" class="nowrap">systems.&lt;system&gt;.<br>streams.&lt;stream&gt;.<br>samza.offset.default</td>
+    <td>upcoming (default)</td>
+    <td>When the container starts and there is no checkpoint (or the checkpoint is ignored), process only messages that are published after the job is started, and no older messages</td>
+  </tr>
+  <tr>
+    <td>oldest</td>
+    <td>When the container starts and there is no checkpoint (or the checkpoint is ignored), jump back to the oldest available message in the system and consume all messages from that point onwards (most likely this means repeated processing of messages already seen)</td>
+  </tr>
+</table>
+
+Note that the example configuration above causes your tasks to start consuming 
from the oldest offset *every time a container starts up*. This is useful in 
case you have some in-memory state in your tasks that you need to rebuild from 
source data in an input stream. If you are using streams in this way, you may 
also find [bootstrap streams](streams.html) useful.
+
+If you want to make a one-off change to a job's consumer offsets, for example 
to force old messages to be processed again with a new version of your code, 
you can use CheckpointTool to manipulate the job's checkpoint. The tool is 
included in Samza's [source repository](/contribute/code.html) and documented 
in the README.
 
 ## [State Management &raquo;](state-management.html)
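
Note on the at-least-once guarantee described in the checkpointing.md hunk above: the behaviour can be illustrated with a small stand-alone simulation. This is only a sketch under stated assumptions. It does not use the Samza API, `CheckpointSketch`, `run` and `crashAndRestart` are hypothetical names, and the checkpoint interval is counted in messages rather than in milliseconds (as task.commit.ms would be) to keep the run deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Stand-alone simulation; none of these names are Samza classes. */
final class CheckpointSketch {

  /**
   * Consumes one partition starting at startOffset and records a checkpoint
   * after every `interval` messages. Stops early, simulating a crash, when
   * the next offset to read equals crashAt (pass -1 for no crash).
   * Returns the last checkpointed offset.
   */
  static int run(List<String> partition, int startOffset, int interval,
                 int crashAt, List<String> processed) {
    int checkpoint = startOffset;
    int current = startOffset; // offset of the next message to read
    while (current < partition.size() && current != crashAt) {
      processed.add(partition.get(current));
      current++;
      if ((current - startOffset) % interval == 0) {
        checkpoint = current; // assumption: checkpoint by count, not by time
      }
    }
    return checkpoint;
  }

  /** Runs until a simulated crash, restarts from the checkpoint, and returns everything processed. */
  static List<String> crashAndRestart() {
    List<String> partition = Arrays.asList("m0", "m1", "m2", "m3", "m4", "m5");
    List<String> processed = new ArrayList<>();
    int checkpoint = run(partition, 0, 2, 3, processed); // crash before reading offset 3
    run(partition, checkpoint, 2, -1, processed);        // restart from the last checkpoint
    return processed;
  }

  public static void main(String[] args) {
    System.out.println(crashAndRestart());
  }
}
```

In this run the first container processes m0, m1 and m2 but has only checkpointed offset 2 when it stops, so the restarted container reads m2 again: no message is lost, and one is seen twice. A smaller interval narrows the window in which duplicates can occur, which matches the trade-off the documentation describes.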

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/event-loop.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/event-loop.md b/docs/learn/documentation/0.7.0/container/event-loop.md
index 1f9c51e..903ef90 100644
--- a/docs/learn/documentation/0.7.0/container/event-loop.md
+++ b/docs/learn/documentation/0.7.0/container/event-loop.md
@@ -3,87 +3,40 @@ layout: page
 title: Event Loop
 ---
 
-The event loop is the [TaskRunner](task-runner.html)'s single thread that is 
in charge of [reading](streams.html), [writing](streams.html), [metrics 
flushing](metrics.html), [checkpointing](checkpointing.html), and 
[windowing](windowing.html). It's the code that puts all of this stuff 
together. Each SystemConsumer reads messages on its own thread, but writes 
messages into a centralized message queue. The TaskRunner uses this queue to 
funnel all of the messages into the event loop. Here's how the event loop works:
+The event loop is the [container](samza-container.html)'s single thread that 
is in charge of [reading and writing messages](streams.html), [flushing 
metrics](metrics.html), [checkpointing](checkpointing.html), and 
[windowing](windowing.html).
 
-1. Take a message from the incoming message queue (the queue that the 
SystemConsumers are putting their messages)
-2. Give the message to the appropriate StreamTask by calling process() on it
-3. Call window() on the StreamTask if it implements WindowableTask, and the 
window time has expired
-4. Send any StreamTask output from the process() and window() call to the 
appropriate SystemProducers
-5. Write checkpoints for any partitions that are past the defined checkpoint 
commit interval
+Samza uses a single thread because every container is designed to use a single 
CPU core; to get more parallelism, simply run more containers. This uses a bit 
more memory than multithreaded parallelism, because each JVM has some overhead, 
but it simplifies resource management and improves isolation between jobs. This 
helps Samza jobs run reliably on a multitenant cluster, where many different 
jobs written by different people are running at the same time.
 
-The TaskRunner does this, in a loop, until it is shutdown.
+You are strongly discouraged from using threads in your job's code. Samza uses 
multiple threads internally for communicating with input and output streams, 
but all message processing and user code runs on a single-threaded event loop. 
In general, Samza is not thread-safe.
 
-### Lifecycle Listeners
-
-Sometimes, it's useful to receive notifications when a specific event happens 
in the TaskRunner. For example, you might want to reset some context in the 
container whenever a new message arrives. To accomplish this, Samza provides a 
TaskLifecycleListener interface, that can be wired into the TaskRunner through 
configuration.
-
-```
-/**
- * Used to get before/after notifications before initializing/closing all tasks
- * in a given container (JVM/process).
- */
-public interface TaskLifecycleListener {
-  /**
-   * Called before all tasks in TaskRunner are initialized.
-   */
-  void beforeInit(Config config, TaskContext context);
-
-  /**
-   * Called after all tasks in TaskRunner are initialized.
-   */
-  void afterInit(Config config, TaskContext context);
-
-  /**
-   * Called before a message is processed by a task.
-   */
-  void beforeProcess(IncomingMessageEnvelope envelope, Config config, 
TaskContext context);
+### Event Loop Internals
 
-  /**
-   * Called after a message is processed by a task.
-   */
-  void afterProcess(IncomingMessageEnvelope envelope, Config config, 
TaskContext context);
+A container may have multiple 
[SystemConsumers](../api/javadocs/org/apache/samza/system/SystemConsumer.html) 
for consuming messages from different input systems. Each SystemConsumer reads 
messages on its own thread, but writes messages into a shared in-process 
message queue. The container uses this queue to funnel all of the messages into 
the event loop.
 
-  /**
-   * Called before all tasks in TaskRunner are closed.
-   */
-  void beforeClose(Config config, TaskContext context);
+The event loop works as follows:
 
-  /**
-   * Called after all tasks in TaskRunner are closed.
-   */
-  void afterClose(Config config, TaskContext context);
-}
-```
+1. Take a message from the incoming message queue;
+2. Give the message to the appropriate [task instance](samza-container.html) 
by calling process() on it;
+3. Call window() on the task instance if it implements 
[WindowableTask](../api/javadocs/org/apache/samza/task/WindowableTask.html), 
and the window time has expired;
+4. Send any output from the process() and window() calls to the appropriate 
[SystemProducers](../api/javadocs/org/apache/samza/system/SystemProducer.html);
+5. Write checkpoints for any tasks whose [commit interval](checkpointing.html) 
has elapsed.
 
-To use a TaskLifecycleListener, you must also create a factory for the 
listener.
+The container does this, in a loop, until it is shut down. Note that although 
there can be multiple task instances within a container (depending on the 
number of input stream partitions), their process() and window() methods are 
all called on the same thread, never concurrently on different threads.
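
The five steps above can be sketched as a toy, single-threaded loop. This is a simplified model, not Samza's implementation: `EventLoopSketch`, `Envelope` and `Task` are hypothetical stand-ins, the window and output steps are left out, checkpoints are triggered by a message count rather than a commit interval, and the loop stops when the queue is empty instead of running until shutdown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of the event loop; every name here is hypothetical, not Samza's API. */
final class EventLoopSketch {
  /** A message tagged with the input partition it came from. */
  static final class Envelope {
    final int partition;
    final String message;

    Envelope(int partition, String message) {
      this.partition = partition;
      this.message = message;
    }
  }

  /** Stand-in for a task instance: one per input partition. */
  static final class Task {
    final List<String> processed = new ArrayList<>();
    int commits = 0;

    void process(Envelope envelope) { processed.add(envelope.message); }
    void commit() { commits++; }
  }

  /** Drains the shared queue on the calling thread, dispatching by partition. */
  static Map<Integer, Task> run(BlockingQueue<Envelope> queue, int commitEvery) {
    Map<Integer, Task> tasks = new TreeMap<>();
    int sinceCommit = 0;
    Envelope envelope;
    while ((envelope = queue.poll()) != null) {        // step 1: take a message
      Task task = tasks.computeIfAbsent(envelope.partition, p -> new Task());
      task.process(envelope);                          // step 2: call process()
      // Steps 3 and 4 (window() and sending output) are omitted in this sketch.
      if (++sinceCommit >= commitEvery) {              // step 5: checkpoint
        for (Task t : tasks.values()) {
          t.commit();
        }
        sinceCommit = 0;
      }
    }
    return tasks;
  }

  public static void main(String[] args) {
    BlockingQueue<Envelope> queue = new LinkedBlockingQueue<>();
    queue.add(new Envelope(0, "a"));
    queue.add(new Envelope(1, "b"));
    queue.add(new Envelope(0, "c"));
    Map<Integer, Task> tasks = run(queue, 2);
    for (Map.Entry<Integer, Task> entry : tasks.entrySet()) {
      System.out.println("task " + entry.getKey() + " processed " + entry.getValue().processed);
    }
  }
}
```

Because `run` does all of its work on the calling thread, two task instances can never be inside `process()` at the same time, which is the property the paragraph above describes.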
 
-```
-public interface TaskLifecycleListenerFactory {
-  TaskLifecycleListener getLifecyleListener(String name, Config config);
-}
-```
-
-#### Configuring Lifecycle Listeners
-
-Once you have written a TaskLifecycleListener, and its factory, you can use 
the listener by configuring your Samza job with the following keys:
-
-* task.lifecycle.listeners: A CSV list of all defined listeners that should be 
used for the Samza job.
-* task.lifecycle.listener.&lt;listener name&gt;.class: A Java package and 
class name for a single listener factory.
+### Lifecycle Listeners
 
-For example, you might define a listener called "my-listener":
+Sometimes, you need to run your own code at specific points in a task's 
lifecycle. For example, you might want to set up some context in the container 
whenever a new message arrives, or perform some operations on startup or 
shutdown.
 
-```
-task.lifecycle.listener.my-listener.class=com.foo.bar.MyListenerFactory
-```
+To receive notifications when such events happen, you can implement the 
[TaskLifecycleListenerFactory](../api/javadocs/org/apache/samza/task/TaskLifecycleListenerFactory.html)
 interface. It returns a 
[TaskLifecycleListener](../api/javadocs/org/apache/samza/task/TaskLifecycleListener.html),
 whose methods are called by Samza at the appropriate times.
 
-And then enable it for your Samza job:
+You can then tell Samza to use your lifecycle listener with the following 
properties in your job configuration:
 
-```
-task.lifecycle.listeners=my-listener
-```
+    # Define a listener called "my-listener" by giving the factory class name
+    task.lifecycle.listener.my-listener.class=com.example.foo.MyListenerFactory
 
-Samza's container will create one instance of TaskLifecycleListener, and 
notify it whenever any of the events (shown in the API above) occur.
+    # Enable it in this job (multiple listeners can be separated by commas)
+    task.lifecycle.listeners=my-listener
 
-Borrowing from the example above, if we have a single Samza container 
processing partitions 0 and 2, and have defined a lifecycle listener called 
my-listener, then the Samza container will have a single instance of 
MyListener. MyListener's beforeInit, afterInit, beforeClose, and afterClose 
methods will all be called twice: one for each of the two partitions (e.g. 
beforeInit partition 0, before init partition 1, etc). The beforeProcess and 
afterProcess methods will simply be called once for each incoming message. The 
TaskContext is how the TaskLifecycleListener is able to tell which partition 
the event is for.
+The Samza container creates one instance of your 
[TaskLifecycleListener](../api/javadocs/org/apache/samza/task/TaskLifecycleListener.html).
 If the container has multiple task instances (processing different input 
stream partitions), the beforeInit, afterInit, beforeClose and afterClose 
methods are called for each task instance. The 
[TaskContext](../api/javadocs/org/apache/samza/task/TaskContext.html) argument 
of those methods gives you more information about the partitions.
 
 ## [JMX &raquo;](jmx.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/index.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/index.md 
b/docs/learn/documentation/0.7.0/container/index.md
deleted file mode 100644
index 17751de..0000000
--- a/docs/learn/documentation/0.7.0/container/index.md
+++ /dev/null
@@ -1,18 +0,0 @@
----
-layout: page
-title: Container
----
-
-The API section shows how a Samza StreamTask is written. To execute a 
StreamTask, Samza has a container that wraps around your StreamTask. The Samza 
container manages:
-
-* Metrics
-* Configuration
-* Lifecycle
-* Checkpointing
-* State management
-* Serialization
-* Data transport
-
-This container is called a TaskRunner. Read on to learn more about Samza's 
TaskRunner.
-
-## [JobRunner &raquo;](job-runner.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/jmx.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/jmx.md 
b/docs/learn/documentation/0.7.0/container/jmx.md
index a9fcc77..9fce867 100644
--- a/docs/learn/documentation/0.7.0/container/jmx.md
+++ b/docs/learn/documentation/0.7.0/container/jmx.md
@@ -3,11 +3,20 @@ layout: page
 title: JMX
 ---
 
-The Samza TaskRunner (and YARN Application Master) will turn on JMX using a 
randomly selected port, since Samza is meant to be run in a distributed 
environment, and it's unknown which ports will be available prior to runtime. 
The port will be output in the TaskRunner's logs with a line like this:
+Samza's containers and YARN ApplicationMaster enable 
[JMX](http://docs.oracle.com/javase/tutorial/jmx/) by default. JMX can be used 
for managing the JVM; for example, you can connect to it using 
[jconsole](http://docs.oracle.com/javase/7/docs/technotes/guides/management/jconsole.html),
 which is included in the JDK.
 
-    2013-07-05 20:42:36 JmxServer [INFO] According to 
InetAddress.getLocalHost.getHostName we are Chriss-MacBook-Pro.local
-    2013-07-05 20:42:36 JmxServer [INFO] Started JmxServer port=64905 
url=service:jmx:rmi:///jndi/rmi://Chriss-MacBook-Pro.local:64905/jmxrmi
+You can tell Samza to publish its internal [metrics](metrics.html), and any 
custom metrics you define, as JMX MBeans. To enable this, set the following 
properties in your job configuration:
 
-Any metrics that are registered in the TaskRunner will be visible through JMX. 
To toggle JMX, see the [Configuration](../jobs/configuration.html) section.
+    # Define a Samza metrics reporter called "jmx", which publishes to JMX
+    metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
+
+    # Use it (if you have multiple reporters defined, separate them with commas)
+    metrics.reporters=jmx
+
+JMX needs to be configured to use a specific port, but in a distributed 
environment, there is no way of knowing in advance which ports are available on 
the machines running your containers. Therefore Samza chooses the JMX port 
randomly. If you need to connect to it, you can find the port by looking in the 
container's logs, which report the JMX server details as follows:
+
+    2014-06-02 21:50:17 JmxServer [INFO] According to InetAddress.getLocalHost.getHostName we are samza-grid-1234.example.com
+    2014-06-02 21:50:17 JmxServer [INFO] Started JmxServer registry port=50214 server port=50215 url=service:jmx:rmi://localhost:50215/jndi/rmi://localhost:50214/jmxrmi
+    2014-06-02 21:50:17 JmxServer [INFO] If you are tunneling, you might want to try JmxServer registry port=50214 server port=50215 url=service:jmx:rmi://samza-grid-1234.example.com:50215/jndi/rmi://samza-grid-1234.example.com:50214/jmxrmi
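
If you only have the host name and the two ports from the log, the service URL can be reassembled from them. The following is a minimal sketch, assuming the URL always has the shape shown in the log lines above; `JmxUrlSketch` and `buildUrl` are hypothetical names, not part of Samza.

```java
import java.net.MalformedURLException;
import javax.management.remote.JMXServiceURL;

/** Hypothetical helper that rebuilds the JMX service URL from the logged values. */
final class JmxUrlSketch {
  /** The server port goes in the outer RMI address, the registry port in the JNDI path. */
  static String buildUrl(String host, int registryPort, int serverPort) {
    return "service:jmx:rmi://" + host + ":" + serverPort
        + "/jndi/rmi://" + host + ":" + registryPort + "/jmxrmi";
  }

  public static void main(String[] args) throws MalformedURLException {
    String url = buildUrl("samza-grid-1234.example.com", 50214, 50215);
    // JMXServiceURL only parses and validates the string; it does not open a connection.
    JMXServiceURL parsed = new JMXServiceURL(url);
    System.out.println(parsed.getHost() + " server port " + parsed.getPort());
  }
}
```

A parsed `JMXServiceURL` can then be handed to `javax.management.remote.JMXConnectorFactory.connect` from your own tooling, provided the ports are reachable from where you run it.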
 
 ## [JobRunner &raquo;](../jobs/job-runner.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/metrics.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/metrics.md 
b/docs/learn/documentation/0.7.0/container/metrics.md
index 078ce47..98acd81 100644
--- a/docs/learn/documentation/0.7.0/container/metrics.md
+++ b/docs/learn/documentation/0.7.0/container/metrics.md
@@ -3,52 +3,78 @@ layout: page
 title: Metrics
 ---
 
-Samza also provides a metrics library that the TaskRunner uses. It allows a 
StreamTask to create counters and gauges. The TaskRunner then writes those 
metrics to metrics infrastructure through a MetricsReporter implementation.
-
-```
-public class MyJavaStreamerTask implements StreamTask, InitableTask {
-  private static final Counter messageCount;
-
-  public void init(Config config, TaskContext context) {
-    this.messageCount = 
context.getMetricsRegistry().newCounter(MyJavaStreamerTask.class.toString(), 
"MessageCount");
-  }
-
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) {
-    System.out.println(envelope.getMessage().toString());
-    messageCount.inc();
-  }
-}
-```
-
-Samza's metrics design is very similar to Coda Hale's 
[metrics](https://github.com/codahale/metrics) library. It has two important 
interfaces:
-
-```
-public interface MetricsRegistry {
-  Counter newCounter(String group, String name);
-
-  <T> Gauge<T> newGauge(String group, String name, T value);
-}
-
-public interface MetricsReporter {
-  void start();
-
-  void register(String source, ReadableMetricsRegistry registry);
-
-  void stop();
-}
-```
-
-### MetricsRegistry
-
-When the TaskRunner starts up, as with StreamTask instantiation, it creates a 
MetricsRegistry for every partition in the Samza job.
-
-![diagram](/img/0.7.0/learn/documentation/container/metrics.png)
-
-The TaskRunner, itself, also gets a MetricsRegistry that it can use to create 
counters and gauges. It uses this registry to measure a lot of relevant metrics 
for itself.
-
-### MetricsReporter
-
-The other important interface is the MetricsReporter. The TaskRunner uses 
MetricsReporter implementations to send its MetricsRegistry counters and gauges 
to whatever metrics infrastructure the reporter uses. A Samza job's 
configuration determines which MetricsReporters the TaskRunner will use. Out of 
the box, Samza comes with a MetricsSnapshotReporter that sends JSON metrics 
messages to a Kafka topic, and a JmxReporter that records metrics to be read 
via JMX.
+When you're running a stream process in production, it's important that you 
have good metrics to track the health of your job. In order to make this easy, 
Samza includes a metrics library. It is used by Samza itself to generate some 
standard metrics such as message throughput, but you can also use it in your 
task code to emit custom metrics.
+
+Metrics can be reported in various ways. You can expose them via 
[JMX](jmx.html), which is useful in development. In production, a common setup 
is for each Samza container to periodically publish its metrics to a "metrics" 
Kafka topic, in which the metrics from all Samza jobs are aggregated. You can 
then consume this stream in another Samza job, and send the metrics to your 
favorite graphing system such as [Graphite](http://graphite.wikidot.com/).
+
+To set up your job to publish metrics to Kafka, you can use the following 
configuration:
+
+    # Define a metrics reporter called "snapshot", which publishes metrics
+    # every 60 seconds.
+    metrics.reporters=snapshot
+    metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
+
+    # Tell the snapshot reporter to publish to a topic called "metrics"
+    # in the "kafka" system.
+    metrics.reporter.snapshot.stream=kafka.metrics
+
+    # Encode metrics data as JSON.
+    serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
+    systems.kafka.streams.metrics.samza.msg.serde=metrics
+
+With this configuration, the job automatically sends several JSON-encoded 
messages to the "metrics" topic in Kafka every 60 seconds. The messages look 
something like this:
+
+    {
+      "header": {
+        "container-name": "samza-container-0",
+        "host": "samza-grid-1234.example.com",
+        "job-id": "1",
+        "job-name": "my-samza-job",
+        "reset-time": 1401729000347,
+        "samza-version": "0.0.1",
+        "source": "Partition-2",
+        "time": 1401729420566,
+        "version": "0.0.1"
+      },
+      "metrics": {
+        "org.apache.samza.container.TaskInstanceMetrics": {
+          "commit-calls": 7,
+          "commit-skipped": 77948,
+          "kafka-input-topic-offset": "1606",
+          "messages-sent": 985,
+          "process-calls": 1093,
+          "send-calls": 985,
+          "send-skipped": 76970,
+          "window-calls": 0,
+          "window-skipped": 77955
+        }
+      }
+    }
+
+There is a separate message for each task instance, and the header tells you 
the job name, job ID and partition of the task. The metrics allow you to see 
how many messages have been processed and sent, the current offset in the input 
stream partition, and other details. There are additional messages which give 
you metrics about the JVM (heap size, garbage collection information, threads 
etc.), internal metrics of the Kafka producers and consumers, and more.
+
+It's easy to generate custom metrics in your job, if there's some value you 
want to keep an eye on. You can use Samza's built-in metrics framework, which 
is similar in design to Coda Hale's [metrics](http://metrics.codahale.com/) 
library. 
+
+You can register your custom metrics through a 
[MetricsRegistry](../api/javadocs/org/apache/samza/metrics/MetricsRegistry.html).
 Your stream task needs to implement 
[InitableTask](../api/javadocs/org/apache/samza/task/InitableTask.html), so 
that you can get the metrics registry from the 
[TaskContext](../api/javadocs/org/apache/samza/task/TaskContext.html). This 
simple example shows how to count the number of messages processed by your task:
+
+    public class MyJavaStreamTask implements StreamTask, InitableTask {
+      private Counter messageCount;
+
+      public void init(Config config, TaskContext context) {
+        this.messageCount = context
+          .getMetricsRegistry()
+          .newCounter(getClass().getName(), "message-count");
+      }
+
+      public void process(IncomingMessageEnvelope envelope,
+                          MessageCollector collector,
+                          TaskCoordinator coordinator) {
+        messageCount.inc();
+      }
+    }
+
+Samza currently supports two kinds of metrics: 
[counters](../api/javadocs/org/apache/samza/metrics/Counter.html) and 
[gauges](../api/javadocs/org/apache/samza/metrics/Gauge.html). Use a counter 
when you want to track how often something occurs, and a gauge when you want to 
report the level of something, such as the size of a buffer. Each task instance 
(for each input stream partition) gets its own set of metrics.
+
+If you want to report metrics in some other way, e.g. directly to a graphing 
system (without going via Kafka), you can implement a 
[MetricsReporterFactory](../api/javadocs/org/apache/samza/metrics/MetricsReporterFactory.html)
 and reference it in your job configuration.
 
 ## [Windowing &raquo;](windowing.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/samza-container.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/samza-container.md 
b/docs/learn/documentation/0.7.0/container/samza-container.md
new file mode 100644
index 0000000..5d259c4
--- /dev/null
+++ b/docs/learn/documentation/0.7.0/container/samza-container.md
@@ -0,0 +1,66 @@
+---
+layout: page
+title: SamzaContainer
+---
+
+The SamzaContainer is responsible for managing the startup, execution, and 
shutdown of one or more [StreamTask](../api/overview.html) instances. Each 
SamzaContainer typically runs as an independent Java virtual machine. A Samza 
job can consist of several SamzaContainers, potentially running on different 
machines.
+
+When a SamzaContainer starts up, it does the following:
+
+1. Get last checkpointed offset for each input stream partition that it 
consumes
+2. Create a "reader" thread for every input stream partition that it consumes
+3. Start metrics reporters to report metrics
+4. Start a checkpoint timer to save your task's input stream offsets every so 
often
+5. Start a window timer to trigger your task's [window 
method](../api/javadocs/org/apache/samza/task/WindowableTask.html), if it is 
defined
+6. Instantiate and initialize your StreamTask once for each input stream 
partition
+7. Start an event loop that takes messages from the input stream reader 
threads, and gives them to your StreamTasks
+8. Notify lifecycle listeners during each one of these steps
+
+Let's start in the middle, with the instantiation of a StreamTask. The 
following sections of the documentation cover the other steps.
+
+### Tasks and Partitions
+
+When the container starts, it creates instances of the [task 
class](../api/overview.html) that you've written. If the task class implements 
the [InitableTask](../api/javadocs/org/apache/samza/task/InitableTask.html) 
interface, the SamzaContainer will also call the init() method.
+
+    /** Implement this if you want a callback when your task starts up. */
+    public interface InitableTask {
+      void init(Config config, TaskContext context);
+    }
+
+How many instances of your task class are created depends on the number of 
partitions in the job's input streams. If your Samza job has ten partitions, 
there will be ten instantiations of your task class: one for each partition. 
The first task instance will receive all messages for partition 0, the second 
instance will receive all messages for partition 1, and so on.
+
+<img src="/img/0.7.0/learn/documentation/container/tasks-and-partitions.svg" 
alt="Illustration of tasks consuming partitions" class="diagram-large">
+
+The number of partitions in the input streams is determined by the systems 
from which you are consuming. For example, if your input system is Kafka, you 
can specify the number of partitions when you create a topic.
+
+If a Samza job has more than one input stream, the number of task instances 
for the Samza job is the maximum number of partitions across all input streams. 
For example, if a Samza job is reading from PageViewEvent (12 partitions), and 
ServiceMetricEvent (14 partitions), then the Samza job would have 14 task 
instances (numbered 0 through 13). Task instances 12 and 13 only receive events 
from ServiceMetricEvent, because there is no corresponding PageViewEvent 
partition.
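
The rule in the paragraph above can be written down in a few lines. This is only an illustration of the arithmetic, assuming partitions are numbered from 0; `TaskCountSketch` and its methods are hypothetical and do not correspond to any Samza class.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of how input partitions map to task instances. */
final class TaskCountSketch {
  /** The number of task instances is the largest partition count of any input stream. */
  static int taskCount(Map<String, Integer> partitionsPerStream) {
    int max = 0;
    for (int partitions : partitionsPerStream.values()) {
      max = Math.max(max, partitions);
    }
    return max;
  }

  /** Task N consumes partition N of every stream that has such a partition. */
  static List<String> streamsForTask(Map<String, Integer> partitionsPerStream, int task) {
    List<String> streams = new ArrayList<>();
    for (Map.Entry<String, Integer> entry : partitionsPerStream.entrySet()) {
      if (task < entry.getValue()) {
        streams.add(entry.getKey());
      }
    }
    return streams;
  }

  public static void main(String[] args) {
    Map<String, Integer> inputs = new LinkedHashMap<>();
    inputs.put("PageViewEvent", 12);
    inputs.put("ServiceMetricEvent", 14);
    System.out.println("task instances: " + taskCount(inputs));
    System.out.println("task 13 consumes: " + streamsForTask(inputs, 13));
  }
}
```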
+
+There is [work underway](https://issues.apache.org/jira/browse/SAMZA-71) to 
make the assignment of partitions to tasks more flexible in future versions of 
Samza.
+
+### Containers and resource allocation
+
+Although the number of task instances is fixed &mdash; determined by the 
number of input partitions &mdash; you can configure how many containers you 
want to use for your job. If you are [using YARN](../jobs/yarn-jobs.html), the 
number of containers determines what CPU and memory resources are allocated to 
your job.
+
+If the data volume on your input streams is small, it might be sufficient to 
use just one SamzaContainer. In that case, Samza still creates one task 
instance per input partition, but all those tasks run within the same 
container. At the other extreme, you can create as many containers as you have 
partitions, and Samza will assign one task instance to each container.
+
+Each SamzaContainer is designed to use one CPU core, so it uses a 
[single-threaded event loop](event-loop.html) for execution. It's not advisable 
to create your own threads within a SamzaContainer. If you need more 
parallelism, please configure your job to use more containers.
+
+Any [state](state-management.html) in your job belongs to a task instance, not 
to a container. This is a key design decision for Samza's scalability: as your 
job's resource requirements grow and shrink, you can simply increase or 
decrease the number of containers, but the number of task instances remains 
unchanged. As you scale up or down, the same state remains attached to each 
task instance. Task instances may be moved from one container to another, and 
any persistent state managed by Samza will be moved with it. This allows the 
job's processing semantics to remain unchanged, even as you change the job's 
parallelism.
+
+### Joining multiple input streams
+
+If your job has multiple input streams, Samza provides a simple but powerful 
mechanism for joining data from different streams: each task instance receives 
messages from one partition of *each* of the input streams. For example, say 
you have two input streams, A and B, each with four partitions. Samza creates 
four task instances to process them, and assigns the partitions as follows:
+
+<table class="documentation">
+<tr><th>Task instance</th><th>Consumes stream partitions</th></tr>
+<tr><td>0</td><td>stream A partition 0, stream B partition 0</td></tr>
+<tr><td>1</td><td>stream A partition 1, stream B partition 1</td></tr>
+<tr><td>2</td><td>stream A partition 2, stream B partition 2</td></tr>
+<tr><td>3</td><td>stream A partition 3, stream B partition 3</td></tr>
+</table>
+
+Thus, if you want two events in different streams to be processed by the same 
task instance, you need to ensure they are sent to the same partition number. 
You can achieve this by using the same partitioning key when [sending the 
messages](../api/overview.html). Joining streams is discussed in detail in the 
[state management](state-management.html) section.
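
To see why a shared partitioning key lines events up, assume (this is an assumption about the producing system, not something Samza guarantees) that the partition is chosen by hashing the key modulo the partition count. `PartitionKeySketch` and `partitionFor` are hypothetical names used only for this illustration.

```java
/** Hypothetical hash-mod partitioner; real systems may choose partitions differently. */
final class PartitionKeySketch {
  /** Maps a key to a partition number in the range [0, numPartitions). */
  static int partitionFor(Object key, int numPartitions) {
    // floorMod keeps the result non-negative even when hashCode() is negative.
    return Math.floorMod(key.hashCode(), numPartitions);
  }

  public static void main(String[] args) {
    int userId = 42;
    int partitionInA = partitionFor(userId, 4);
    int partitionInB = partitionFor(userId, 4);
    // Same key and same partition count, so both events reach the same task instance.
    System.out.println("stream A partition " + partitionInA
        + ", stream B partition " + partitionInB);
  }
}
```

Note that this only works when both streams have the same number of partitions and are written with the same partitioning function; otherwise the same key can land on different partition numbers.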
+
+There is one caveat in all of this: Samza currently assumes that a stream's 
partition count will never change. Partition splitting or repartitioning is not 
supported. If an input stream has N partitions, it is expected that it has 
always had, and will always have N partitions. If you want to re-partition a 
stream, you can write a job that reads messages from the stream, and writes 
them out to a new stream with the required number of partitions. For example, 
you could read messages from PageViewEvent, and write them to 
PageViewEventRepartition.
+
+## [Streams &raquo;](streams.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/26c1e27d/docs/learn/documentation/0.7.0/container/serialization.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/serialization.md 
b/docs/learn/documentation/0.7.0/container/serialization.md
new file mode 100644
index 0000000..f6570c9
--- /dev/null
+++ b/docs/learn/documentation/0.7.0/container/serialization.md
@@ -0,0 +1,46 @@
+---
+layout: page
+title: Serialization
+---
+
+Every message that is read from or written to a [stream](streams.html) or a 
[persistent state store](state-management.html) needs to eventually be 
serialized to bytes (which are sent over the network or written to disk). There 
are various places where that serialization and deserialization can happen:
+
+1. In the client library: for example, the library for publishing to Kafka and 
consuming from Kafka supports pluggable serialization.
+2. In the task implementation: your [process method](../api/overview.html) can 
use raw byte arrays as inputs and outputs, and do any parsing and serialization 
itself.
+3. Between the two: Samza provides a layer of serializers and deserializers, 
or *serdes* for short.
+
+You can use whatever makes sense for your job; Samza doesn't impose any 
particular data model or serialization scheme on you. However, the cleanest 
solution is usually to use Samza's serde layer. The following configuration 
example shows how to use it.
+
+    # Define a system called "kafka"
+    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+
+    # The job is going to consume a topic called "PageViewEvent" from the "kafka" system
+    task.inputs=kafka.PageViewEvent
+
+    # Define a serde called "json" which parses/serializes JSON objects
+    serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+    # Define a serde called "integer" which encodes an integer as 4 binary bytes (big-endian)
+    serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
+    # For messages in the "PageViewEvent" topic, the key (the ID of the user viewing the page)
+    # is encoded as a binary integer, and the message is encoded as JSON.
+    systems.kafka.streams.PageViewEvent.samza.key.serde=integer
+    systems.kafka.streams.PageViewEvent.samza.msg.serde=json
+
+    # Define a key-value store which stores the most recent page view for each user ID.
+    # Again, the key is an integer user ID, and the value is JSON.
+    stores.LastPageViewPerUser.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
+    stores.LastPageViewPerUser.changelog=kafka.last-page-view-per-user
+    stores.LastPageViewPerUser.key.serde=integer
+    stores.LastPageViewPerUser.msg.serde=json
+
+Each serde is defined with a factory class. Samza comes with several built-in 
serdes for UTF-8 strings, binary-encoded integers, JSON (requires the 
samza-serializers dependency) and more. You can also create your own serializer 
by implementing the 
[SerdeFactory](../api/javadocs/org/apache/samza/serializers/SerdeFactory.html) 
interface.
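
As an illustration of what a serde does, the sketch below encodes an integer as 4 big-endian bytes, matching the description of the "integer" serde in the configuration above. It is a standalone example rather than Samza's own class: `IntegerSerdeSketch` is a hypothetical name and it deliberately does not implement any Samza interface.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical standalone encoder: an int as 4 bytes, most significant byte first. */
final class IntegerSerdeSketch {
  /** Serializes the value; ByteBuffer uses big-endian byte order by default. */
  static byte[] toBytes(int value) {
    return ByteBuffer.allocate(4).putInt(value).array();
  }

  /** Deserializes the first 4 bytes of the array back into an int. */
  static int fromBytes(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt();
  }

  public static void main(String[] args) {
    byte[] encoded = toBytes(1);
    System.out.println(Arrays.toString(encoded)); // prints [0, 0, 0, 1]
    System.out.println(fromBytes(encoded));       // prints 1
  }
}
```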
+
+The name you give to a serde (such as "json" and "integer" in the example 
above) is only for convenience in your job configuration; you can choose 
whatever name you like. For each stream and each state store, you can use the 
serde name to declare how messages should be serialized and deserialized.
+
+If you don't declare a serde, Samza simply passes objects through between your 
task instance and the system stream. In that case your task needs to send and 
receive whatever type of object the underlying client library uses.
+
+All the Samza APIs for sending and receiving messages are typed as *Object*. 
This means that you have to cast messages to the correct type before you can 
use them. It's a little bit more code, but it has the advantage that Samza is 
not restricted to any particular data model.
+
+## [Checkpointing &raquo;](checkpointing.html)
