Repository: incubator-samza
Updated Branches:
  refs/heads/master 87ce08e02 -> 9b7e451c8


SAMZA-413: Correct and expand BlockingEnvelopeMap javadoc


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/9b7e451c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/9b7e451c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/9b7e451c

Branch: refs/heads/master
Commit: 9b7e451c82e9f6721c671f17a06250d0138194a1
Parents: 87ce08e
Author: Jakob Homan <[email protected]>
Authored: Mon Sep 15 11:54:31 2014 -0700
Committer: Jakob Homan <[email protected]>
Committed: Mon Sep 15 11:54:31 2014 -0700

----------------------------------------------------------------------
 .../apache/samza/util/BlockingEnvelopeMap.java  | 41 +++++++++++++++++---
 1 file changed, 35 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b7e451c/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java 
b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 317e073..e30321d 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -49,13 +49,16 @@ import org.apache.samza.system.SystemStreamPartition;
  * 
  * <p>
  * SystemConsumers that implement BlockingEnvelopeMap need to add messages 
using
- * add (or addAll), and update noMoreMessage using setIsAtHead. The
- * noMoreMessage variable is used to determine whether a SystemStreamPartition
- * is "caught up" (has read all possible messages from the underlying system).
- * For example, with a Kafka system, noMoreMessages would be set to true when
- * the last message offset returned is equal to the offset high watermark for a
- * given topic/partition.
+ * {@link #put(org.apache.samza.system.SystemStreamPartition, 
org.apache.samza.system.IncomingMessageEnvelope) put}
+ * (or {@link #putAll(org.apache.samza.system.SystemStreamPartition, 
java.util.List) putAll}),
+ * and update noMoreMessage using setIsAtHead. The noMoreMessage variable is 
used
+ * to determine whether a SystemStreamPartition is "caught up" (has read all
+ * possible messages from the underlying system). For example, with a Kafka
+ * system, noMoreMessages would be set to true when the last message offset
+ * returned is equal to the offset high watermark for a given topic/partition.
  * </p>
+ * The BlockingEnvelopeMap is backed by a concurrent map, which allows 
concurrent
+ * put or putAll calls to be thread safe without external synchronization.
  */
 public abstract class BlockingEnvelopeMap implements SystemConsumer {
   private final BlockingEnvelopeMapMetrics metrics;
@@ -91,6 +94,9 @@ public abstract class BlockingEnvelopeMap implements 
SystemConsumer {
     this.clock = clock;
   }
 
+  /**
+   * {@inheritDoc}
+   */
   public void register(SystemStreamPartition systemStreamPartition, String 
offset) {
     metrics.initMetrics(systemStreamPartition);
     bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue());
@@ -100,6 +106,9 @@ public abstract class BlockingEnvelopeMap implements 
SystemConsumer {
     return new LinkedBlockingQueue<IncomingMessageEnvelope>();
   }
 
+  /**
+   * {@inheritDoc}
+   */
   public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> 
poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws 
InterruptedException {
     long stopTime = clock.currentTimeMillis() + timeout;
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn 
= new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
@@ -147,10 +156,30 @@ public abstract class BlockingEnvelopeMap implements 
SystemConsumer {
     return messagesToReturn;
   }
 
+  /**
+   * Place a new {@link org.apache.samza.system.IncomingMessageEnvelope} on the
+   * queue for the specified {@link 
org.apache.samza.system.SystemStreamPartition}.
+   *
+   * @param systemStreamPartition SystemStreamPartition that owns the envelope
+   * @param envelope Message for specified SystemStreamPartition
+   * @throws InterruptedException from underlying concurrent collection
+   */
   protected void put(SystemStreamPartition systemStreamPartition, 
IncomingMessageEnvelope envelope) throws InterruptedException {
     bufferedMessages.get(systemStreamPartition).put(envelope);
   }
 
+  /**
+   * Place a collection of {@link 
org.apache.samza.system.IncomingMessageEnvelope}
+   * on the queue for the specified {@link 
org.apache.samza.system.SystemStreamPartition}.
+   * <p>
+   * Insertion of all the messages into the queue is not guaranteed to be done
+   * atomically.
+   * </p>
+   *
+   * @param systemStreamPartition SystemStreamPartition that owns the envelope
+   * @param envelopes Messages for specified SystemStreamPartition
+   * @throws InterruptedException from underlying concurrent collection
+   */
   protected void putAll(SystemStreamPartition systemStreamPartition, 
List<IncomingMessageEnvelope> envelopes) throws InterruptedException {
     BlockingQueue<IncomingMessageEnvelope> queue = 
bufferedMessages.get(systemStreamPartition);
 

Reply via email to