Updated Branches:
  refs/heads/master 86492c430 -> 5264cbb0f

DOCS; adding better SystemConsumer docs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5264cbb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5264cbb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5264cbb0

Branch: refs/heads/master
Commit: 5264cbb0f7a4c9bcbf613878f2148bbabb0ae486
Parents: 86492c4
Author: Chris Riccomini <[email protected]>
Authored: Thu Jan 30 10:24:00 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Thu Jan 30 10:24:00 2014 -0800

----------------------------------------------------------------------
 .../org/apache/samza/system/SystemConsumer.java | 141 ++++++++++++++++++-
 1 file changed, 135 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5264cbb0/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
index 811a9f4..cf95996 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemConsumer.java
@@ -23,23 +23,152 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Used as a standard interface for all consumers of messages from specific Samza stream partitions.
+ * <p>
+ * SystemConsumer is the interface that must be implemented by any system that
+ * wishes to integrate with Samza. Examples of systems that one might want to
+ * integrate would be systems like Kafka, Hadoop, Kestrel, SQS, etc.
+ * </p>
+ * 
+ * <p>
+ * SamzaContainer uses SystemConsumer to read messages from the underlying
+ * system, and funnels the messages to the appropriate StreamTask instances. The
+ * basic flow is for the SamzaContainer to poll for all SystemStreamPartitions,
+ * feed all IncomingMessageEnvelopes to the appropriate StreamTask, and then
+ * repeat. If no IncomingMessageEnvelopes are returned, the SamzaContainer polls
+ * again, but with a timeout of 10ms.
+ * </p>
+ * 
+ * <p>
+ * The SamzaContainer treats SystemConsumer in the following way:
+ * </p>
+ * 
+ * <ul>
+ * <li>Start will be called before stop.</li>
+ * <li>Register will be called one or more times before start.</li>
+ * <li>Register won't be called twice for the same SystemStreamPartition.</li>
+ * <li>If timeout < 0, poll will block unless all SystemStreamPartitions are at
+ * "head" (the underlying system has been checked, and returned an empty set).
+ * If at head, an empty list is returned.</li>
+ * <li>If timeout >= 0, poll will return any messages that are currently
+ * available for any of the SystemStreamPartitions specified. If no new messages
+ * are available, it will wait up to timeout milliseconds for messages from any
+ * SystemStreamPartition to become available. It will return an empty list if
+ * the timeout is hit, and no new messages are available.</li>
+ * <li>Nothing will be called after stop has been invoked.</li>
+ * <li>Poll will only be called for registered SystemStreamPartitions.</li>
+ * <li>The SystemConsumer can't assume that a given SystemStreamPartition's
+ * messages will ever be read. It shouldn't run out of memory or deadlock all
+ * new message arrivals if one SystemStreamPartition is never read from.</li>
+ * <li>Any exception thrown by the SystemConsumer means that the SamzaContainer
+ * should halt.</li>
+ * </ul>
+ * 
+ * <p>
+ * There are generally three implementation styles to this interface:
+ * </p>
+ * 
+ * <ol>
+ * <li>Thread-based</li>
+ * <li>Selector-based</li>
+ * <li>Synchronous</li>
+ * </ol>
+ * 
+ * <p>
+ * Thread-based implementations typically use a series of threads to read from
+ * an underlying system asynchronously, and put the resulting messages into a
+ * queue, which is then read from whenever the poll method is invoked. The poll
+ * method's parameters map very closely to Java's BlockingQueue interface.
+ * BlockingEnvelopeMap is a helper class that makes it easy to implement
+ * thread-based implementations of SystemConsumer.
+ * </p>
+ * 
+ * <p>
+ * Selector-based implementations typically set up NIO-based non-blocking
+ * sockets that can be selected for new data when poll is called.
+ * </p>
+ * 
+ * <p>
+ * Synchronous implementations simply fetch directly from the underlying system
+ * whenever poll is invoked. Synchronous implementations must take great care to
+ * adhere to the timeout rules defined in the poll method.
+ * </p>
  */
 public interface SystemConsumer {
+  /**
+   * A constant that can be used in the poll method's timeout parameter to
+   * denote that the poll invocation should block until at least one message is
+   * available for one of the SystemStreamPartitions supplied, or until all
+   * SystemStreamPartitions supplied are at head (have no new messages available
+   * since the last poll invocation was made for each SystemStreamPartition).
+   */
   public static int BLOCK_ON_OUTSTANDING_MESSAGES = -1;
 
+  /**
+   * Tells the SystemConsumer to connect to the underlying system, and prepare
+   * to begin serving messages when poll is invoked.
+   */
   void start();
 
+  /**
+   * Tells the SystemConsumer to close all connections, release all resources,
+   * and shut down everything. The SystemConsumer will not be used again after
+   * stop is called.
+   */
   void stop();
 
   /**
-   * Registers this consumer to receive messages from a specific offset in a Samza stream partition.
-   * @param systemStreamPartition The SystemStreamPartition object representing the Samza stream partition to receive
-   * messages from.
-   * @param lastReadOffset String representing the offset from which to start receiving messages from in the specified
-   * partition.
+   * Register a SystemStreamPartition to this SystemConsumer. The SystemConsumer
+   * should try and read messages from all SystemStreamPartitions that are
+   * registered to it. SystemStreamPartitions should only be registered before
+   * start is called.
+   * 
+   * @param systemStreamPartition
+   *          The SystemStreamPartition object representing the Samza
+   *          SystemStreamPartition to receive messages from.
+   * @param lastReadOffset
+   *          String representing the offset of the last message that was
+   *          successfully processed by the SamzaContainer before the
+   *          SamzaContainer was shut down. This allows the SystemConsumer to
+   *          pick up where it left off in cases where the SamzaContainer fails,
+   *          or is shut down and restarted. The lastReadOffset is saved and
+   *          restored by the CheckpointManager whenever SamzaContainer shuts
+   *          down and starts up.
    */
   void register(SystemStreamPartition systemStreamPartition, String lastReadOffset);
 
+  /**
+   * Poll the SystemConsumer to get any available messages from the underlying
+   * system.
+   * 
+   * If the underlying implementation does not take care to adhere to the
+   * timeout parameter, the SamzaContainer's performance will suffer
+   * drastically. Specifically, if poll blocks when it's not supposed to, it
+   * will block the entire main thread in SamzaContainer, and no messages will
+   * be processed while blocking is occurring.
+   * 
+   * @param systemStreamPartitions
+   *          A map from SystemStreamPartition to maximum number of messages to
+   *          return for the SystemStreamPartition. Polling with {stream1: 100,
+   *          stream2: 1000} tells the SystemConsumer that it can return between
+   *          0 and 100 messages (inclusive) for stream1, and between 0 and 1000
+   *          messages for stream2. If SystemConsumer has messages available for
+   *          other registered SystemStreamPartitions, but they are not in the
+   *          systemStreamPartitions map in a given poll invocation, they can't
+   *          be returned. It is illegal to pass in SystemStreamPartitions that
+   *          have not been registered with the SystemConsumer first.
+   * @param timeout
+   *          If timeout < 0, poll will block unless all SystemStreamPartitions
+   *          are at "head" (the underlying system has been checked, and
+   *          returned an empty set). If at head, an empty list is returned. If
+   *          timeout >= 0, poll will return any messages that are currently
+   *          available for any of the SystemStreamPartitions specified. If no
+   *          new messages are available, it will wait up to timeout
+   *          milliseconds for messages from any SystemStreamPartition to become
+   *          available. It will return an empty list if the timeout is hit, and
+   *          no new messages are available.
+   * @return A list of zero or more IncomingMessageEnvelopes for the
+   *         SystemStreamPartitions that were supplied during the invocation.
+   * @throws InterruptedException
+   */
   List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitions, long timeout) throws InterruptedException;
 }
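
The thread-based style and the timeout >= 0 rules described in the Javadoc
above can be illustrated with a minimal, self-contained sketch. This is not
Samza code and not BlockingEnvelopeMap: the class name
QueueBackedConsumerSketch, the put method, and the use of plain Strings in
place of SystemStreamPartition and IncomingMessageEnvelope are all assumptions
made for illustration. The blocking mode (timeout < 0) is deliberately not
modeled, because deciding whether a partition is at "head" depends on the
underlying system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-in for a thread-based consumer. Partitions and messages
 * are plain Strings here; the real interface uses SystemStreamPartition and
 * IncomingMessageEnvelope.
 */
final class QueueBackedConsumerSketch {
  // One queue per registered partition, filled by reader threads.
  private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();

  /** Mirrors register: expected to be called once per partition, before use. */
  void register(String partition) {
    queues.put(partition, new LinkedBlockingQueue<>());
  }

  /** Assumed entry point for reader threads to hand over fetched messages. */
  void put(String partition, String message) {
    queues.get(partition).add(message);
  }

  /**
   * Returns up to the requested number of messages per partition. If nothing
   * is available, retries until timeoutMs has elapsed, then returns an empty
   * list. Only timeoutMs >= 0 is supported in this sketch.
   */
  List<String> poll(Map<String, Integer> maxPerPartition, long timeoutMs)
      throws InterruptedException {
    if (timeoutMs < 0) {
      throw new IllegalArgumentException("blocking mode is not modeled in this sketch");
    }
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    List<String> result = new ArrayList<>();
    while (true) {
      for (Map.Entry<String, Integer> entry : maxPerPartition.entrySet()) {
        BlockingQueue<String> queue = queues.get(entry.getKey());
        if (queue == null) {
          // Polling an unregistered partition is illegal per the Javadoc.
          throw new IllegalArgumentException("not registered: " + entry.getKey());
        }
        // Never return more than the caller asked for on this partition.
        queue.drainTo(result, entry.getValue());
      }
      if (!result.isEmpty() || System.nanoTime() >= deadline) {
        return result;
      }
      // Simple retry; a real implementation would more likely wait on a signal.
      Thread.sleep(1);
    }
  }
}
```

With "a" and "b" queued for one partition, polling that partition with a
maximum of 1 and a timeout of 0 returns only "a" and leaves "b" for the next
call. Polling an empty partition with a timeout of 20 returns an empty list
after roughly 20 ms instead of blocking the caller indefinitely, which is the
behavior the poll Javadoc asks implementations to respect.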
