Repository: incubator-samza Updated Branches: refs/heads/master 87ce08e02 -> 9b7e451c8
SAMZA-413: Correct and expand BlockingEnvelopeMap javadoc Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/9b7e451c Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/9b7e451c Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/9b7e451c Branch: refs/heads/master Commit: 9b7e451c82e9f6721c671f17a06250d0138194a1 Parents: 87ce08e Author: Jakob Homan <[email protected]> Authored: Mon Sep 15 11:54:31 2014 -0700 Committer: Jakob Homan <[email protected]> Committed: Mon Sep 15 11:54:31 2014 -0700 ---------------------------------------------------------------------- .../apache/samza/util/BlockingEnvelopeMap.java | 41 +++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/9b7e451c/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java index 317e073..e30321d 100644 --- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java +++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java @@ -49,13 +49,16 @@ import org.apache.samza.system.SystemStreamPartition; * * <p> * SystemConsumers that implement BlockingEnvelopeMap need to add messages using - * add (or addAll), and update noMoreMessage using setIsAtHead. The - * noMoreMessage variable is used to determine whether a SystemStreamPartition - * is "caught up" (has read all possible messages from the underlying system). - * For example, with a Kafka system, noMoreMessages would be set to true when - * the last message offset returned is equal to the offset high watermark for a - * given topic/partition. + * {@link #put(org.apache.samza.system.SystemStreamPartition, org.apache.samza.system.IncomingMessageEnvelope) put} + * (or {@link #putAll(org.apache.samza.system.SystemStreamPartition, java.util.List) putAll}), + * and update noMoreMessage using setIsAtHead. The noMoreMessage variable is used + * to determine whether a SystemStreamPartition is "caught up" (has read all + * possible messages from the underlying system). For example, with a Kafka + * system, noMoreMessages would be set to true when the last message offset + * returned is equal to the offset high watermark for a given topic/partition. * </p> + * The BlockingEnvelopeMap is backed by a concurrent map, which allows concurrent + * put or putAll calls to be thread safe without external synchronization. */ public abstract class BlockingEnvelopeMap implements SystemConsumer { private final BlockingEnvelopeMapMetrics metrics; @@ -91,6 +94,9 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { this.clock = clock; } + /** + * {@inheritDoc} + */ public void register(SystemStreamPartition systemStreamPartition, String offset) { metrics.initMetrics(systemStreamPartition); bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue()); @@ -100,6 +106,9 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { return new LinkedBlockingQueue<IncomingMessageEnvelope>(); } + /** + * {@inheritDoc} + */ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { long stopTime = clock.currentTimeMillis() + timeout; Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>(); @@ -147,10 +156,30 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { return messagesToReturn; } + /** + * Place a new {@link org.apache.samza.system.IncomingMessageEnvelope} on the + * queue for the specified {@link org.apache.samza.system.SystemStreamPartition}. + * + * @param systemStreamPartition SystemStreamPartition that owns the envelope + * @param envelope Message for specified SystemStreamPartition + * @throws InterruptedException from underlying concurrent collection + */ protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException { bufferedMessages.get(systemStreamPartition).put(envelope); } + /** + * Place a collection of {@link org.apache.samza.system.IncomingMessageEnvelope} + * on the queue for the specified {@link org.apache.samza.system.SystemStreamPartition}. + * <p> + * Insertion of all the messages into the queue is not guaranteed to be done + * atomically. + * </p> + * + * @param systemStreamPartition SystemStreamPartition that owns the envelope + * @param envelopes Messages for specified SystemStreamPartition + * @throws InterruptedException from underlying concurrent collection + */ protected void putAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) throws InterruptedException { BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
