APEXMALHAR-2093 Removed usages of Idempotent Storage Manager

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5305cd3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5305cd3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5305cd3a

Branch: refs/heads/master
Commit: 5305cd3a008d8dc337a7ef0ed030290ec9b4bea5
Parents: cf3bb7d
Author: Chandni Singh <[email protected]>
Authored: Fri May 13 17:19:37 2016 -0700
Committer: Chandni Singh <[email protected]>
Committed: Wed May 18 15:37:20 2016 -0700

----------------------------------------------------------------------
 .../contrib/avro/AvroFileInputOperator.java     |   4 +-
 .../kafka/AbstractKafkaInputOperator.java       |  51 +--
 .../kinesis/AbstractKinesisInputOperator.java   |  44 +--
 .../rabbitmq/AbstractRabbitMQInputOperator.java |  27 +-
 .../AbstractRabbitMQOutputOperator.java         |  21 +-
 .../redis/AbstractRedisInputOperator.java       |  34 +-
 .../contrib/kafka/KafkaInputOperatorTest.java   |  10 +-
 .../kinesis/KinesisInputOperatorTest.java       |   4 +-
 .../nifi/NiFiSinglePortInputOperatorTest.java   |   3 +-
 .../nifi/NiFiSinglePortOutputOperatorTest.java  |   3 +-
 .../rabbitmq/RabbitMQInputOperatorTest.java     |  11 +-
 .../rabbitmq/RabbitMQOutputOperatorTest.java    |   5 +-
 .../contrib/redis/RedisInputOperatorTest.java   |  11 +-
 .../malhar/kafka/KafkaInputOperatorTest.java    |   4 +-
 .../lib/io/fs/AbstractFileInputOperator.java    |  38 +--
 .../com/datatorrent/lib/io/fs/FileSplitter.java |  33 +-
 .../lib/io/fs/FileSplitterInput.java            |  34 +-
 .../lib/io/jms/AbstractJMSInputOperator.java    |  39 +--
 .../managed/IncrementalCheckpointManager.java   |   4 +-
 .../malhar/lib/wal/FSWindowDataManager.java     | 339 +++++++++++++++++++
 .../apex/malhar/lib/wal/WindowDataManager.java  | 312 -----------------
 .../io/fs/AbstractFileInputOperatorTest.java    |  24 +-
 .../lib/io/fs/FileSplitterInputTest.java        |  23 +-
 .../datatorrent/lib/io/fs/FileSplitterTest.java |  20 +-
 .../lib/io/jms/JMSStringInputOperatorTest.java  |   4 +-
 .../malhar/lib/wal/FSWindowDataManagerTest.java |   4 +-
 pom.xml                                         |   3 +
 27 files changed, 574 insertions(+), 535 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java 
b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
index 14dfdf2..989f859 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
@@ -36,7 +37,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
 
 /**
@@ -48,7 +48,7 @@ import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
  * No need to provide schema,its inferred from the file<br>
  * This operator emits a GenericRecord based on the schema derived from the
  * input file<br>
- * Users can add the {@link 
IdempotentStorageManager.FSIdempotentStorageManager}
+ * Users can add the {@link FSWindowDataManager}
  * to ensure exactly once semantics with a HDFS backed WAL.
  * 
  * @displayName AvroFileInputOperator

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index b166b9e..b026d16 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -28,7 +28,6 @@ import com.datatorrent.api.Stats;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.util.KryoCloneUtils;
 
 import com.google.common.base.Joiner;
@@ -44,6 +43,8 @@ import kafka.javaapi.PartitionMetadata;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.message.Message;
 import kafka.message.MessageAndOffset;
+
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -137,7 +138,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
   private long maxTotalMsgSizePerWindow = Long.MAX_VALUE;
   private transient int emitCount = 0;
   private transient long emitTotalMsgSize = 0;
-  protected IdempotentStorageManager idempotentStorageManager;
+  protected WindowDataManager windowDataManager;
   protected transient long currentWindowId;
   protected transient int operatorId;
   protected final transient Map<KafkaPartition, MutablePair<Long, Integer>> 
currentWindowRecoveryState;
@@ -192,7 +193,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
 
   public AbstractKafkaInputOperator()
   {
-    idempotentStorageManager = new 
IdempotentStorageManager.NoopIdempotentStorageManager();
+    windowDataManager = new WindowDataManager.NoopWindowDataManager();
     currentWindowRecoveryState = new HashMap<KafkaPartition, MutablePair<Long, 
Integer>>();
   }
 
@@ -244,16 +245,16 @@ public abstract class AbstractKafkaInputOperator<K 
extends KafkaConsumer> implem
     }
     this.context = context;
     operatorId = context.getId();
-    if(consumer instanceof HighlevelKafkaConsumer && 
!(idempotentStorageManager instanceof 
IdempotentStorageManager.NoopIdempotentStorageManager)) {
+    if(consumer instanceof HighlevelKafkaConsumer && !(windowDataManager 
instanceof WindowDataManager.NoopWindowDataManager)) {
       throw new RuntimeException("Idempotency is not supported for High Level 
Kafka Consumer");
     }
-    idempotentStorageManager.setup(context);
+    windowDataManager.setup(context);
   }
 
   @Override
   public void teardown()
   {
-    idempotentStorageManager.teardown();
+    windowDataManager.teardown();
     consumer.teardown();
   }
 
@@ -261,7 +262,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;
-    if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
       replay(windowId);
     }
     emitCount = 0;
@@ -272,7 +273,8 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
   {
     try {
       @SuppressWarnings("unchecked")
-      Map<KafkaPartition, MutablePair<Long, Integer>> recoveredData = 
(Map<KafkaPartition, MutablePair<Long, Integer>>) 
idempotentStorageManager.load(operatorId, windowId);
+      Map<KafkaPartition, MutablePair<Long, Integer>> recoveredData = 
(Map<KafkaPartition, MutablePair<Long, Integer>>)
+          windowDataManager.load(operatorId, windowId);
       if (recoveredData != null) {
         Map<String, List<PartitionMetadata>> pms = 
KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, 
getConsumer().topic);
         if (pms != null) {
@@ -311,7 +313,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
           }
         }
       }
-      if(windowId == idempotentStorageManager.getLargestRecoveryWindow()) {
+      if(windowId == windowDataManager.getLargestRecoveryWindow()) {
         // Start the consumer at the largest recovery window
         SimpleKafkaConsumer cons = (SimpleKafkaConsumer)getConsumer();
         // Set the offset positions to the consumer
@@ -337,9 +339,9 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
       Map<KafkaPartition, Long> carryOn = new HashMap<>(offsetStats);
       offsetTrackHistory.add(Pair.of(currentWindowId, carryOn));
     }
-    if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) 
{
+    if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
       try {
-        idempotentStorageManager.save(currentWindowRecoveryState, operatorId, 
currentWindowId);
+        windowDataManager.save(currentWindowRecoveryState, operatorId, 
currentWindowId);
       }
       catch (IOException e) {
         throw new RuntimeException("saving recovery", e);
@@ -376,7 +378,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
     }
 
     try {
-      idempotentStorageManager.deleteUpTo(operatorId, windowId);
+      windowDataManager.deleteUpTo(operatorId, windowId);
     }
     catch (IOException e) {
       throw new RuntimeException("deleting state", e);
@@ -386,7 +388,8 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
   @Override
   public void activate(OperatorContext ctx)
   {
-    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != 
Stateless.WINDOW_ID &&
+        context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowDataManager.getLargestRecoveryWindow()) {
       // If it is a replay state, don't start the consumer
       return;
     }
@@ -405,7 +408,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= 
idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
       return;
     }
     int count = consumer.messageSize() + ((pendingMessage != null) ? 1 : 0);
@@ -505,7 +508,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
       logger.info("Initial offsets: {} ", "{ " + Joiner.on(", 
").useForNull("").withKeyValueSeparator(": ").join(initOffset) + " }");
     }
 
-    Collection<IdempotentStorageManager> newManagers = Sets.newHashSet();
+    Collection<WindowDataManager> newManagers = Sets.newHashSet();
     Set<Integer> deletedOperators =  Sets.newHashSet();
 
     switch (strategy) {
@@ -537,7 +540,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
           partitions.add(createPartition(Sets.newHashSet(newPartition), null, 
newManagers));
         }
         newWaitingPartition.clear();
-        idempotentStorageManager.partitioned(newManagers, deletedOperators);
+        windowDataManager.partitioned(newManagers, deletedOperators);
         return partitions;
 
       }
@@ -580,7 +583,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
 
         logger.info("[ONE_TO_MANY]: Add operator partition for kafka 
partition(s): {} ", StringUtils.join(newWaitingPartition, ", "));
         partitions.add(createPartition(Sets.newHashSet(newWaitingPartition), 
null, newManagers));
-        idempotentStorageManager.partitioned(newManagers, deletedOperators);
+        windowDataManager.partitioned(newManagers, deletedOperators);
         return partitions;
       }
       else {
@@ -631,12 +634,12 @@ public abstract class AbstractKafkaInputOperator<K 
extends KafkaConsumer> implem
       break;
     }
 
-    idempotentStorageManager.partitioned(newManagers, deletedOperators);
+    windowDataManager.partitioned(newManagers, deletedOperators);
     return newPartitions;
   }
 
   // Create a new partition with the partition Ids and initial offset positions
-  protected Partitioner.Partition<AbstractKafkaInputOperator<K>> 
createPartition(Set<KafkaPartition> pIds, Map<KafkaPartition, Long> 
initOffsets, Collection<IdempotentStorageManager> newManagers)
+  protected Partitioner.Partition<AbstractKafkaInputOperator<K>> 
createPartition(Set<KafkaPartition> pIds, Map<KafkaPartition, Long> 
initOffsets, Collection<WindowDataManager> newManagers)
   {
 
     Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new 
DefaultPartition<>(KryoCloneUtils.cloneObject(this));
@@ -648,7 +651,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
         
p.getPartitionedInstance().offsetStats.putAll(p.getPartitionedInstance().getConsumer().getCurrentOffsets());
       }
     }
-    newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
+    newManagers.add(p.getPartitionedInstance().windowDataManager);
 
     PartitionInfo pif = new PartitionInfo();
     pif.kpids = pIds;
@@ -929,14 +932,14 @@ public abstract class AbstractKafkaInputOperator<K 
extends KafkaConsumer> implem
     long byteRateLeft;
   }
 
-  public IdempotentStorageManager getIdempotentStorageManager()
+  public WindowDataManager getWindowDataManager()
   {
-    return idempotentStorageManager;
+    return windowDataManager;
   }
 
-  public void setIdempotentStorageManager(IdempotentStorageManager 
idempotentStorageManager)
+  public void setWindowDataManager(WindowDataManager windowDataManager)
   {
-    this.idempotentStorageManager = idempotentStorageManager;
+    this.windowDataManager = windowDataManager;
   }
 
   public void setInitialPartitionCount(int partitionCount)

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index 6306b04..2352236 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -25,13 +25,14 @@ import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.*;
 import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.common.util.Pair;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.util.KryoCloneUtils;
 
 import com.esotericsoftware.kryo.DefaultSerializer;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.collect.Sets;
 
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,7 +86,7 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
 
   private String endPoint;
 
-  protected IdempotentStorageManager idempotentStorageManager;
+  protected WindowDataManager windowDataManager;
   protected transient long currentWindowId;
   protected transient int operatorId;
   protected final transient Map<String, KinesisPair<String, Integer>> 
currentWindowRecoveryState;
@@ -132,7 +133,7 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
 
   public AbstractKinesisInputOperator()
   {
-    idempotentStorageManager = new 
IdempotentStorageManager.FSIdempotentStorageManager();
+    windowDataManager = new FSWindowDataManager();
     currentWindowRecoveryState = new HashMap<String, KinesisPair<String, 
Integer>>();
   }
   /**
@@ -167,7 +168,7 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
 
     // Operator partitions
     List<Partition<AbstractKinesisInputOperator>> newPartitions = null;
-    Collection<IdempotentStorageManager> newManagers = Sets.newHashSet();
+    Collection<WindowDataManager> newManagers = Sets.newHashSet();
     Set<Integer> deletedOperators =  Sets.newHashSet();
 
     // initialize the shard positions
@@ -198,7 +199,7 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
           partitions.add(createPartition(Sets.newHashSet(pid), null, 
newManagers));
         }
         newWaitingPartition.clear();
-        idempotentStorageManager.partitioned(newManagers, deletedOperators);
+        windowDataManager.partitioned(newManagers, deletedOperators);
         return partitions;
       }
       break;
@@ -250,7 +251,7 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
     default:
       break;
     }
-    idempotentStorageManager.partitioned(newManagers, deletedOperators);
+    windowDataManager.partitioned(newManagers, deletedOperators);
     return newPartitions;
   }
 
@@ -370,10 +371,10 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
   }
   // Create a new partition with the shardIds and initial shard positions
   private
-  Partition<AbstractKinesisInputOperator> createPartition(Set<String> 
shardIds, Map<String, String> initShardPos, 
Collection<IdempotentStorageManager> newManagers)
+  Partition<AbstractKinesisInputOperator> createPartition(Set<String> 
shardIds, Map<String, String> initShardPos, Collection<WindowDataManager> 
newManagers)
   {
     Partition<AbstractKinesisInputOperator> p = new 
DefaultPartition<AbstractKinesisInputOperator>(KryoCloneUtils.cloneObject(this));
-    newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
+    newManagers.add(p.getPartitionedInstance().windowDataManager);
     p.getPartitionedInstance().getConsumer().setShardIds(shardIds);
     p.getPartitionedInstance().getConsumer().resetShardPositions(initShardPos);
 
@@ -400,9 +401,9 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
     }
     consumer.create();
     operatorId = context.getId();
-    idempotentStorageManager.setup(context);
+    windowDataManager.setup(context);
     shardPosition.clear();
-    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowDataManager.getLargestRecoveryWindow()) {
       isReplayState = true;
     }
   }
@@ -413,7 +414,7 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
   @Override
   public void teardown()
   {
-    idempotentStorageManager.teardown();
+    windowDataManager.teardown();
     consumer.teardown();
   }
 
@@ -425,7 +426,7 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
   {
     emitCount = 0;
     currentWindowId = windowId;
-    if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
       replay(windowId);
     }
   }
@@ -434,7 +435,8 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
   {
     try {
       @SuppressWarnings("unchecked")
-      Map<String, KinesisPair<String, Integer>> recoveredData = (Map<String, 
KinesisPair<String, Integer>>) idempotentStorageManager.load(operatorId, 
windowId);
+      Map<String, KinesisPair<String, Integer>> recoveredData =
+          (Map<String, KinesisPair<String, 
Integer>>)windowDataManager.load(operatorId, windowId);
       if (recoveredData == null) {
         return;
       }
@@ -464,10 +466,10 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
   @Override
   public void endWindow()
   {
-    if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) 
{
+    if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
       context.setCounters(getConsumer().getConsumerStats(shardPosition));
       try {
-        idempotentStorageManager.save(currentWindowRecoveryState, operatorId, 
currentWindowId);
+        windowDataManager.save(currentWindowRecoveryState, operatorId, 
currentWindowId);
       }
       catch (IOException e) {
         throw new RuntimeException("saving recovery", e);
@@ -495,7 +497,7 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
   public void committed(long windowId)
   {
     try {
-      idempotentStorageManager.deleteUpTo(operatorId, windowId);
+      windowDataManager.deleteUpTo(operatorId, windowId);
     }
     catch (IOException e) {
       throw new RuntimeException("deleting state", e);
@@ -521,7 +523,7 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= 
idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
       return;
     }
     int count = consumer.getQueueSize();
@@ -707,13 +709,13 @@ public abstract class AbstractKinesisInputOperator <T> 
implements InputOperator,
     this.endPoint = endPoint;
   }
 
-  public IdempotentStorageManager getIdempotentStorageManager()
+  public WindowDataManager getWindowDataManager()
   {
-    return idempotentStorageManager;
+    return windowDataManager;
   }
 
-  public void setIdempotentStorageManager(IdempotentStorageManager 
idempotentStorageManager)
+  public void setWindowDataManager(WindowDataManager windowDataManager)
   {
-    this.idempotentStorageManager = idempotentStorageManager;
+    this.windowDataManager = windowDataManager;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
index bcd9195..b842698 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
@@ -20,7 +20,6 @@ package com.datatorrent.contrib.rabbitmq;
 
 import com.datatorrent.api.*;
 import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.util.KeyValPair;
 import com.datatorrent.netlet.util.DTThrowable;
 import com.rabbitmq.client.*;
@@ -38,6 +37,8 @@ import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+
 /**
  * This is the base implementation of a RabbitMQ input operator.&nbsp;
  * Subclasses should implement the methods which convert RabbitMQ messages to 
tuples.
@@ -102,7 +103,7 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
   protected transient String cTag;
   
   protected transient ArrayBlockingQueue<KeyValPair<Long,byte[]>> 
holdingBuffer;
-  private IdempotentStorageManager idempotentStorageManager;
+  private WindowDataManager windowDataManager;
   protected final transient Map<Long, byte[]> currentWindowRecoveryState;
   private transient final Set<Long> pendingAck;
   private transient final Set<Long> recoveredTags;
@@ -114,7 +115,7 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
     currentWindowRecoveryState = new HashMap<Long, byte[]>();
     pendingAck = new HashSet<Long>();
     recoveredTags = new HashSet<Long>();
-    idempotentStorageManager = new 
IdempotentStorageManager.NoopIdempotentStorageManager();
+    windowDataManager = new WindowDataManager.NoopWindowDataManager();
   }
 
   
@@ -189,7 +190,7 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;
-    if (windowId <= this.idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (windowId <= this.windowDataManager.getLargestRecoveryWindow()) {
       replay(windowId);
     }
   }
@@ -198,7 +199,7 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
   private void replay(long windowId) {      
     Map<Long, byte[]> recoveredData;
     try {
-      recoveredData = (Map<Long, byte[]>) 
this.idempotentStorageManager.load(operatorContextId, windowId);
+      recoveredData = (Map<Long, byte[]>) 
this.windowDataManager.load(operatorContextId, windowId);
       if (recoveredData == null) {
         return;
       }
@@ -224,7 +225,7 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
     }
     
     try {
-      this.idempotentStorageManager.save(currentWindowRecoveryState, 
operatorContextId, currentWindowId);
+      this.windowDataManager.save(currentWindowRecoveryState, 
operatorContextId, currentWindowId);
     } catch (IOException e) {
       DTThrowable.rethrow(e);
     }
@@ -247,13 +248,13 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
   {
     this.operatorContextId = context.getId();
     holdingBuffer = new ArrayBlockingQueue<KeyValPair<Long, 
byte[]>>(bufferSize);
-    this.idempotentStorageManager.setup(context);
+    this.windowDataManager.setup(context);
   }
 
   @Override
   public void teardown()
   {
-    this.idempotentStorageManager.teardown();
+    this.windowDataManager.teardown();
   }
 
   @Override
@@ -319,7 +320,7 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
   public void committed(long windowId)
   {
     try {
-      idempotentStorageManager.deleteUpTo(operatorContextId, windowId);
+      windowDataManager.deleteUpTo(operatorContextId, windowId);
     }
     catch (IOException e) {
       throw new RuntimeException("committing", e);
@@ -391,12 +392,12 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
     this.routingKey = routingKey;
   }
   
-  public IdempotentStorageManager getIdempotentStorageManager() {
-    return idempotentStorageManager;
+  public WindowDataManager getWindowDataManager() {
+    return windowDataManager;
   }
   
-  public void setIdempotentStorageManager(IdempotentStorageManager 
idempotentStorageManager) {
-    this.idempotentStorageManager = idempotentStorageManager;
+  public void setWindowDataManager(WindowDataManager windowDataManager) {
+    this.windowDataManager = windowDataManager;
   }
   
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
index 8d04154..6043c5b 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
@@ -19,7 +19,6 @@
 package com.datatorrent.contrib.rabbitmq;
 
 import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.api.Context.OperatorContext;
 import com.rabbitmq.client.Channel;
@@ -32,6 +31,8 @@ import java.io.IOException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+
 /**
  * This is the base implementation of a RabbitMQ output operator.&nbsp;
  * A concrete operator should be created from this skeleton implementation.
@@ -74,7 +75,7 @@ public class AbstractRabbitMQOutputOperator extends 
BaseOperator
   transient String exchange = "testEx";
   transient String queueName="testQ";
   
-  private IdempotentStorageManager idempotentStorageManager;  
+  private WindowDataManager windowDataManager;
   private transient long currentWindowId;
   private transient long largestRecoveryWindowId;
   private transient int operatorContextId;
@@ -95,7 +96,7 @@ public class AbstractRabbitMQOutputOperator extends 
BaseOperator
       channel = connection.createChannel();
       channel.exchangeDeclare(exchange, "fanout");
 
-      this.idempotentStorageManager.setup(context);
+      this.windowDataManager.setup(context);
 
     }
     catch (IOException ex) {
@@ -108,7 +109,7 @@ public class AbstractRabbitMQOutputOperator extends 
BaseOperator
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;    
-    largestRecoveryWindowId = 
idempotentStorageManager.getLargestRecoveryWindow();
+    largestRecoveryWindowId = windowDataManager.getLargestRecoveryWindow();
     if (windowId <= largestRecoveryWindowId) {
       // Do not resend already sent tuples
       skipProcessingTuple = true;
@@ -131,7 +132,7 @@ public class AbstractRabbitMQOutputOperator extends 
BaseOperator
       return;
     }
     try {
-      idempotentStorageManager.save("processedWindow", operatorContextId, 
currentWindowId);
+      windowDataManager.save("processedWindow", operatorContextId, 
currentWindowId);
     } catch (IOException e) {
       DTThrowable.rethrow(e);
     }
@@ -151,19 +152,19 @@ public class AbstractRabbitMQOutputOperator extends 
BaseOperator
     try {
       channel.close();
       connection.close();
-      this.idempotentStorageManager.teardown();
+      this.windowDataManager.teardown();
     }
     catch (IOException ex) {
       logger.debug(ex.toString());
     }
   }
   
-  public IdempotentStorageManager getIdempotentStorageManager() {
-    return idempotentStorageManager;
+  public WindowDataManager getWindowDataManager() {
+    return windowDataManager;
   }
   
-  public void setIdempotentStorageManager(IdempotentStorageManager 
idempotentStorageManager) {    
-    this.idempotentStorageManager = idempotentStorageManager;    
+  public void setWindowDataManager(WindowDataManager windowDataManager) {
+    this.windowDataManager = windowDataManager;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index daca1fc..fd0a885 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -25,6 +25,9 @@ import java.util.List;
 
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+
 import redis.clients.jedis.ScanParams;
 import redis.clients.jedis.ScanResult;
 
@@ -32,7 +35,6 @@ import com.datatorrent.api.Operator.CheckpointListener;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 
 /**
  * This is the base implementation of a Redis input operator.
@@ -57,7 +59,7 @@ public abstract class AbstractRedisInputOperator<T> extends 
AbstractKeyValueStor
   private transient boolean skipOffsetRecovery = true;
 
   @NotNull
-  private IdempotentStorageManager idempotentStorageManager;
+  private WindowDataManager windowDataManager;
 
   private transient OperatorContext context;
   private transient long currentWindowId;
@@ -83,7 +85,7 @@ public abstract class AbstractRedisInputOperator<T> extends 
AbstractKeyValueStor
     recoveryState = new RecoveryState();
     recoveryState.scanOffsetAtBeginWindow = 0;
     recoveryState.numberOfScanCallsInWindow = 0;
-    setIdempotentStorageManager(new 
IdempotentStorageManager.NoopIdempotentStorageManager());
+    setWindowDataManager(new FSWindowDataManager());
   }
 
   @Override
@@ -92,7 +94,7 @@ public abstract class AbstractRedisInputOperator<T> extends 
AbstractKeyValueStor
     currentWindowId = windowId;
     scanCallsInCurrentWindow = 0;
     replay = false;
-    if (currentWindowId <= 
getIdempotentStorageManager().getLargestRecoveryWindow()) {
+    if (currentWindowId <= getWindowDataManager().getLargestRecoveryWindow()) {
       replay(windowId);
     }
   }
@@ -105,11 +107,11 @@ public abstract class AbstractRedisInputOperator<T> 
extends AbstractKeyValueStor
       if (!skipOffsetRecovery) {
         // Begin offset for this window is recovery offset stored for the last
         // window
-        RecoveryState recoveryStateForLastWindow = (RecoveryState) 
getIdempotentStorageManager().load(context.getId(), windowId - 1);
+        RecoveryState recoveryStateForLastWindow = (RecoveryState) 
getWindowDataManager().load(context.getId(), windowId - 1);
         recoveryState.scanOffsetAtBeginWindow = 
recoveryStateForLastWindow.scanOffsetAtBeginWindow;
       }
       skipOffsetRecovery = false;
-      RecoveryState recoveryStateForCurrentWindow = (RecoveryState) 
getIdempotentStorageManager().load(context.getId(), windowId);
+      RecoveryState recoveryStateForCurrentWindow = (RecoveryState) 
getWindowDataManager().load(context.getId(), windowId);
       recoveryState.numberOfScanCallsInWindow = 
recoveryStateForCurrentWindow.numberOfScanCallsInWindow;
       if (recoveryState.scanOffsetAtBeginWindow != null) {
         scanOffset = recoveryState.scanOffsetAtBeginWindow;
@@ -153,7 +155,7 @@ public abstract class AbstractRedisInputOperator<T> extends 
AbstractKeyValueStor
   {
     super.setup(context);
     sleepTimeMillis = context.getValue(context.SPIN_MILLIS);
-    getIdempotentStorageManager().setup(context);
+    getWindowDataManager().setup(context);
     this.context = context;
     scanOffset = 0;
     scanComplete = false;
@@ -161,7 +163,7 @@ public abstract class AbstractRedisInputOperator<T> extends 
AbstractKeyValueStor
     scanParameters.count(scanCount);
     
     // For the 1st window after checkpoint, windowID - 1 would not have 
recovery
-    // offset stored in idempotentStorageManager
+    // offset stored in windowDataManager
     // But recoveryOffset is non-transient, so will be recovered with
     // checkPointing
     // Offset recovery from idempotency storage can be skipped in this case
@@ -181,9 +183,9 @@ public abstract class AbstractRedisInputOperator<T> extends 
AbstractKeyValueStor
     recoveryState.scanOffsetAtBeginWindow = scanOffset;
     recoveryState.numberOfScanCallsInWindow = scanCallsInCurrentWindow;
 
-    if (currentWindowId > 
getIdempotentStorageManager().getLargestRecoveryWindow()) {
+    if (currentWindowId > getWindowDataManager().getLargestRecoveryWindow()) {
       try {
-        getIdempotentStorageManager().save(recoveryState, context.getId(), 
currentWindowId);
+        getWindowDataManager().save(recoveryState, context.getId(), 
currentWindowId);
       } catch (IOException e) {
         DTThrowable.rethrow(e);
       }
@@ -194,7 +196,7 @@ public abstract class AbstractRedisInputOperator<T> extends 
AbstractKeyValueStor
   public void teardown()
   {
     super.teardown();
-    getIdempotentStorageManager().teardown();
+    getWindowDataManager().teardown();
   }
 
   /*
@@ -231,7 +233,7 @@ public abstract class AbstractRedisInputOperator<T> extends 
AbstractKeyValueStor
   public void committed(long windowId)
   {
     try {
-      getIdempotentStorageManager().deleteUpTo(context.getId(), windowId);
+      getWindowDataManager().deleteUpTo(context.getId(), windowId);
     } catch (IOException e) {
       throw new RuntimeException("committing", e);
     }
@@ -240,16 +242,16 @@ public abstract class AbstractRedisInputOperator<T> 
extends AbstractKeyValueStor
   /*
    * get Idempotent Storage manager instance
    */
-  public IdempotentStorageManager getIdempotentStorageManager()
+  public WindowDataManager getWindowDataManager()
   {
-    return idempotentStorageManager;
+    return windowDataManager;
   }
 
   /*
    * set Idempotent storage manager instance
    */
-  public void setIdempotentStorageManager(IdempotentStorageManager 
idempotentStorageManager)
+  public void setWindowDataManager(WindowDataManager windowDataManager)
   {
-    this.idempotentStorageManager = idempotentStorageManager;
+    this.windowDataManager = windowDataManager;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
 
b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index 9db1355..27235f5 100644
--- 
a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ 
b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
@@ -49,7 +50,6 @@ import com.datatorrent.api.Partitioner;
 import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.partitioner.StatelessPartitionerTest;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.stram.StramLocalCluster;
@@ -141,7 +141,7 @@ public class KafkaInputOperatorTest extends 
KafkaOperatorTestBase
     }
 
     if(idempotent) {
-      node.setIdempotentStorageManager(new 
IdempotentStorageManager.FSIdempotentStorageManager());
+      node.setWindowDataManager(new FSWindowDataManager());
     }
     consumer.setTopic(TEST_TOPIC);
 
@@ -304,7 +304,7 @@ public class KafkaInputOperatorTest extends 
KafkaOperatorTestBase
     operator.deactivate();
 
     operator = createAndDeployOperator();
-    Assert.assertEquals("largest recovery window", 2, 
operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+    Assert.assertEquals("largest recovery window", 2, 
operator.getWindowDataManager().getLargestRecoveryWindow());
 
     operator.beginWindow(1);
     operator.emitTuples();
@@ -339,9 +339,9 @@ public class KafkaInputOperatorTest extends 
KafkaOperatorTestBase
     consumer.setTopic(TEST_TOPIC);
     consumer.setInitialOffset("earliest");
 
-    IdempotentStorageManager.FSIdempotentStorageManager storageManager = new 
IdempotentStorageManager.FSIdempotentStorageManager();
+    FSWindowDataManager storageManager = new FSWindowDataManager();
     storageManager.setRecoveryPath(testMeta.recoveryDir);
-    testMeta.operator.setIdempotentStorageManager(storageManager);
+    testMeta.operator.setWindowDataManager(storageManager);
     testMeta.operator.setConsumer(consumer);
     testMeta.operator.setZookeeper("localhost:" + 
KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
     testMeta.operator.setMaxTuplesPerWindow(500);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
 
b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
index 8051b96..e8eff5d 100644
--- 
a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
+++ 
b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
@@ -232,11 +232,11 @@ public class KinesisInputOperatorTest extends 
KinesisOperatorTestBase
     testMeta.operator.setup(testMeta.context);
     testMeta.operator.activate(testMeta.context);
 
-    Assert.assertEquals("largest recovery window", 1, 
testMeta.operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+    Assert.assertEquals("largest recovery window", 1, 
testMeta.operator.getWindowDataManager().getLargestRecoveryWindow());
 
     testMeta.operator.beginWindow(1);
     testMeta.operator.endWindow();
     Assert.assertEquals("num of messages in window 1", 10, 
testMeta.sink.collectedTuples.size());
     testMeta.sink.collectedTuples.clear();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
 
b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
index f36f1f2..dc56e1c 100644
--- 
a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
+++ 
b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.apache.nifi.util.file.FileUtils;
@@ -71,7 +72,7 @@ public class NiFiSinglePortInputOperatorTest
 
     sink = new CollectorTestSink<>();
     builder = new MockSiteToSiteClient.Builder();
-    windowDataManager = new WindowDataManager.FSWindowDataManager();
+    windowDataManager = new FSWindowDataManager();
 
     operator = new NiFiSinglePortInputOperator(builder, windowDataManager);
     operator.outputPort.setSink(sink);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
 
b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
index e8aa982..14b1493 100644
--- 
a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
+++ 
b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java
@@ -30,6 +30,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.remote.protocol.DataPacket;
@@ -65,7 +66,7 @@ public class NiFiSinglePortOutputOperatorTest
 
     context = new OperatorContextTestHelper.TestIdOperatorContext(12345, 
attributeMap);
 
-    windowDataManager = new WindowDataManager.FSWindowDataManager();
+    windowDataManager = new FSWindowDataManager();
 
     stsBuilder = new MockSiteToSiteClient.Builder();
     dpBuilder = new StringNiFiDataPacketBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
 
b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
index 315160b..4fccffa 100644
--- 
a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
+++ 
b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
@@ -33,6 +33,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+
 import com.datatorrent.contrib.helper.CollectorModule;
 import com.datatorrent.contrib.helper.MessageQueueTestHelper;
 import com.datatorrent.api.Context.OperatorContext;
@@ -41,7 +43,6 @@ import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.netlet.util.DTThrowable;
 
@@ -127,7 +128,7 @@ public class RabbitMQInputOperatorTest
     LocalMode lma = LocalMode.newInstance();
     DAG dag = lma.getDAG();
     RabbitMQInputOperator consumer = dag.addOperator("Consumer", 
RabbitMQInputOperator.class);
-    consumer.setIdempotentStorageManager(new 
IdempotentStorageManager.FSIdempotentStorageManager());
+    consumer.setWindowDataManager(new FSWindowDataManager());
 
     final CollectorModule<byte[]> collector = dag.addOperator("Collector", new 
CollectorModule<byte[]>());
 
@@ -188,7 +189,7 @@ public class RabbitMQInputOperatorTest
   public void testRecoveryAndIdempotency() throws Exception
   {
     RabbitMQInputOperator operator = new RabbitMQInputOperator();
-    operator.setIdempotentStorageManager(new 
IdempotentStorageManager.FSIdempotentStorageManager());
+    operator.setWindowDataManager(new FSWindowDataManager());
     operator.setHost("localhost");
     operator.setExchange("testEx");
     operator.setExchangeType("fanout");
@@ -220,7 +221,7 @@ public class RabbitMQInputOperatorTest
     operator.setup(context);
     operator.activate(context);
 
-    Assert.assertEquals("largest recovery window", 1, 
operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+    Assert.assertEquals("largest recovery window", 1, 
operator.getWindowDataManager().getLargestRecoveryWindow());
     operator.beginWindow(1);
     operator.endWindow();
     Assert.assertEquals("num of messages in window 1", 15, 
sink.collectedTuples.size());
@@ -228,7 +229,7 @@ public class RabbitMQInputOperatorTest
 
     operator.deactivate();
     operator.teardown();
-    operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 1);
+    operator.getWindowDataManager().deleteUpTo(context.getId(), 1);
     publisher.teardown();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
 
b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
index 596a33a..3fa36ab 100644
--- 
a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
+++ 
b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
@@ -29,8 +29,9 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+
 import com.datatorrent.contrib.helper.SourceModule;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.LocalMode;
@@ -144,7 +145,7 @@ public class RabbitMQOutputOperatorTest
     SourceModule source = dag.addOperator("source", new SourceModule());
     source.setTestNum(testNum);
     RabbitMQOutputOperator collector = dag.addOperator("generator", new 
RabbitMQOutputOperator());
-    collector.setIdempotentStorageManager(new 
IdempotentStorageManager.FSIdempotentStorageManager());
+    collector.setWindowDataManager(new FSWindowDataManager());
 
     collector.setExchange("testEx");
     dag.addStream("Stream", source.outPort, 
collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
 
b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
index 6f3a177..bee170d 100644
--- 
a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
+++ 
b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
@@ -25,6 +25,8 @@ import java.util.List;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+
 import redis.clients.jedis.ScanParams;
 
 import com.datatorrent.api.Attribute;
@@ -34,7 +36,6 @@ import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.KeyValPair;
 
@@ -134,7 +135,7 @@ public class RedisInputOperatorTest
     testStore.put("test_ghi", "123");
 
     RedisKeyValueInputOperator operator = new RedisKeyValueInputOperator();
-    operator.setIdempotentStorageManager(new 
IdempotentStorageManager.FSIdempotentStorageManager());
+    operator.setWindowDataManager(new FSWindowDataManager());
     
     operator.setStore(operatorStore);
     operator.setScanCount(1);
@@ -162,13 +163,13 @@ public class RedisInputOperatorTest
       // failure and then re-deployment of operator
       // Re-instantiating to reset values
       operator = new RedisKeyValueInputOperator();
-      operator.setIdempotentStorageManager(new 
IdempotentStorageManager.FSIdempotentStorageManager());
+      operator.setWindowDataManager(new FSWindowDataManager());
       operator.setStore(operatorStore);
       operator.setScanCount(1);
       operator.outputPort.setSink(sink);
       operator.setup(context);
 
-      Assert.assertEquals("largest recovery window", 2, 
operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+      Assert.assertEquals("largest recovery window", 2, 
operator.getWindowDataManager().getLargestRecoveryWindow());
 
       operator.beginWindow(1);
       operator.emitTuples();
@@ -188,7 +189,7 @@ public class RedisInputOperatorTest
         testStore.remove(entry.getKey());
       }
       sink.collectedTuples.clear();
-      operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 5);
+      operator.getWindowDataManager().deleteUpTo(context.getId(), 5);
       operator.teardown();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java 
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index ede7f38..72ecd57 100644
--- 
a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ 
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -40,7 +40,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.commons.io.FileUtils;
 
 import com.datatorrent.api.Context;
@@ -255,7 +255,7 @@ public class KafkaInputOperatorTest extends 
KafkaOperatorTestBase
     node.setClusters(getClusterConfig());
     node.setStrategy(partition);
     if(idempotent) {
-      node.setWindowDataManager(new WindowDataManager.FSWindowDataManager());
+      node.setWindowDataManager(new FSWindowDataManager());
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index bf1605d..dae5c7f 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -63,7 +64,6 @@ import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.StatsListener;
 
 import com.datatorrent.lib.counters.BasicCounters;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.util.KryoCloneUtils;
 
 /**
@@ -127,7 +127,7 @@ public abstract class AbstractFileInputOperator<T>
   protected transient MutableLong pendingFileCount = new MutableLong();
 
   @NotNull
-  protected IdempotentStorageManager idempotentStorageManager = new 
IdempotentStorageManager.NoopIdempotentStorageManager();
+  private WindowDataManager windowDataManager = new 
WindowDataManager.NoopWindowDataManager();
   protected transient long currentWindowId;
   protected final transient LinkedList<RecoveryEntry> 
currentWindowRecoveryState = Lists.newLinkedList();
   protected int operatorId; //needed in partitioning
@@ -388,11 +388,11 @@ public abstract class AbstractFileInputOperator<T>
 
   /**
    * Sets the idempotent storage manager on the operator.
-   * @param idempotentStorageManager  an {@link IdempotentStorageManager}
+   * @param windowDataManager  an {@link WindowDataManager}
    */
-  public void setIdempotentStorageManager(IdempotentStorageManager 
idempotentStorageManager)
+  public void setWindowDataManager(WindowDataManager windowDataManager)
   {
-    this.idempotentStorageManager = idempotentStorageManager;
+    this.windowDataManager = windowDataManager;
   }
 
   /**
@@ -400,9 +400,9 @@ public abstract class AbstractFileInputOperator<T>
    *
    * @return the idempotent storage manager.
    */
-  public IdempotentStorageManager getIdempotentStorageManager()
+  public WindowDataManager getWindowDataManager()
   {
-    return idempotentStorageManager;
+    return windowDataManager;
   }
 
   /**
@@ -456,8 +456,8 @@ public abstract class AbstractFileInputOperator<T>
     fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES, 
localNumberOfRetries);
     fileCounters.setCounter(FileCounters.PENDING_FILES, pendingFileCount);
 
-    idempotentStorageManager.setup(context);
-    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
idempotentStorageManager.getLargestRecoveryWindow()) {
+    windowDataManager.setup(context);
+    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < 
windowDataManager.getLargestRecoveryWindow()) {
       //reset current file and offset in case of replay
       currentFile = null;
       offset = 0;
@@ -512,14 +512,14 @@ public abstract class AbstractFileInputOperator<T>
 
       throw new RuntimeException(errorMessage, savedException);
     }
-    idempotentStorageManager.teardown();
+    windowDataManager.teardown();
   }
 
   @Override
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;
-    if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
       replay(windowId);
     }
   }
@@ -527,9 +527,9 @@ public abstract class AbstractFileInputOperator<T>
   @Override
   public void endWindow()
   {
-    if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) 
{
+    if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
       try {
-        idempotentStorageManager.save(currentWindowRecoveryState, operatorId, 
currentWindowId);
+        windowDataManager.save(currentWindowRecoveryState, operatorId, 
currentWindowId);
       } catch (IOException e) {
         throw new RuntimeException("saving recovery", e);
       }
@@ -553,7 +553,7 @@ public abstract class AbstractFileInputOperator<T>
     //all the recovery data for a window and then processes only those files 
which would be hashed
     //to it in the current run.
     try {
-      Map<Integer, Object> recoveryDataPerOperator = 
idempotentStorageManager.load(windowId);
+      Map<Integer, Object> recoveryDataPerOperator = 
windowDataManager.load(windowId);
 
       for (Object recovery : recoveryDataPerOperator.values()) {
         @SuppressWarnings("unchecked")
@@ -615,7 +615,7 @@ public abstract class AbstractFileInputOperator<T>
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= 
idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
       return;
     }
 
@@ -836,7 +836,7 @@ public abstract class AbstractFileInputOperator<T>
     List<DirectoryScanner> scanners = scanner.partition(totalCount, 
oldscanners);
 
     Collection<Partition<AbstractFileInputOperator<T>>> newPartitions = 
Lists.newArrayListWithExpectedSize(totalCount);
-    Collection<IdempotentStorageManager> newManagers = 
Lists.newArrayListWithExpectedSize(totalCount);
+    Collection<WindowDataManager> newManagers = 
Lists.newArrayListWithExpectedSize(totalCount);
 
     KryoCloneUtils<AbstractFileInputOperator<T>> cloneUtils = 
KryoCloneUtils.createCloneUtils(this);
     for (int i = 0; i < scanners.size(); i++) {
@@ -889,10 +889,10 @@ public abstract class AbstractFileInputOperator<T>
         }
       }
       newPartitions.add(new 
DefaultPartition<AbstractFileInputOperator<T>>(oper));
-      newManagers.add(oper.idempotentStorageManager);
+      newManagers.add(oper.windowDataManager);
     }
 
-    idempotentStorageManager.partitioned(newManagers, deletedOperators);
+    windowDataManager.partitioned(newManagers, deletedOperators);
     LOG.info("definePartitions called returning {} partitions", 
newPartitions.size());
     return newPartitions;
   }
@@ -917,7 +917,7 @@ public abstract class AbstractFileInputOperator<T>
   public void committed(long windowId)
   {
     try {
-      idempotentStorageManager.deleteUpTo(operatorId, windowId);
+      windowDataManager.deleteUpTo(operatorId, windowId);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java 
b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
index 69e44a5..cd79a2b 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
@@ -40,6 +40,7 @@ import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -61,7 +62,6 @@ import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.lib.counters.BasicCounters;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata;
 import com.datatorrent.netlet.util.DTThrowable;
 
@@ -99,7 +99,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
   protected TimeBasedDirectoryScanner scanner;
 
   @NotNull
-  protected IdempotentStorageManager idempotentStorageManager;
+  protected WindowDataManager windowDataManager;
 
   @NotNull
   protected final transient LinkedList<FileInfo> currentWindowRecoveryState;
@@ -118,7 +118,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
   {
     currentWindowRecoveryState = Lists.newLinkedList();
     fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
-    idempotentStorageManager = new 
IdempotentStorageManager.FSIdempotentStorageManager();
+    windowDataManager = new WindowDataManager.NoopWindowDataManager();
     scanner = new TimeBasedDirectoryScanner();
     blocksThreshold = Integer.MAX_VALUE;
   }
@@ -133,7 +133,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
     this.context = context;
 
     fileCounters.setCounter(Counters.PROCESSED_FILES, new MutableLong());
-    idempotentStorageManager.setup(context);
+    windowDataManager.setup(context);
 
     try {
       fs = scanner.getFSInstance();
@@ -145,8 +145,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
       blockSize = fs.getDefaultBlockSize(new 
Path(scanner.files.iterator().next()));
     }
 
-    if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) <
-        idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < 
windowDataManager.getLargestRecoveryWindow()) {
       blockMetadataIterator = null;
     } else {
       //don't setup scanner while recovery
@@ -176,7 +175,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
   {
     blockCount = 0;
     currentWindowId = windowId;
-    if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
       replay(windowId);
     }
   }
@@ -185,7 +184,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
   {
     try {
       @SuppressWarnings("unchecked")
-      LinkedList<FileInfo> recoveredData = 
(LinkedList<FileInfo>)idempotentStorageManager.load(operatorId, windowId);
+      LinkedList<FileInfo> recoveredData = 
(LinkedList<FileInfo>)windowDataManager.load(operatorId, windowId);
       if (recoveredData == null) {
         //This could happen when there are multiple physical instances and one 
of them is ahead in processing windows.
         return;
@@ -210,7 +209,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
         }
       }
 
-      if (windowId == idempotentStorageManager.getLargestRecoveryWindow()) {
+      if (windowId == windowDataManager.getLargestRecoveryWindow()) {
         scanner.setup(context);
       }
     } catch (IOException e) {
@@ -221,7 +220,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= 
idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
       return;
     }
 
@@ -260,9 +259,9 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
   @Override
   public void endWindow()
   {
-    if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) 
{
+    if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
       try {
-        idempotentStorageManager.save(currentWindowRecoveryState, operatorId, 
currentWindowId);
+        windowDataManager.save(currentWindowRecoveryState, operatorId, 
currentWindowId);
       } catch (IOException e) {
         throw new RuntimeException("saving recovery", e);
       }
@@ -370,14 +369,14 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
     return this.scanner;
   }
 
-  public void setIdempotentStorageManager(IdempotentStorageManager 
idempotentStorageManager)
+  public void setWindowDataManager(WindowDataManager windowDataManager)
   {
-    this.idempotentStorageManager = idempotentStorageManager;
+    this.windowDataManager = windowDataManager;
   }
 
-  public IdempotentStorageManager getIdempotentStorageManager()
+  public WindowDataManager getWindowDataManager()
   {
-    return this.idempotentStorageManager;
+    return this.windowDataManager;
   }
 
   @Override
@@ -389,7 +388,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
   public void committed(long l)
   {
     try {
-      idempotentStorageManager.deleteUpTo(operatorId, l);
+      windowDataManager.deleteUpTo(operatorId, l);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java 
b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index bd68016..a58bee7 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -42,6 +42,8 @@ import javax.validation.constraints.Size;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -60,7 +62,6 @@ import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -79,7 +80,7 @@ import com.datatorrent.netlet.util.DTThrowable;
 public class FileSplitterInput extends AbstractFileSplitter implements 
InputOperator, Operator.CheckpointListener
 {
   @NotNull
-  private IdempotentStorageManager idempotentStorageManager;
+  private WindowDataManager windowDataManager;
   @NotNull
   protected final transient LinkedList<ScannedFileInfo> 
currentWindowRecoveryState;
 
@@ -95,7 +96,7 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
   {
     super();
     currentWindowRecoveryState = Lists.newLinkedList();
-    idempotentStorageManager = new 
IdempotentStorageManager.NoopIdempotentStorageManager();
+    windowDataManager = new WindowDataManager.NoopWindowDataManager();
     referenceTimes = Maps.newHashMap();
     scanner = new TimeBasedDirectoryScanner();
   }
@@ -105,10 +106,10 @@ public class FileSplitterInput extends 
AbstractFileSplitter implements InputOper
   {
     sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
     scanner.setup(context);
-    idempotentStorageManager.setup(context);
+    windowDataManager.setup(context);
     super.setup(context);
 
-    long largestRecoveryWindow = 
idempotentStorageManager.getLargestRecoveryWindow();
+    long largestRecoveryWindow = windowDataManager.getLargestRecoveryWindow();
     if (largestRecoveryWindow == Stateless.WINDOW_ID || 
context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) >
         largestRecoveryWindow) {
       scanner.startScanning(Collections.unmodifiableMap(referenceTimes));
@@ -119,7 +120,7 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
   public void beginWindow(long windowId)
   {
     super.beginWindow(windowId);
-    if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
       replay(windowId);
     }
   }
@@ -128,8 +129,7 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
   {
     try {
       @SuppressWarnings("unchecked")
-      LinkedList<ScannedFileInfo> recoveredData = 
(LinkedList<ScannedFileInfo>)idempotentStorageManager
-          .load(operatorId, windowId);
+      LinkedList<ScannedFileInfo> recoveredData = 
(LinkedList<ScannedFileInfo>)windowDataManager.load(operatorId, windowId);
       if (recoveredData == null) {
         //This could happen when there are multiple physical instances and one 
of them is ahead in processing windows.
         return;
@@ -150,7 +150,7 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
     } catch (IOException e) {
       throw new RuntimeException("replay", e);
     }
-    if (windowId == idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (windowId == windowDataManager.getLargestRecoveryWindow()) {
       scanner.startScanning(Collections.unmodifiableMap(referenceTimes));
     }
   }
@@ -158,7 +158,7 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= 
idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
       return;
     }
 
@@ -204,9 +204,9 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
   @Override
   public void endWindow()
   {
-    if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) 
{
+    if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
       try {
-        idempotentStorageManager.save(currentWindowRecoveryState, operatorId, 
currentWindowId);
+        windowDataManager.save(currentWindowRecoveryState, operatorId, 
currentWindowId);
       } catch (IOException e) {
         throw new RuntimeException("saving recovery", e);
       }
@@ -235,7 +235,7 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
   public void committed(long l)
   {
     try {
-      idempotentStorageManager.deleteUpTo(operatorId, l);
+      windowDataManager.deleteUpTo(operatorId, l);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -247,14 +247,14 @@ public class FileSplitterInput extends 
AbstractFileSplitter implements InputOper
     scanner.teardown();
   }
 
-  public void setIdempotentStorageManager(IdempotentStorageManager 
idempotentStorageManager)
+  public void setWindowDataManager(WindowDataManager windowDataManager)
   {
-    this.idempotentStorageManager = idempotentStorageManager;
+    this.windowDataManager = windowDataManager;
   }
 
-  public IdempotentStorageManager getIdempotentStorageManager()
+  public WindowDataManager getWindowDataManager()
   {
-    return this.idempotentStorageManager;
+    return this.windowDataManager;
   }
 
   public void setScanner(TimeBasedDirectoryScanner scanner)

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
index 43445a7..cc27c88 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
@@ -39,6 +39,8 @@ import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang.mutable.MutableLong;
 
 import com.google.common.collect.Maps;
@@ -52,7 +54,6 @@ import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.lib.counters.BasicCounters;
-import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -63,8 +64,8 @@ import com.datatorrent.netlet.util.DTThrowable;
  * {@link #onMessage(Message)} is called which buffers the message into a 
holding buffer. This is asynchronous.<br/>
  * {@link #emitTuples()} retrieves messages from holding buffer and processes 
them.
  * <p/>
- * Important: The {@link IdempotentStorageManager.FSIdempotentStorageManager} 
makes the operator fault tolerant as
- * well as idempotent. If {@link 
IdempotentStorageManager.NoopIdempotentStorageManager} is set on the operator 
then
+ * Important: The {@link FSWindowDataManager} makes the operator fault 
tolerant as
+ * well as idempotent. If {@link WindowDataManager.NoopWindowDataManager} is 
set on the operator then
  * it will not be fault-tolerant as well.
  * <p/>
  * Configurations:<br/>
@@ -105,7 +106,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
   private final transient AtomicReference<Throwable> throwable;
 
   @NotNull
-  protected IdempotentStorageManager idempotentStorageManager;
+  protected WindowDataManager windowDataManager;
   private transient long[] operatorRecoveredWindows;
   protected transient long currentWindowId;
   protected transient int emitCount;
@@ -120,7 +121,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
     counters = new BasicCounters<MutableLong>(MutableLong.class);
     throwable = new AtomicReference<Throwable>();
     pendingAck = Sets.newHashSet();
-    idempotentStorageManager = new 
IdempotentStorageManager.FSIdempotentStorageManager();
+    windowDataManager = new FSWindowDataManager();
 
     lock = new Lock();
 
@@ -200,9 +201,9 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
     spinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
     counters.setCounter(CounterKeys.RECEIVED, new MutableLong());
     counters.setCounter(CounterKeys.REDELIVERED, new MutableLong());
-    idempotentStorageManager.setup(context);
+    windowDataManager.setup(context);
     try {
-      operatorRecoveredWindows = 
idempotentStorageManager.getWindowIds(context.getId());
+      operatorRecoveredWindows = 
windowDataManager.getWindowIds(context.getId());
       if (operatorRecoveredWindows != null) {
         Arrays.sort(operatorRecoveredWindows);
       }
@@ -261,7 +262,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;
-    if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
       replay(windowId);
     }
   }
@@ -270,7 +271,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
   {
     try {
       @SuppressWarnings("unchecked")
-      Map<String, T> recoveredData = (Map<String, 
T>)idempotentStorageManager.load(context.getId(), windowId);
+      Map<String, T> recoveredData = (Map<String, 
T>)windowDataManager.load(context.getId(), windowId);
       if (recoveredData == null) {
         return;
       }
@@ -286,7 +287,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= 
idempotentStorageManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
       return;
     }
 
@@ -346,7 +347,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
   @Override
   public void endWindow()
   {
-    if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) 
{
+    if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
       synchronized (lock) {
         boolean stateSaved = false;
         boolean ackCompleted = false;
@@ -359,7 +360,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
             emitCount++;
             lastMsg = msg;
           }
-          idempotentStorageManager.save(currentWindowRecoveryState, 
context.getId(), currentWindowId);
+          windowDataManager.save(currentWindowRecoveryState, context.getId(), 
currentWindowId);
           stateSaved = true;
 
           currentWindowRecoveryState.clear();
@@ -376,7 +377,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
         } finally {
           if (stateSaved && !ackCompleted) {
             try {
-              idempotentStorageManager.delete(context.getId(), 
currentWindowId);
+              windowDataManager.delete(context.getId(), currentWindowId);
             } catch (IOException e) {
               LOG.error("unable to delete corrupted state", e);
             }
@@ -416,7 +417,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
   public void committed(long windowId)
   {
     try {
-      idempotentStorageManager.deleteUpTo(context.getId(), windowId);
+      windowDataManager.deleteUpTo(context.getId(), windowId);
     } catch (IOException e) {
       throw new RuntimeException("committing", e);
     }
@@ -447,7 +448,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
   @Override
   public void teardown()
   {
-    idempotentStorageManager.teardown();
+    windowDataManager.teardown();
   }
 
   /**
@@ -500,17 +501,17 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
    *
    * @param storageManager
    */
-  public void setIdempotentStorageManager(IdempotentStorageManager 
storageManager)
+  public void setWindowDataManager(WindowDataManager storageManager)
   {
-    this.idempotentStorageManager = storageManager;
+    this.windowDataManager = storageManager;
   }
 
   /**
    * @return the idempotent storage manager.
    */
-  public IdempotentStorageManager getIdempotentStorageManager()
+  public WindowDataManager getWindowDataManager()
   {
-    return this.idempotentStorageManager;
+    return this.windowDataManager;
   }
 
   protected abstract void emit(T payload);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5305cd3a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index 536702d..7858e89 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -32,7 +32,7 @@ import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -49,7 +49,7 @@ import com.datatorrent.netlet.util.Slice;
  *
  * This component is also responsible for purging old time buckets.
  */
-public class IncrementalCheckpointManager extends 
WindowDataManager.FSWindowDataManager
+public class IncrementalCheckpointManager extends FSWindowDataManager
     implements ManagedStateComponent
 {
   private static final String WAL_RELATIVE_PATH = "managed_state";

Reply via email to