APEXMALHAR-2093 Removed usages of Idempotent Storage Manager
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5305cd3a Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5305cd3a Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5305cd3a Branch: refs/heads/master Commit: 5305cd3a008d8dc337a7ef0ed030290ec9b4bea5 Parents: cf3bb7d Author: Chandni Singh <[email protected]> Authored: Fri May 13 17:19:37 2016 -0700 Committer: Chandni Singh <[email protected]> Committed: Wed May 18 15:37:20 2016 -0700 ---------------------------------------------------------------------- .../contrib/avro/AvroFileInputOperator.java | 4 +- .../kafka/AbstractKafkaInputOperator.java | 51 +-- .../kinesis/AbstractKinesisInputOperator.java | 44 +-- .../rabbitmq/AbstractRabbitMQInputOperator.java | 27 +- .../AbstractRabbitMQOutputOperator.java | 21 +- .../redis/AbstractRedisInputOperator.java | 34 +- .../contrib/kafka/KafkaInputOperatorTest.java | 10 +- .../kinesis/KinesisInputOperatorTest.java | 4 +- .../nifi/NiFiSinglePortInputOperatorTest.java | 3 +- .../nifi/NiFiSinglePortOutputOperatorTest.java | 3 +- .../rabbitmq/RabbitMQInputOperatorTest.java | 11 +- .../rabbitmq/RabbitMQOutputOperatorTest.java | 5 +- .../contrib/redis/RedisInputOperatorTest.java | 11 +- .../malhar/kafka/KafkaInputOperatorTest.java | 4 +- .../lib/io/fs/AbstractFileInputOperator.java | 38 +-- .../com/datatorrent/lib/io/fs/FileSplitter.java | 33 +- .../lib/io/fs/FileSplitterInput.java | 34 +- .../lib/io/jms/AbstractJMSInputOperator.java | 39 +-- .../managed/IncrementalCheckpointManager.java | 4 +- .../malhar/lib/wal/FSWindowDataManager.java | 339 +++++++++++++++++++ .../apex/malhar/lib/wal/WindowDataManager.java | 312 ----------------- .../io/fs/AbstractFileInputOperatorTest.java | 24 +- .../lib/io/fs/FileSplitterInputTest.java | 23 +- .../datatorrent/lib/io/fs/FileSplitterTest.java | 20 +- .../lib/io/jms/JMSStringInputOperatorTest.java | 4 +- .../malhar/lib/wal/FSWindowDataManagerTest.java | 4 +- pom.xml | 3 + 27 files changed, 574 insertions(+), 535 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java index 14dfdf2..989f859 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java @@ -24,6 +24,7 @@ import java.io.InputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.avro.AvroRuntimeException; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; @@ -36,7 +37,6 @@ import com.google.common.annotations.VisibleForTesting; import com.datatorrent.api.AutoMetric; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.io.fs.AbstractFileInputOperator; /** @@ -48,7 +48,7 @@ import com.datatorrent.lib.io.fs.AbstractFileInputOperator; * No need to provide schema,its inferred from the file<br> * This operator emits a GenericRecord based on the schema derived from the * input file<br> - * Users can add the {@link IdempotentStorageManager.FSIdempotentStorageManager} + * Users can add the {@link FSWindowDataManager} * to ensure exactly once semantics with a HDFS backed WAL. * * @displayName AvroFileInputOperator http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java index b166b9e..b026d16 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java @@ -28,7 +28,6 @@ import com.datatorrent.api.Stats; import com.datatorrent.api.StatsListener; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.Stateless; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.util.KryoCloneUtils; import com.google.common.base.Joiner; @@ -44,6 +43,8 @@ import kafka.javaapi.PartitionMetadata; import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.Message; import kafka.message.MessageAndOffset; + +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -137,7 +138,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem private long maxTotalMsgSizePerWindow = Long.MAX_VALUE; private transient int emitCount = 0; private transient long emitTotalMsgSize = 0; - protected IdempotentStorageManager idempotentStorageManager; + protected WindowDataManager windowDataManager; protected transient long currentWindowId; protected transient int operatorId; protected final transient Map<KafkaPartition, MutablePair<Long, Integer>> currentWindowRecoveryState; @@ -192,7 +193,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem public AbstractKafkaInputOperator() { - idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager(); + windowDataManager = new WindowDataManager.NoopWindowDataManager(); currentWindowRecoveryState = new HashMap<KafkaPartition, MutablePair<Long, Integer>>(); } @@ -244,16 +245,16 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem } this.context = context; operatorId = context.getId(); - if(consumer instanceof HighlevelKafkaConsumer && !(idempotentStorageManager instanceof IdempotentStorageManager.NoopIdempotentStorageManager)) { + if(consumer instanceof HighlevelKafkaConsumer && !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager)) { throw new RuntimeException("Idempotency is not supported for High Level Kafka Consumer"); } - idempotentStorageManager.setup(context); + windowDataManager.setup(context); } @Override public void teardown() { - idempotentStorageManager.teardown(); + windowDataManager.teardown(); consumer.teardown(); } @@ -261,7 +262,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem public void beginWindow(long windowId) { currentWindowId = windowId; - if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } emitCount = 0; @@ -272,7 +273,8 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem { try { @SuppressWarnings("unchecked") - Map<KafkaPartition, MutablePair<Long, Integer>> recoveredData = (Map<KafkaPartition, MutablePair<Long, Integer>>) idempotentStorageManager.load(operatorId, windowId); + Map<KafkaPartition, MutablePair<Long, Integer>> recoveredData = (Map<KafkaPartition, MutablePair<Long, Integer>>) + windowDataManager.load(operatorId, windowId); if (recoveredData != null) { Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().topic); if (pms != null) { @@ -311,7 +313,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem } } } - if(windowId == idempotentStorageManager.getLargestRecoveryWindow()) { + if(windowId == windowDataManager.getLargestRecoveryWindow()) { // Start the consumer at the largest recovery window SimpleKafkaConsumer cons = (SimpleKafkaConsumer)getConsumer(); // Set the offset positions to the consumer @@ -337,9 +339,9 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem Map<KafkaPartition, Long> carryOn = new HashMap<>(offsetStats); offsetTrackHistory.add(Pair.of(currentWindowId, carryOn)); } - if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { try { - idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); @@ -376,7 +378,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem } try { - idempotentStorageManager.deleteUpTo(operatorId, windowId); + windowDataManager.deleteUpTo(operatorId, windowId); } catch (IOException e) { throw new RuntimeException("deleting state", e); @@ -386,7 +388,8 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem @Override public void activate(OperatorContext ctx) { - if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < idempotentStorageManager.getLargestRecoveryWindow()) { + if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID && + context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) { // If it is a replay state, don't start the consumer return; } @@ -405,7 +408,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem @Override public void emitTuples() { - if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { return; } int count = consumer.messageSize() + ((pendingMessage != null) ? 1 : 0); @@ -505,7 +508,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem logger.info("Initial offsets: {} ", "{ " + Joiner.on(", ").useForNull("").withKeyValueSeparator(": ").join(initOffset) + " }"); } - Collection<IdempotentStorageManager> newManagers = Sets.newHashSet(); + Collection<WindowDataManager> newManagers = Sets.newHashSet(); Set<Integer> deletedOperators = Sets.newHashSet(); switch (strategy) { @@ -537,7 +540,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem partitions.add(createPartition(Sets.newHashSet(newPartition), null, newManagers)); } newWaitingPartition.clear(); - idempotentStorageManager.partitioned(newManagers, deletedOperators); + windowDataManager.partitioned(newManagers, deletedOperators); return partitions; } @@ -580,7 +583,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem logger.info("[ONE_TO_MANY]: Add operator partition for kafka partition(s): {} ", StringUtils.join(newWaitingPartition, ", ")); partitions.add(createPartition(Sets.newHashSet(newWaitingPartition), null, newManagers)); - idempotentStorageManager.partitioned(newManagers, deletedOperators); + windowDataManager.partitioned(newManagers, deletedOperators); return partitions; } else { @@ -631,12 +634,12 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem break; } - idempotentStorageManager.partitioned(newManagers, deletedOperators); + windowDataManager.partitioned(newManagers, deletedOperators); return newPartitions; } // Create a new partition with the partition Ids and initial offset positions - protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, Map<KafkaPartition, Long> initOffsets, Collection<IdempotentStorageManager> newManagers) + protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, Map<KafkaPartition, Long> initOffsets, Collection<WindowDataManager> newManagers) { Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<>(KryoCloneUtils.cloneObject(this)); @@ -648,7 +651,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem p.getPartitionedInstance().offsetStats.putAll(p.getPartitionedInstance().getConsumer().getCurrentOffsets()); } } - newManagers.add(p.getPartitionedInstance().idempotentStorageManager); + newManagers.add(p.getPartitionedInstance().windowDataManager); PartitionInfo pif = new PartitionInfo(); pif.kpids = pIds; @@ -929,14 +932,14 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem long byteRateLeft; } - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return idempotentStorageManager; + return windowDataManager; } - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + public void setWindowDataManager(WindowDataManager windowDataManager) { - this.idempotentStorageManager = idempotentStorageManager; + this.windowDataManager = windowDataManager; } public void setInitialPartitionCount(int partitionCount) http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java index 6306b04..2352236 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java @@ -25,13 +25,14 @@ import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.*; import com.datatorrent.api.Operator.ActivationListener; import com.datatorrent.common.util.Pair; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.util.KryoCloneUtils; import com.esotericsoftware.kryo.DefaultSerializer; import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.collect.Sets; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,7 +86,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, private String endPoint; - protected IdempotentStorageManager idempotentStorageManager; + protected WindowDataManager windowDataManager; protected transient long currentWindowId; protected transient int operatorId; protected final transient Map<String, KinesisPair<String, Integer>> currentWindowRecoveryState; @@ -132,7 +133,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, public AbstractKinesisInputOperator() { - idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); + windowDataManager = new FSWindowDataManager(); currentWindowRecoveryState = new HashMap<String, KinesisPair<String, Integer>>(); } /** @@ -167,7 +168,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, // Operator partitions List<Partition<AbstractKinesisInputOperator>> newPartitions = null; - Collection<IdempotentStorageManager> newManagers = Sets.newHashSet(); + Collection<WindowDataManager> newManagers = Sets.newHashSet(); Set<Integer> deletedOperators = Sets.newHashSet(); // initialize the shard positions @@ -198,7 +199,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, partitions.add(createPartition(Sets.newHashSet(pid), null, newManagers)); } newWaitingPartition.clear(); - idempotentStorageManager.partitioned(newManagers, deletedOperators); + windowDataManager.partitioned(newManagers, deletedOperators); return partitions; } break; @@ -250,7 +251,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, default: break; } - idempotentStorageManager.partitioned(newManagers, deletedOperators); + windowDataManager.partitioned(newManagers, deletedOperators); return newPartitions; } @@ -370,10 +371,10 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, } // Create a new partition with the shardIds and initial shard positions private - Partition<AbstractKinesisInputOperator> createPartition(Set<String> shardIds, Map<String, String> initShardPos, Collection<IdempotentStorageManager> newManagers) + Partition<AbstractKinesisInputOperator> createPartition(Set<String> shardIds, Map<String, String> initShardPos, Collection<WindowDataManager> newManagers) { Partition<AbstractKinesisInputOperator> p = new DefaultPartition<AbstractKinesisInputOperator>(KryoCloneUtils.cloneObject(this)); - newManagers.add(p.getPartitionedInstance().idempotentStorageManager); + newManagers.add(p.getPartitionedInstance().windowDataManager); p.getPartitionedInstance().getConsumer().setShardIds(shardIds); p.getPartitionedInstance().getConsumer().resetShardPositions(initShardPos); @@ -400,9 +401,9 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, } consumer.create(); operatorId = context.getId(); - idempotentStorageManager.setup(context); + windowDataManager.setup(context); shardPosition.clear(); - if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < idempotentStorageManager.getLargestRecoveryWindow()) { + if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) { isReplayState = true; } } @@ -413,7 +414,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, @Override public void teardown() { - idempotentStorageManager.teardown(); + windowDataManager.teardown(); consumer.teardown(); } @@ -425,7 +426,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, { emitCount = 0; currentWindowId = windowId; - if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } } @@ -434,7 +435,8 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, { try { @SuppressWarnings("unchecked") - Map<String, KinesisPair<String, Integer>> recoveredData = (Map<String, KinesisPair<String, Integer>>) idempotentStorageManager.load(operatorId, windowId); + Map<String, KinesisPair<String, Integer>> recoveredData = + (Map<String, KinesisPair<String, Integer>>)windowDataManager.load(operatorId, windowId); if (recoveredData == null) { return; } @@ -464,10 +466,10 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, @Override public void endWindow() { - if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { context.setCounters(getConsumer().getConsumerStats(shardPosition)); try { - idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); @@ -495,7 +497,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, public void committed(long windowId) { try { - idempotentStorageManager.deleteUpTo(operatorId, windowId); + windowDataManager.deleteUpTo(operatorId, windowId); } catch (IOException e) { throw new RuntimeException("deleting state", e); @@ -521,7 +523,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, @Override public void emitTuples() { - if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { return; } int count = consumer.getQueueSize(); @@ -707,13 +709,13 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator, this.endPoint = endPoint; } - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return idempotentStorageManager; + return windowDataManager; } - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + public void setWindowDataManager(WindowDataManager windowDataManager) { - this.idempotentStorageManager = idempotentStorageManager; + this.windowDataManager = windowDataManager; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java index bcd9195..b842698 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java @@ -20,7 +20,6 @@ package com.datatorrent.contrib.rabbitmq; import com.datatorrent.api.*; import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.util.KeyValPair; import com.datatorrent.netlet.util.DTThrowable; import com.rabbitmq.client.*; @@ -38,6 +37,8 @@ import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.WindowDataManager; + /** * This is the base implementation of a RabbitMQ input operator. * Subclasses should implement the methods which convert RabbitMQ messages to tuples. @@ -102,7 +103,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements protected transient String cTag; protected transient ArrayBlockingQueue<KeyValPair<Long,byte[]>> holdingBuffer; - private IdempotentStorageManager idempotentStorageManager; + private WindowDataManager windowDataManager; protected final transient Map<Long, byte[]> currentWindowRecoveryState; private transient final Set<Long> pendingAck; private transient final Set<Long> recoveredTags; @@ -114,7 +115,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements currentWindowRecoveryState = new HashMap<Long, byte[]>(); pendingAck = new HashSet<Long>(); recoveredTags = new HashSet<Long>(); - idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager(); + windowDataManager = new WindowDataManager.NoopWindowDataManager(); } @@ -189,7 +190,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements public void beginWindow(long windowId) { currentWindowId = windowId; - if (windowId <= this.idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= this.windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } } @@ -198,7 +199,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements private void replay(long windowId) { Map<Long, byte[]> recoveredData; try { - recoveredData = (Map<Long, byte[]>) this.idempotentStorageManager.load(operatorContextId, windowId); + recoveredData = (Map<Long, byte[]>) this.windowDataManager.load(operatorContextId, windowId); if (recoveredData == null) { return; } @@ -224,7 +225,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements } try { - this.idempotentStorageManager.save(currentWindowRecoveryState, operatorContextId, currentWindowId); + this.windowDataManager.save(currentWindowRecoveryState, operatorContextId, currentWindowId); } catch (IOException e) { DTThrowable.rethrow(e); } @@ -247,13 +248,13 @@ public abstract class AbstractRabbitMQInputOperator<T> implements { this.operatorContextId = context.getId(); holdingBuffer = new ArrayBlockingQueue<KeyValPair<Long, byte[]>>(bufferSize); - this.idempotentStorageManager.setup(context); + this.windowDataManager.setup(context); } @Override public void teardown() { - this.idempotentStorageManager.teardown(); + this.windowDataManager.teardown(); } @Override @@ -319,7 +320,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements public void committed(long windowId) { try { - idempotentStorageManager.deleteUpTo(operatorContextId, windowId); + windowDataManager.deleteUpTo(operatorContextId, windowId); } catch (IOException e) { throw new RuntimeException("committing", e); @@ -391,12 +392,12 @@ public abstract class AbstractRabbitMQInputOperator<T> implements this.routingKey = routingKey; } - public IdempotentStorageManager getIdempotentStorageManager() { - return idempotentStorageManager; + public WindowDataManager getWindowDataManager() { + return windowDataManager; } - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) { - this.idempotentStorageManager = idempotentStorageManager; + public void setWindowDataManager(WindowDataManager windowDataManager) { + this.windowDataManager = windowDataManager; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java index 8d04154..6043c5b 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java @@ -19,7 +19,6 @@ package com.datatorrent.contrib.rabbitmq; import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.api.Context.OperatorContext; import com.rabbitmq.client.Channel; @@ -32,6 +31,8 @@ import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.WindowDataManager; + /** * This is the base implementation of a RabbitMQ output operator. * A concrete operator should be created from this skeleton implementation. @@ -74,7 +75,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator transient String exchange = "testEx"; transient String queueName="testQ"; - private IdempotentStorageManager idempotentStorageManager; + private WindowDataManager windowDataManager; private transient long currentWindowId; private transient long largestRecoveryWindowId; private transient int operatorContextId; @@ -95,7 +96,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator channel = connection.createChannel(); channel.exchangeDeclare(exchange, "fanout"); - this.idempotentStorageManager.setup(context); + this.windowDataManager.setup(context); } catch (IOException ex) { @@ -108,7 +109,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator public void beginWindow(long windowId) { currentWindowId = windowId; - largestRecoveryWindowId = idempotentStorageManager.getLargestRecoveryWindow(); + largestRecoveryWindowId = windowDataManager.getLargestRecoveryWindow(); if (windowId <= largestRecoveryWindowId) { // Do not resend already sent tuples skipProcessingTuple = true; @@ -131,7 +132,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator return; } try { - idempotentStorageManager.save("processedWindow", operatorContextId, currentWindowId); + windowDataManager.save("processedWindow", operatorContextId, currentWindowId); } catch (IOException e) { DTThrowable.rethrow(e); } @@ -151,19 +152,19 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator try { channel.close(); connection.close(); - this.idempotentStorageManager.teardown(); + this.windowDataManager.teardown(); } catch (IOException ex) { logger.debug(ex.toString()); } } - public IdempotentStorageManager getIdempotentStorageManager() { - return idempotentStorageManager; + public WindowDataManager getWindowDataManager() { + return windowDataManager; } - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) { - this.idempotentStorageManager = idempotentStorageManager; + public void setWindowDataManager(WindowDataManager windowDataManager) { + this.windowDataManager = windowDataManager; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java index daca1fc..fd0a885 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java @@ -25,6 +25,9 @@ import java.util.List; import javax.validation.constraints.NotNull; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; + import redis.clients.jedis.ScanParams; import redis.clients.jedis.ScanResult; @@ -32,7 +35,6 @@ import com.datatorrent.api.Operator.CheckpointListener; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator; -import com.datatorrent.lib.io.IdempotentStorageManager; /** * This is the base implementation of a Redis input operator. @@ -57,7 +59,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor private transient boolean skipOffsetRecovery = true; @NotNull - private IdempotentStorageManager idempotentStorageManager; + private WindowDataManager windowDataManager; private transient OperatorContext context; private transient long currentWindowId; @@ -83,7 +85,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor recoveryState = new RecoveryState(); recoveryState.scanOffsetAtBeginWindow = 0; recoveryState.numberOfScanCallsInWindow = 0; - setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager()); + setWindowDataManager(new FSWindowDataManager()); } @Override @@ -92,7 +94,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor currentWindowId = windowId; scanCallsInCurrentWindow = 0; replay = false; - if (currentWindowId <= getIdempotentStorageManager().getLargestRecoveryWindow()) { + if (currentWindowId <= getWindowDataManager().getLargestRecoveryWindow()) { replay(windowId); } } @@ -105,11 +107,11 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor if (!skipOffsetRecovery) { // Begin offset for this window is recovery offset stored for the last // window - RecoveryState recoveryStateForLastWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId - 1); + RecoveryState recoveryStateForLastWindow = (RecoveryState) getWindowDataManager().load(context.getId(), windowId - 1); recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow; } skipOffsetRecovery = false; - RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId); + RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getWindowDataManager().load(context.getId(), windowId); recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow; if (recoveryState.scanOffsetAtBeginWindow != null) { scanOffset = recoveryState.scanOffsetAtBeginWindow; @@ -153,7 +155,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor { super.setup(context); sleepTimeMillis = context.getValue(context.SPIN_MILLIS); - getIdempotentStorageManager().setup(context); + getWindowDataManager().setup(context); this.context = context; scanOffset = 0; scanComplete = false; @@ -161,7 +163,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor scanParameters.count(scanCount); // For the 1st window after checkpoint, windowID - 1 would not have recovery - // offset stored in idempotentStorageManager + // offset stored in windowDataManager // But recoveryOffset is non-transient, so will be recovered with // checkPointing // Offset recovery from idempotency storage can be skipped in this case @@ -181,9 +183,9 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor recoveryState.scanOffsetAtBeginWindow = scanOffset; recoveryState.numberOfScanCallsInWindow = scanCallsInCurrentWindow; - if (currentWindowId > getIdempotentStorageManager().getLargestRecoveryWindow()) { + if (currentWindowId > getWindowDataManager().getLargestRecoveryWindow()) { try { - getIdempotentStorageManager().save(recoveryState, context.getId(), currentWindowId); + getWindowDataManager().save(recoveryState, context.getId(), currentWindowId); } catch (IOException e) { DTThrowable.rethrow(e); } @@ -194,7 +196,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor public void teardown() { super.teardown(); - getIdempotentStorageManager().teardown(); + getWindowDataManager().teardown(); } /* @@ -231,7 +233,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor public void committed(long windowId) { try { - getIdempotentStorageManager().deleteUpTo(context.getId(), windowId); + getWindowDataManager().deleteUpTo(context.getId(), windowId); } catch (IOException e) { throw new RuntimeException("committing", e); } @@ -240,16 +242,16 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor /* * get Idempotent Storage manager instance */ - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return idempotentStorageManager; + return windowDataManager; } /* * set Idempotent storage manager instance */ - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + public void setWindowDataManager(WindowDataManager windowDataManager) { - this.idempotentStorageManager = idempotentStorageManager; + this.windowDataManager = windowDataManager; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java index 9db1355..27235f5 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; @@ -49,7 +50,6 @@ import com.datatorrent.api.Partitioner; import com.datatorrent.common.util.FSStorageAgent; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.partitioner.StatelessPartitionerTest; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.stram.StramLocalCluster; @@ -141,7 +141,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase } if(idempotent) { - node.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + node.setWindowDataManager(new FSWindowDataManager()); } consumer.setTopic(TEST_TOPIC); @@ -304,7 +304,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase operator.deactivate(); operator = createAndDeployOperator(); - Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + Assert.assertEquals("largest recovery window", 2, operator.getWindowDataManager().getLargestRecoveryWindow()); operator.beginWindow(1); operator.emitTuples(); @@ -339,9 +339,9 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase consumer.setTopic(TEST_TOPIC); consumer.setInitialOffset("earliest"); - IdempotentStorageManager.FSIdempotentStorageManager storageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); + FSWindowDataManager storageManager = new FSWindowDataManager(); storageManager.setRecoveryPath(testMeta.recoveryDir); - testMeta.operator.setIdempotentStorageManager(storageManager); + testMeta.operator.setWindowDataManager(storageManager); testMeta.operator.setConsumer(consumer); testMeta.operator.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]); testMeta.operator.setMaxTuplesPerWindow(500); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java index 8051b96..e8eff5d 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java @@ -232,11 +232,11 @@ public class KinesisInputOperatorTest extends KinesisOperatorTestBase testMeta.operator.setup(testMeta.context); testMeta.operator.activate(testMeta.context); - Assert.assertEquals("largest recovery window", 1, testMeta.operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + Assert.assertEquals("largest recovery window", 1, testMeta.operator.getWindowDataManager().getLargestRecoveryWindow()); testMeta.operator.beginWindow(1); testMeta.operator.endWindow(); Assert.assertEquals("num of messages in window 1", 10, testMeta.sink.collectedTuples.size()); testMeta.sink.collectedTuples.clear(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java index f36f1f2..dc56e1c 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java @@ -33,6 +33,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.util.file.FileUtils; @@ -71,7 +72,7 @@ public class NiFiSinglePortInputOperatorTest sink = new CollectorTestSink<>(); builder = new MockSiteToSiteClient.Builder(); - windowDataManager = new WindowDataManager.FSWindowDataManager(); + windowDataManager = new FSWindowDataManager(); operator = new NiFiSinglePortInputOperator(builder, windowDataManager); operator.outputPort.setSink(sink); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java index e8aa982..14b1493 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java @@ -30,6 +30,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.io.IOUtils; import org.apache.nifi.remote.protocol.DataPacket; @@ -65,7 +66,7 @@ public class NiFiSinglePortOutputOperatorTest context = new OperatorContextTestHelper.TestIdOperatorContext(12345, attributeMap); - windowDataManager = new WindowDataManager.FSWindowDataManager(); + windowDataManager = new FSWindowDataManager(); stsBuilder = new MockSiteToSiteClient.Builder(); dpBuilder = new StringNiFiDataPacketBuilder(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java index 315160b..4fccffa 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java @@ -33,6 +33,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; + import com.datatorrent.contrib.helper.CollectorModule; import com.datatorrent.contrib.helper.MessageQueueTestHelper; import com.datatorrent.api.Context.OperatorContext; @@ -41,7 +43,6 @@ import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.LocalMode; import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.netlet.util.DTThrowable; @@ -127,7 +128,7 @@ public class RabbitMQInputOperatorTest LocalMode lma = LocalMode.newInstance(); DAG dag = lma.getDAG(); RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class); - consumer.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + consumer.setWindowDataManager(new FSWindowDataManager()); final CollectorModule<byte[]> collector = dag.addOperator("Collector", new CollectorModule<byte[]>()); @@ -188,7 +189,7 @@ public class RabbitMQInputOperatorTest public void testRecoveryAndIdempotency() throws Exception { RabbitMQInputOperator operator = new RabbitMQInputOperator(); - operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + operator.setWindowDataManager(new FSWindowDataManager()); operator.setHost("localhost"); operator.setExchange("testEx"); operator.setExchangeType("fanout"); @@ -220,7 +221,7 @@ public class RabbitMQInputOperatorTest operator.setup(context); operator.activate(context); - Assert.assertEquals("largest recovery window", 1, operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + Assert.assertEquals("largest recovery window", 1, operator.getWindowDataManager().getLargestRecoveryWindow()); operator.beginWindow(1); operator.endWindow(); Assert.assertEquals("num of messages in window 1", 15, sink.collectedTuples.size()); @@ -228,7 +229,7 @@ public class RabbitMQInputOperatorTest operator.deactivate(); operator.teardown(); - operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 1); + operator.getWindowDataManager().deleteUpTo(context.getId(), 1); publisher.teardown(); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java index 596a33a..3fa36ab 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java @@ -29,8 +29,9 @@ import org.junit.Assert; import org.junit.Test; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; + import com.datatorrent.contrib.helper.SourceModule; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.LocalMode; @@ -144,7 +145,7 @@ public class RabbitMQOutputOperatorTest SourceModule source = dag.addOperator("source", new SourceModule()); source.setTestNum(testNum); RabbitMQOutputOperator collector = dag.addOperator("generator", new RabbitMQOutputOperator()); - collector.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + collector.setWindowDataManager(new FSWindowDataManager()); collector.setExchange("testEx"); dag.addStream("Stream", source.outPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java index 6f3a177..bee170d 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java @@ -25,6 +25,8 @@ import java.util.List; import org.junit.Assert; import org.junit.Test; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; + import redis.clients.jedis.ScanParams; import com.datatorrent.api.Attribute; @@ -34,7 +36,6 @@ import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.LocalMode; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.KeyValPair; @@ -134,7 +135,7 @@ public class RedisInputOperatorTest testStore.put("test_ghi", "123"); RedisKeyValueInputOperator operator = new RedisKeyValueInputOperator(); - operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + operator.setWindowDataManager(new FSWindowDataManager()); operator.setStore(operatorStore); operator.setScanCount(1); @@ -162,13 +163,13 @@ public class RedisInputOperatorTest // failure and then re-deployment of operator // Re-instantiating to reset values operator = new RedisKeyValueInputOperator(); - operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + operator.setWindowDataManager(new FSWindowDataManager()); operator.setStore(operatorStore); operator.setScanCount(1); operator.outputPort.setSink(sink); operator.setup(context); - Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + Assert.assertEquals("largest recovery window", 2, operator.getWindowDataManager().getLargestRecoveryWindow()); operator.beginWindow(1); operator.emitTuples(); @@ -188,7 +189,7 @@ public class RedisInputOperatorTest testStore.remove(entry.getKey()); } sink.collectedTuples.clear(); - operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 5); + operator.getWindowDataManager().deleteUpTo(context.getId(), 5); operator.teardown(); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java index ede7f38..72ecd57 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java @@ -40,7 +40,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.slf4j.LoggerFactory; -import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.commons.io.FileUtils; import com.datatorrent.api.Context; @@ -255,7 +255,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase node.setClusters(getClusterConfig()); node.setStrategy(partition); if(idempotent) { - node.setWindowDataManager(new WindowDataManager.FSWindowDataManager()); + node.setWindowDataManager(new FSWindowDataManager()); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index bf1605d..dae5c7f 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -43,6 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator; +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -63,7 +64,6 @@ import com.datatorrent.api.Partitioner; import com.datatorrent.api.StatsListener; import com.datatorrent.lib.counters.BasicCounters; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.util.KryoCloneUtils; /** @@ -127,7 +127,7 @@ public abstract class AbstractFileInputOperator<T> protected transient MutableLong pendingFileCount = new MutableLong(); @NotNull - protected IdempotentStorageManager idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager(); + private WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager(); protected transient long currentWindowId; protected final transient LinkedList<RecoveryEntry> currentWindowRecoveryState = Lists.newLinkedList(); protected int operatorId; //needed in partitioning @@ -388,11 +388,11 @@ public abstract class AbstractFileInputOperator<T> /** * Sets the idempotent storage manager on the operator. - * @param idempotentStorageManager an {@link IdempotentStorageManager} + * @param windowDataManager an {@link WindowDataManager} */ - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + public void setWindowDataManager(WindowDataManager windowDataManager) { - this.idempotentStorageManager = idempotentStorageManager; + this.windowDataManager = windowDataManager; } /** @@ -400,9 +400,9 @@ public abstract class AbstractFileInputOperator<T> * * @return the idempotent storage manager. */ - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return idempotentStorageManager; + return windowDataManager; } /** @@ -456,8 +456,8 @@ public abstract class AbstractFileInputOperator<T> fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES, localNumberOfRetries); fileCounters.setCounter(FileCounters.PENDING_FILES, pendingFileCount); - idempotentStorageManager.setup(context); - if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < idempotentStorageManager.getLargestRecoveryWindow()) { + windowDataManager.setup(context); + if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) { //reset current file and offset in case of replay currentFile = null; offset = 0; @@ -512,14 +512,14 @@ public abstract class AbstractFileInputOperator<T> throw new RuntimeException(errorMessage, savedException); } - idempotentStorageManager.teardown(); + windowDataManager.teardown(); } @Override public void beginWindow(long windowId) { currentWindowId = windowId; - if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } } @@ -527,9 +527,9 @@ public abstract class AbstractFileInputOperator<T> @Override public void endWindow() { - if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { try { - idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); } @@ -553,7 +553,7 @@ public abstract class AbstractFileInputOperator<T> //all the recovery data for a window and then processes only those files which would be hashed //to it in the current run. try { - Map<Integer, Object> recoveryDataPerOperator = idempotentStorageManager.load(windowId); + Map<Integer, Object> recoveryDataPerOperator = windowDataManager.load(windowId); for (Object recovery : recoveryDataPerOperator.values()) { @SuppressWarnings("unchecked") @@ -615,7 +615,7 @@ public abstract class AbstractFileInputOperator<T> @Override public void emitTuples() { - if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { return; } @@ -836,7 +836,7 @@ public abstract class AbstractFileInputOperator<T> List<DirectoryScanner> scanners = scanner.partition(totalCount, oldscanners); Collection<Partition<AbstractFileInputOperator<T>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount); - Collection<IdempotentStorageManager> newManagers = Lists.newArrayListWithExpectedSize(totalCount); + Collection<WindowDataManager> newManagers = Lists.newArrayListWithExpectedSize(totalCount); KryoCloneUtils<AbstractFileInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this); for (int i = 0; i < scanners.size(); i++) { @@ -889,10 +889,10 @@ public abstract class AbstractFileInputOperator<T> } } newPartitions.add(new DefaultPartition<AbstractFileInputOperator<T>>(oper)); - newManagers.add(oper.idempotentStorageManager); + newManagers.add(oper.windowDataManager); } - idempotentStorageManager.partitioned(newManagers, deletedOperators); + windowDataManager.partitioned(newManagers, deletedOperators); LOG.info("definePartitions called returning {} partitions", newPartitions.size()); return newPartitions; } @@ -917,7 +917,7 @@ public abstract class AbstractFileInputOperator<T> public void committed(long windowId) { try { - idempotentStorageManager.deleteUpTo(operatorId, windowId); + windowDataManager.deleteUpTo(operatorId, windowId); } catch (IOException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java index 69e44a5..cd79a2b 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java @@ -40,6 +40,7 @@ import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -61,7 +62,6 @@ import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.lib.counters.BasicCounters; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata; import com.datatorrent.netlet.util.DTThrowable; @@ -99,7 +99,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener protected TimeBasedDirectoryScanner scanner; @NotNull - protected IdempotentStorageManager idempotentStorageManager; + protected WindowDataManager windowDataManager; @NotNull protected final transient LinkedList<FileInfo> currentWindowRecoveryState; @@ -118,7 +118,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener { currentWindowRecoveryState = Lists.newLinkedList(); fileCounters = new BasicCounters<MutableLong>(MutableLong.class); - idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); + windowDataManager = new WindowDataManager.NoopWindowDataManager(); scanner = new TimeBasedDirectoryScanner(); blocksThreshold = Integer.MAX_VALUE; } @@ -133,7 +133,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener this.context = context; fileCounters.setCounter(Counters.PROCESSED_FILES, new MutableLong()); - idempotentStorageManager.setup(context); + windowDataManager.setup(context); try { fs = scanner.getFSInstance(); @@ -145,8 +145,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener blockSize = fs.getDefaultBlockSize(new Path(scanner.files.iterator().next())); } - if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < - idempotentStorageManager.getLargestRecoveryWindow()) { + if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) { blockMetadataIterator = null; } else { //don't setup scanner while recovery @@ -176,7 +175,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener { blockCount = 0; currentWindowId = windowId; - if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } } @@ -185,7 +184,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener { try { @SuppressWarnings("unchecked") - LinkedList<FileInfo> recoveredData = (LinkedList<FileInfo>)idempotentStorageManager.load(operatorId, windowId); + LinkedList<FileInfo> recoveredData = (LinkedList<FileInfo>)windowDataManager.load(operatorId, windowId); if (recoveredData == null) { //This could happen when there are multiple physical instances and one of them is ahead in processing windows. return; @@ -210,7 +209,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener } } - if (windowId == idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId == windowDataManager.getLargestRecoveryWindow()) { scanner.setup(context); } } catch (IOException e) { @@ -221,7 +220,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener @Override public void emitTuples() { - if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { return; } @@ -260,9 +259,9 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener @Override public void endWindow() { - if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { try { - idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); } @@ -370,14 +369,14 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener return this.scanner; } - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + public void setWindowDataManager(WindowDataManager windowDataManager) { - this.idempotentStorageManager = idempotentStorageManager; + this.windowDataManager = windowDataManager; } - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return this.idempotentStorageManager; + return this.windowDataManager; } @Override @@ -389,7 +388,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener public void committed(long l) { try { - idempotentStorageManager.deleteUpTo(operatorId, l); + windowDataManager.deleteUpTo(operatorId, l); } catch (IOException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java index bd68016..a58bee7 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java @@ -42,6 +42,8 @@ import javax.validation.constraints.Size; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -60,7 +62,6 @@ import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.Stateless; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.netlet.util.DTThrowable; /** @@ -79,7 +80,7 @@ import com.datatorrent.netlet.util.DTThrowable; public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener { @NotNull - private IdempotentStorageManager idempotentStorageManager; + private WindowDataManager windowDataManager; @NotNull protected final transient LinkedList<ScannedFileInfo> currentWindowRecoveryState; @@ -95,7 +96,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper { super(); currentWindowRecoveryState = Lists.newLinkedList(); - idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager(); + windowDataManager = new WindowDataManager.NoopWindowDataManager(); referenceTimes = Maps.newHashMap(); scanner = new TimeBasedDirectoryScanner(); } @@ -105,10 +106,10 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper { sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); scanner.setup(context); - idempotentStorageManager.setup(context); + windowDataManager.setup(context); super.setup(context); - long largestRecoveryWindow = idempotentStorageManager.getLargestRecoveryWindow(); + long largestRecoveryWindow = windowDataManager.getLargestRecoveryWindow(); if (largestRecoveryWindow == Stateless.WINDOW_ID || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) { scanner.startScanning(Collections.unmodifiableMap(referenceTimes)); @@ -119,7 +120,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper public void beginWindow(long windowId) { super.beginWindow(windowId); - if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } } @@ -128,8 +129,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper { try { @SuppressWarnings("unchecked") - LinkedList<ScannedFileInfo> recoveredData = (LinkedList<ScannedFileInfo>)idempotentStorageManager - .load(operatorId, windowId); + LinkedList<ScannedFileInfo> recoveredData = (LinkedList<ScannedFileInfo>)windowDataManager.load(operatorId, windowId); if (recoveredData == null) { //This could happen when there are multiple physical instances and one of them is ahead in processing windows. return; @@ -150,7 +150,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper } catch (IOException e) { throw new RuntimeException("replay", e); } - if (windowId == idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId == windowDataManager.getLargestRecoveryWindow()) { scanner.startScanning(Collections.unmodifiableMap(referenceTimes)); } } @@ -158,7 +158,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper @Override public void emitTuples() { - if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { return; } @@ -204,9 +204,9 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper @Override public void endWindow() { - if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { try { - idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); } @@ -235,7 +235,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper public void committed(long l) { try { - idempotentStorageManager.deleteUpTo(operatorId, l); + windowDataManager.deleteUpTo(operatorId, l); } catch (IOException e) { throw new RuntimeException(e); } @@ -247,14 +247,14 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper scanner.teardown(); } - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + public void setWindowDataManager(WindowDataManager windowDataManager) { - this.idempotentStorageManager = idempotentStorageManager; + this.windowDataManager = windowDataManager; } - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return this.idempotentStorageManager; + return this.windowDataManager; } public void setScanner(TimeBasedDirectoryScanner scanner) http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java index 43445a7..cc27c88 100644 --- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java @@ -39,6 +39,8 @@ import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.lang.mutable.MutableLong; import com.google.common.collect.Maps; @@ -52,7 +54,6 @@ import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.ActivationListener; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.lib.counters.BasicCounters; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.netlet.util.DTThrowable; /** @@ -63,8 +64,8 @@ import com.datatorrent.netlet.util.DTThrowable; * {@link #onMessage(Message)} is called which buffers the message into a holding buffer. This is asynchronous.<br/> * {@link #emitTuples()} retrieves messages from holding buffer and processes them. * <p/> - * Important: The {@link IdempotentStorageManager.FSIdempotentStorageManager} makes the operator fault tolerant as - * well as idempotent. If {@link IdempotentStorageManager.NoopIdempotentStorageManager} is set on the operator then + * Important: The {@link FSWindowDataManager} makes the operator fault tolerant as + * well as idempotent. If {@link WindowDataManager.NoopWindowDataManager} is set on the operator then * it will not be fault-tolerant as well. * <p/> * Configurations:<br/> @@ -105,7 +106,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase private final transient AtomicReference<Throwable> throwable; @NotNull - protected IdempotentStorageManager idempotentStorageManager; + protected WindowDataManager windowDataManager; private transient long[] operatorRecoveredWindows; protected transient long currentWindowId; protected transient int emitCount; @@ -120,7 +121,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase counters = new BasicCounters<MutableLong>(MutableLong.class); throwable = new AtomicReference<Throwable>(); pendingAck = Sets.newHashSet(); - idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); + windowDataManager = new FSWindowDataManager(); lock = new Lock(); @@ -200,9 +201,9 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase spinMillis = context.getValue(OperatorContext.SPIN_MILLIS); counters.setCounter(CounterKeys.RECEIVED, new MutableLong()); counters.setCounter(CounterKeys.REDELIVERED, new MutableLong()); - idempotentStorageManager.setup(context); + windowDataManager.setup(context); try { - operatorRecoveredWindows = idempotentStorageManager.getWindowIds(context.getId()); + operatorRecoveredWindows = windowDataManager.getWindowIds(context.getId()); if (operatorRecoveredWindows != null) { Arrays.sort(operatorRecoveredWindows); } @@ -261,7 +262,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase public void beginWindow(long windowId) { currentWindowId = windowId; - if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } } @@ -270,7 +271,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase { try { @SuppressWarnings("unchecked") - Map<String, T> recoveredData = (Map<String, T>)idempotentStorageManager.load(context.getId(), windowId); + Map<String, T> recoveredData = (Map<String, T>)windowDataManager.load(context.getId(), windowId); if (recoveredData == null) { return; } @@ -286,7 +287,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase @Override public void emitTuples() { - if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { return; } @@ -346,7 +347,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase @Override public void endWindow() { - if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { synchronized (lock) { boolean stateSaved = false; boolean ackCompleted = false; @@ -359,7 +360,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase emitCount++; lastMsg = msg; } - idempotentStorageManager.save(currentWindowRecoveryState, context.getId(), currentWindowId); + windowDataManager.save(currentWindowRecoveryState, context.getId(), currentWindowId); stateSaved = true; currentWindowRecoveryState.clear(); @@ -376,7 +377,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase } finally { if (stateSaved && !ackCompleted) { try { - idempotentStorageManager.delete(context.getId(), currentWindowId); + windowDataManager.delete(context.getId(), currentWindowId); } catch (IOException e) { LOG.error("unable to delete corrupted state", e); } @@ -416,7 +417,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase public void committed(long windowId) { try { - idempotentStorageManager.deleteUpTo(context.getId(), windowId); + windowDataManager.deleteUpTo(context.getId(), windowId); } catch (IOException e) { throw new RuntimeException("committing", e); } @@ -447,7 +448,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase @Override public void teardown() { - idempotentStorageManager.teardown(); + windowDataManager.teardown(); } /** @@ -500,17 +501,17 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase * * @param storageManager */ - public void setIdempotentStorageManager(IdempotentStorageManager storageManager) + public void setWindowDataManager(WindowDataManager storageManager) { - this.idempotentStorageManager = storageManager; + this.windowDataManager = storageManager; } /** * @return the idempotent storage manager. */ - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return this.idempotentStorageManager; + return this.windowDataManager; } protected abstract void emit(T payload); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java index 536702d..7858e89 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java @@ -32,7 +32,7 @@ import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -49,7 +49,7 @@ import com.datatorrent.netlet.util.Slice; * * This component is also responsible for purging old time buckets. */ -public class IncrementalCheckpointManager extends WindowDataManager.FSWindowDataManager +public class IncrementalCheckpointManager extends FSWindowDataManager implements ManagedStateComponent { private static final String WAL_RELATIVE_PATH = "managed_state";
