http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
deleted file mode 100644
index cdf7452..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import com.gemstone.gemfire.cache.execute.Function;
-import com.gemstone.gemfire.cache.execute.FunctionContext;
-import com.gemstone.gemfire.cache.execute.FunctionException;
-import com.gemstone.gemfire.cache.execute.FunctionService;
-import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
-import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver.AsyncFlushResult;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
-import com.gemstone.gemfire.distributed.DistributedMember;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.InternalEntity;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
-import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-public class HDFSFlushQueueFunction implements Function, InternalEntity{
-  private static final int MAX_RETRIES = Integer.getInteger("gemfireXD.maxFlushQueueRetries", 3);
-  private static final boolean VERBOSE = Boolean.getBoolean("hdfsFlushQueueFunction.VERBOSE");
-  private static final Logger logger = LogService.getLogger();
-  private static final String ID = HDFSFlushQueueFunction.class.getName();
-  
-  public static void flushQueue(PartitionedRegion pr, int maxWaitTime) {
-    
-    Set<Integer> buckets = new HashSet<Integer>(pr.getRegionAdvisor().getBucketSet());
-
-    maxWaitTime *= 1000;
-    long start = System.currentTimeMillis();
-    
-    int retries = 0;
-    long remaining = 0;
-    while (retries++ < MAX_RETRIES && (remaining = waitTime(start, maxWaitTime)) > 0) {
-      if (logger.isDebugEnabled() || VERBOSE) {
-        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing buckets " + buckets 
-            + ", attempt = " + retries 
-            + ", remaining = " + remaining));
-      }
-      
-      HDFSFlushQueueArgs args = new HDFSFlushQueueArgs(buckets, remaining);
-      
-      HDFSFlushQueueResultCollector rc = new HDFSFlushQueueResultCollector(buckets);
-      AbstractExecution exec = (AbstractExecution) FunctionService
-          .onRegion(pr)
-          .withArgs(args)
-          .withCollector(rc);
-      exec.setWaitOnExceptionFlag(true);
-      
-      try {
-        exec.execute(ID);
-        if (rc.getResult()) {
-          if (logger.isDebugEnabled() || VERBOSE) {
-            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushed all buckets successfully")); 
-          }
-          return;
-        }
-      } catch (FunctionException e) {
-        if (logger.isDebugEnabled() || VERBOSE) {
-          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing queue"), e); 
-        }
-      }
-      
-      buckets.removeAll(rc.getSuccessfulBuckets());
-      for (int bucketId : buckets) {
-        remaining = waitTime(start, maxWaitTime);
-        if (logger.isDebugEnabled() || VERBOSE) {
-          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + bucketId)); 
-        }
-        pr.getNodeForBucketWrite(bucketId, new PartitionedRegion.RetryTimeKeeper((int) remaining));
-      }
-    }
-    
-    pr.checkReadiness();
-    throw new FunctionException("Unable to flush the following buckets: " + buckets);
-  }
-  
-  private static long waitTime(long start, long max) {
-    if (max == 0) {
-      return Integer.MAX_VALUE;
-    }
-    return start + max - System.currentTimeMillis();
-  }
-  
-  @Override
-  public void execute(FunctionContext context) {
-    RegionFunctionContext rfc = (RegionFunctionContext) context;
-    PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
-    
-    HDFSFlushQueueArgs args = (HDFSFlushQueueArgs) rfc.getArguments();
-    Set<Integer> buckets = new HashSet<Integer>(args.getBuckets());
-    buckets.retainAll(pr.getDataStore().getAllLocalPrimaryBucketIds());
-
-    Map<Integer, AsyncFlushResult> flushes = new HashMap<Integer, AsyncFlushResult>();
-    for (int bucketId : buckets) {
-      try {
-        HDFSBucketRegionQueue brq = getQueue(pr, bucketId);
-        if (brq != null) {
-          if (logger.isDebugEnabled() || VERBOSE) {
-            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing bucket " + bucketId)); 
-          }
-          flushes.put(bucketId, brq.flush());
-        }
-      } catch (ForceReattemptException e) {
-        if (logger.isDebugEnabled() || VERBOSE) {
-          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing bucket " + bucketId), e); 
-        }
-      }
-    }
-    
-    try {
-      long start = System.currentTimeMillis();
-      for (Map.Entry<Integer, AsyncFlushResult> flush : flushes.entrySet()) {
-        long remaining = waitTime(start, args.getMaxWaitTime());
-        if (logger.isDebugEnabled() || VERBOSE) {
-          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + flush.getKey() 
-              + " to complete flushing, remaining = " + remaining)); 
-        }
-        
-        if (flush.getValue().waitForFlush(remaining, TimeUnit.MILLISECONDS)) {
-          if (logger.isDebugEnabled() || VERBOSE) {
-            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Bucket " + flush.getKey() + " flushed successfully")); 
-          }
-          rfc.getResultSender().sendResult(new FlushStatus(flush.getKey()));
-        }
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-    
-    if (logger.isDebugEnabled() || VERBOSE) {
-      logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Sending final flush result")); 
-    }
-    rfc.getResultSender().lastResult(FlushStatus.last());
-  }
-
-  private HDFSBucketRegionQueue getQueue(PartitionedRegion pr, int bucketId) 
-      throws ForceReattemptException {
-    AsyncEventQueueImpl aeq = pr.getHDFSEventQueue();
-    AbstractGatewaySender gw = (AbstractGatewaySender) aeq.getSender();
-    AbstractGatewaySenderEventProcessor ep = gw.getEventProcessor();
-    if (ep == null) {
-      return null;
-    }
-    
-    ConcurrentParallelGatewaySenderQueue queue = (ConcurrentParallelGatewaySenderQueue) ep.getQueue();
-    return queue.getBucketRegionQueue(pr, bucketId);
-  }
-  
-  @Override
-  public String getId() {
-    return ID;
-  }
-
-  @Override
-  public boolean hasResult() {
-    return true;
-  }
-
-  @Override
-  public boolean optimizeForWrite() {
-    return true;
-  }
-
-  @Override
-  public boolean isHA() {
-    return false;
-  }
-  
-  public static class HDFSFlushQueueResultCollector implements LocalResultCollector<Object, Boolean> {
-    private final CountDownLatch complete;
-    private final Set<Integer> expectedBuckets;
-    private final Set<Integer> successfulBuckets;
-
-    private volatile ReplyProcessor21 processor;
-    
-    public HDFSFlushQueueResultCollector(Set<Integer> expectedBuckets) {
-      this.expectedBuckets = expectedBuckets;
-      
-      complete = new CountDownLatch(1);
-      successfulBuckets = new HashSet<Integer>();
-    }
-    
-    public Set<Integer> getSuccessfulBuckets() {
-      synchronized (successfulBuckets) {
-        return new HashSet<Integer>(successfulBuckets);
-      }
-    }
-    
-    @Override
-    public Boolean getResult() throws FunctionException {
-      try {
-        complete.await();
-        synchronized (successfulBuckets) {
-          LogWriterI18n logger = InternalDistributedSystem.getLoggerI18n();
-          if (logger.fineEnabled() || VERBOSE) {
-            logger.info(LocalizedStrings.DEBUG, "Expected buckets: " + expectedBuckets);
-            logger.info(LocalizedStrings.DEBUG, "Successful buckets: " + successfulBuckets);
-          }
-          return expectedBuckets.equals(successfulBuckets);
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
-        throw new FunctionException(e);
-      }
-    }
-
-    @Override
-    public Boolean getResult(long timeout, TimeUnit unit)
-        throws FunctionException, InterruptedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public synchronized void addResult(DistributedMember memberID, Object result) {
-      if (result instanceof FlushStatus) {
-        FlushStatus status = (FlushStatus) result;
-        if (!status.isLast()) {
-          synchronized (successfulBuckets) {
-            successfulBuckets.add(status.getBucketId());
-          }        
-        }
-      }
-    }
-
-    @Override
-    public void endResults() {         
-      complete.countDown();
-    }
-
-    @Override
-    public void clearResults() {
-    }
-
-    @Override
-    public void setProcessor(ReplyProcessor21 processor) {
-      this.processor = processor;
-    }
-
-    @Override
-    public ReplyProcessor21 getProcessor() {
-      return processor;
-    }
-
-       @Override
-       public void setException(Throwable exception) {
-               // TODO Auto-generated method stub
-               
-       }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
deleted file mode 100644
index ec0f9ff..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.internal.VersionedDataSerializable;
-import com.gemstone.gemfire.internal.Version;
-
-/**
- * Arguments passed to the HDFSForceCompactionFunction
- * 
- */
-@SuppressWarnings("serial")
-public class HDFSForceCompactionArgs implements VersionedDataSerializable {
-
-  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
-
-  private HashSet<Integer> buckets;
-
-  private boolean isMajor;
-
-  private int maxWaitTime;
-
-  public HDFSForceCompactionArgs() {
-  }
-
-  public HDFSForceCompactionArgs(Set<Integer> buckets, boolean isMajor, Integer maxWaitTime) {
-    this.buckets = new HashSet<Integer>(buckets);
-    this.isMajor = isMajor;
-    this.maxWaitTime = maxWaitTime;
-  }
-
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    DataSerializer.writeHashSet(buckets, out);
-    out.writeBoolean(isMajor);
-    out.writeInt(maxWaitTime);
-  }
-
-  @Override
-  public void fromData(DataInput in) throws IOException,
-      ClassNotFoundException {
-    this.buckets = DataSerializer.readHashSet(in);
-    this.isMajor = in.readBoolean();
-    this.maxWaitTime = in.readInt();
-  }
-
-  @Override
-  public Version[] getSerializationVersions() {
-    return serializationVersions;
-  }
-
-  public Set<Integer> getBuckets() {
-    return (Set<Integer>) buckets;
-  }
-
-  public void setBuckets(Set<Integer> buckets) {
-    this.buckets = new HashSet<Integer>(buckets);
-  }
-
-  public boolean isMajor() {
-    return isMajor;
-  }
-
-  public void setMajor(boolean isMajor) {
-    this.isMajor = isMajor;
-  }
-
-  public boolean isSynchronous() {
-    return maxWaitTime == 0;
-  }
-
-  public int getMaxWaitTime() {
-    return this.maxWaitTime;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(getClass().getCanonicalName()).append("@")
-    .append(System.identityHashCode(this))
-    .append(" buckets:").append(buckets)
-    .append(" isMajor:").append(isMajor)
-    .append(" maxWaitTime:").append(maxWaitTime);
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
deleted file mode 100644
index d26ac1b..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.cache.execute.Function;
-import com.gemstone.gemfire.cache.execute.FunctionContext;
-import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
-import com.gemstone.gemfire.internal.InternalEntity;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-/**
- * Function responsible for forcing a compaction on all members
- * of the system
- *
- */
-@SuppressWarnings("serial")
-public class HDFSForceCompactionFunction implements Function, InternalEntity {
-
-  public static final int FORCE_COMPACTION_MAX_RETRIES = Integer.getInteger("gemfireXD.maxCompactionRetries", 3);
-
-  public static final int BUCKET_ID_FOR_LAST_RESULT = -1;
-
-  public static final String ID = "HDFSForceCompactionFunction";
-
-  private static final Logger logger = LogService.getLogger();
-  
-  @Override
-  public void execute(FunctionContext context) {
-    if (context.isPossibleDuplicate()) {
-      // do not re-execute the function, another function
-      // targeting the failed buckets will be invoked
-      context.getResultSender().lastResult(new CompactionStatus(BUCKET_ID_FOR_LAST_RESULT, false));
-      return;
-    }
-    RegionFunctionContext rfc = (RegionFunctionContext) context;
-    PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
-    HDFSForceCompactionArgs args = (HDFSForceCompactionArgs) rfc.getArguments();
-    Set<Integer> buckets = new HashSet<Integer>(args.getBuckets()); // copying avoids race when the function coordinator
-                                                                    // also runs the function locally
-    buckets.retainAll(pr.getDataStore().getAllLocalPrimaryBucketIds());
-
-    List<Future<CompactionStatus>> futures =  pr.forceLocalHDFSCompaction(buckets, args.isMajor(), 0);
-    int waitFor = args.getMaxWaitTime();
-    for (Future<CompactionStatus> future : futures) {
-      long start = System.currentTimeMillis();
-      CompactionStatus status = null;
-      try {
-        // TODO use a CompletionService instead
-        if (!args.isSynchronous() && waitFor <= 0) {
-          break;
-        }
-        status = args.isSynchronous() ? future.get() : future.get(waitFor, TimeUnit.MILLISECONDS);
-        buckets.remove(status.getBucketId());
-        if (logger.isDebugEnabled()) {
-          logger.debug("HDFS: ForceCompaction sending result:"+status);
-        }
-        context.getResultSender().sendResult(status);
-        long elapsedTime = System.currentTimeMillis() - start;
-        waitFor -= elapsedTime;
-      } catch (InterruptedException e) {
-        // send a list of failed buckets after waiting for all buckets
-      } catch (ExecutionException e) {
-        // send a list of failed buckets after waiting for all buckets
-      } catch (TimeoutException e) {
-        // do not wait for other buckets to complete
-        break;
-      }
-    }
-    // for asynchronous invocation, the status is true for buckets that we did not wait for
-    boolean status = args.isSynchronous() ? false : true;
-    for (Integer bucketId : buckets) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("HDFS: ForceCompaction sending result for bucket:"+bucketId);
-      }
-      context.getResultSender().sendResult(new CompactionStatus(bucketId, status));
-    }
-    if (logger.isDebugEnabled()) {
-      logger.debug("HDFS: ForceCompaction sending last result");
-    }
-    context.getResultSender().lastResult(new CompactionStatus(BUCKET_ID_FOR_LAST_RESULT, true));
-  }
-
-  @Override
-  public String getId() {
-    return ID;
-  }
-
-  @Override
-  public boolean hasResult() {
-    return true;
-  }
-
-  @Override
-  public boolean optimizeForWrite() {
-    // run compaction on primary members
-    return true;
-  }
-
-  @Override
-  public boolean isHA() {
-    // so that we can target re-execution on failed buckets
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
deleted file mode 100644
index ee5e4aa..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import com.gemstone.gemfire.cache.execute.FunctionException;
-import com.gemstone.gemfire.distributed.DistributedMember;
-import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
-
-/**
- * 
- */
-public class HDFSForceCompactionResultCollector implements LocalResultCollector<Object, List<CompactionStatus>> {
-
-  /** list of received replies*/
-  private List<CompactionStatus> reply = new ArrayList<CompactionStatus>();
-
-  /** semaphore to block the caller of getResult()*/
-  private CountDownLatch waitForResults = new CountDownLatch(1);
-
-  /** boolean to indicate if clearResults() was called to indicate a failure*/
-  private volatile boolean shouldRetry;
-
-  private ReplyProcessor21 processor;
-
-  @Override
-  public List<CompactionStatus> getResult() throws FunctionException {
-    try {
-      waitForResults.await();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
-      throw new FunctionException(e);
-    }
-    return reply;
-  }
-
-  @Override
-  public List<CompactionStatus> getResult(long timeout, TimeUnit unit)
-      throws FunctionException, InterruptedException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void addResult(DistributedMember memberID,
-      Object resultOfSingleExecution) {
-    if (resultOfSingleExecution instanceof CompactionStatus) {
-      CompactionStatus status = (CompactionStatus) resultOfSingleExecution;
-      if (status.getBucketId() != HDFSForceCompactionFunction.BUCKET_ID_FOR_LAST_RESULT) {
-        reply.add(status);
-      }
-    }
-  }
-
-  @Override
-  public void endResults() {
-    waitForResults.countDown();
-  }
-
-  @Override
-  public void clearResults() {
-    this.shouldRetry = true;
-    waitForResults.countDown();
-  }
-
-  /**
-   * @return true if retry should be attempted
-   */
-  public boolean shouldRetry() {
-    return this.shouldRetry || !getFailedBucketIds().isEmpty();
-  }
-
-  private Set<Integer> getFailedBucketIds() {
-    Set<Integer> result = new HashSet<Integer>();
-    for (CompactionStatus status : reply) {
-      if (!status.isStatus()) {
-        result.add(status.getBucketId());
-      }
-    }
-    return result;
-  }
-
-  public Set<Integer> getSuccessfulBucketIds() {
-    Set<Integer> result = new HashSet<Integer>();
-    for (CompactionStatus status : reply) {
-      if (status.isStatus()) {
-        result.add(status.getBucketId());
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public void setProcessor(ReplyProcessor21 processor) {
-    this.processor = processor;
-  }
-
-  @Override
-  public ReplyProcessor21 getProcessor() {
-    return this.processor;
-  }
-
-@Override
-public void setException(Throwable exception) {
-       // TODO Auto-generated method stub
-       
-}
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
deleted file mode 100644
index 789fe4d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import com.gemstone.gemfire.cache.execute.FunctionAdapter;
-import com.gemstone.gemfire.cache.execute.FunctionContext;
-import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
-import com.gemstone.gemfire.internal.InternalEntity;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-
-/**
- * Function that returns the oldest timestamp among all the major
- * compacted buckets on the members
- *
- */
-@SuppressWarnings("serial")
-public class HDFSLastCompactionTimeFunction extends FunctionAdapter implements InternalEntity{
-
-  public static final String ID = "HDFSLastCompactionTimeFunction";
-
-  @Override
-  public void execute(FunctionContext context) {
-    RegionFunctionContext rfc = (RegionFunctionContext) context;
-    PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
-    rfc.getResultSender().lastResult(pr.lastLocalMajorHDFSCompaction());
-  }
-
-  @Override
-  public String getId() {
-    return ID;
-  }
-
-  @Override
-  public boolean isHA() {
-    return true;
-  }
-
-  @Override
-  public boolean optimizeForWrite() {
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
deleted file mode 100644
index 6d70dce..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
+++ /dev/null
@@ -1,480 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.gemstone.gemfire.StatisticsFactory;
-import com.gemstone.gemfire.cache.GemFireCache;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.SystemTimer;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-import org.apache.logging.log4j.Logger;
-
-/**
- * Cache for hoplog organizers associated with buckets of a region. The director creates an
- * instance of organizer on first get request. It does not read HDFS in advance. Creation of
- * organizer depends on File system initialization that takes outside this class. This class also
- * provides utility methods to monitor usage and manage bucket sets.
- * 
- */
-public class HDFSRegionDirector {
-  /*
-   * Maps each region name to its listener and store objects. This map must be populated before file
-   * organizers of a bucket can be created
-   */
-  private final ConcurrentHashMap<String, HdfsRegionManager> regionManagerMap;
-  
-  /**
-   * regions of this Gemfire cache are managed by this director. TODO this
-   * should be final and be provided at the time of creation of this instance or
-   * through a cache directory
-   */
-  private GemFireCache cache;
-  
-  // singleton instance
-  private static HDFSRegionDirector instance;
-  
-  final ScheduledExecutorService janitor;
-  private JanitorTask janitorTask;
-  
-  private static final Logger logger = LogService.getLogger();
-  protected final static String logPrefix = "<" + "RegionDirector" + "> ";
-  
-  
-  private HDFSRegionDirector() {
-    regionManagerMap = new ConcurrentHashMap<String, HDFSRegionDirector.HdfsRegionManager>();
-    janitor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-      @Override
-      public Thread newThread(Runnable r) {
-        Thread thread = new Thread(r, "HDFSRegionJanitor");
-        thread.setDaemon(true);
-        return thread;
-      }
-    });
-    
-    long interval = Long.getLong(HoplogConfig.JANITOR_INTERVAL_SECS,
-        HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT);
-    
-    janitorTask = new JanitorTask();
-    janitor.scheduleWithFixedDelay(janitorTask, interval, interval,
-        TimeUnit.SECONDS);
-  }
-  
-  public synchronized static HDFSRegionDirector getInstance() {
-    if (instance == null) {
-      instance = new HDFSRegionDirector();
-    }
-    return instance;
-  }
-  
-  public HDFSRegionDirector setCache(GemFireCache cache) {
-    this.cache = cache;
-    return this;
-  }
-
-  public GemFireCache getCache() {
-    return this.cache;
-  }
-  /**
-   * Caches listener, store object and list of organizers associated with the region associated with
-   * a region. Subsequently, these objects will be used each time an organizer is created
-   */
-  public synchronized HdfsRegionManager manageRegion(LocalRegion region, String storeName,
-      HoplogListener listener) {
-    
-    HdfsRegionManager manager = regionManagerMap.get(region.getFullPath());
-    if (manager != null) {
-      // this is an attempt to re-register a region. Assuming this was required
-      // to modify listener or hdfs store impl associated with the region. Hence
-      // will clear the region first.
-
-      clear(region.getFullPath());
-    }
-    
-    HDFSStoreImpl store = HDFSStoreDirector.getInstance().getHDFSStore(storeName);
-    manager = new HdfsRegionManager(region, store, listener, getStatsFactory(), this);
-    regionManagerMap.put(region.getFullPath(), manager);
-    
-    if (logger.isDebugEnabled()) {
-      logger.debug("{}Now managing region " + region.getFullPath(), logPrefix);
-    }
-    
-    return manager;
-  }
-  
-  /**
-   * Find the regions that are part of a particular HDFS store.
-   */
-  public Collection<String> getRegionsInStore(HDFSStore store) {
-    TreeSet<String> regions = new TreeSet<String>();
-    for(Map.Entry<String, HdfsRegionManager> entry : regionManagerMap.entrySet()) {
-      if(entry.getValue().getStore().equals(store)) {
-        regions.add(entry.getKey());
-      }
-    }
-    return regions;
-  }
-  
-  public int getBucketCount(String regionPath) {
-    HdfsRegionManager manager = regionManagerMap.get(regionPath);
-    if (manager == null) {
-      throw new IllegalStateException("Region not initialized");
-    }
-
-    return manager.bucketOrganizerMap.size();
-  }
-  
-  public void closeWritersForRegion(String regionPath, int minSizeForFileRollover) throws IOException {
-    regionManagerMap.get(regionPath).closeWriters(minSizeForFileRollover);
-  }
-  /**
-   * removes and closes all {@link HoplogOrganizer} of this region. This call is expected with
-   * a PR disowns a region.
-   */
-  public synchronized void clear(String regionPath) {
-    HdfsRegionManager manager = regionManagerMap.remove(regionPath);
-    if (manager != null) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("{}Closing hoplog region manager for " + regionPath, logPrefix);
-      }
-      manager.close();
-    }
-  }
-
-  /**
-   * Closes all region managers, organizers and hoplogs. This method should be
-   * called before closing the cache to gracefully release all resources
-   */
-  public static synchronized void reset() {
-    if (instance == null) {
-      // nothing to reset
-      return;
-    }
-    
-    instance.janitor.shutdownNow();
-    
-    for (String region : instance.regionManagerMap.keySet()) {
-      instance.clear(region);
-    }
-    instance.cache = null;
-    instance = null;
-  }
-  
-  /**
-   * Terminates current janitor task and schedules a new. The rate of the new
-   * task is based on the value of system property at that time
-   */
-  public static synchronized void resetJanitor() {
-    instance.janitorTask.terminate();
-    instance.janitorTask = instance.new JanitorTask();
-    long interval = Long.getLong(HoplogConfig.JANITOR_INTERVAL_SECS,
-        HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT);
-    instance.janitor.scheduleWithFixedDelay(instance.janitorTask, 0, interval,
-        TimeUnit.SECONDS);
-  }
-  
-  /**
-   * @param regionPath name of region for which stats object is desired
-   * @return {@link SortedOplogStatistics} instance associated with hdfs region
-   *         name. Null if region is not managed by director
-   */
-  public synchronized SortedOplogStatistics getHdfsRegionStats(String regionPath) {
-    HdfsRegionManager manager = regionManagerMap.get(regionPath);
-    return manager == null ? null : manager.getHdfsStats();
-  }
-  
-  private StatisticsFactory getStatsFactory() {
-    return cache.getDistributedSystem();
-  }
-
-  /**
-   * A helper class to manage region and its organizers
-   */
-  public static class HdfsRegionManager {
-    // name and store configuration of the region whose buckets are managed by this director.
-    private LocalRegion region;
-    private HDFSStoreImpl store;
-    private HoplogListener listener;
-    private volatile boolean closed = false;
-    private final int FILE_ROLLOVER_TASK_INTERVAL = Integer.parseInt
-        (System.getProperty("gemfire.HDFSRegionDirector.FILE_ROLLOVER_TASK_INTERVAL_SECONDS", "60"));
-    
-    private SystemTimer hoplogCloseTimer = null;
-    
-    // instance of hdfs statistics object for this hdfs based region. This
-    // object will collect usage and performance related statistics.
-    private final SortedOplogStatistics hdfsStats;
-
-    /*
-     * An instance of organizer is created for each bucket of regionName region residing on this
-     * node. This member maps bucket id with its corresponding organizer instance. A lock is used to
-     * manage concurrent writes to the map.
-     */
-    private ConcurrentMap<Integer, HoplogOrganizer> bucketOrganizerMap;
-    
-    private HDFSRegionDirector hdfsRegionDirector;
-
-    /**
-     * @param listener
-     *          listener of change events like file creation and deletion
-     * @param hdfsRegionDirector 
-     */
-    HdfsRegionManager(LocalRegion region, HDFSStoreImpl store,
-        HoplogListener listener, StatisticsFactory statsFactory, HDFSRegionDirector hdfsRegionDirector) {
-      bucketOrganizerMap = new ConcurrentHashMap<Integer, HoplogOrganizer>();
-      this.region = region;
-      this.listener = listener;
-      this.store = store;
-      this.hdfsStats = new SortedOplogStatistics(statsFactory, "HDFSRegionStatistics", region.getFullPath());
-      this.hdfsRegionDirector = hdfsRegionDirector;
-    }
-
-    public void closeWriters(int minSizeForFileRollover) throws IOException {
-      final long startTime = System.currentTimeMillis();
-      long elapsedTime = 0;
-        
-      Collection<HoplogOrganizer> organizers = bucketOrganizerMap.values();
-      
-      for (HoplogOrganizer organizer : organizers) {
-      
-        try {
-          this.getRegion().checkReadiness();
-        } catch (Exception e) {
-          break;
-        }
-        
-        ((HDFSUnsortedHoplogOrganizer)organizer).synchronizedCloseWriter(true, 0, 
-            minSizeForFileRollover);
-      }
-      
-    }
-
-    public synchronized <T extends PersistedEventImpl> HoplogOrganizer<T> create(int bucketId) throws IOException {
-      assert !bucketOrganizerMap.containsKey(bucketId);
-
-      HoplogOrganizer<?> organizer = region.getHDFSWriteOnly() 
-          ? new HDFSUnsortedHoplogOrganizer(this, bucketId) 
-          : new HdfsSortedOplogOrganizer(this, bucketId);
-
-      bucketOrganizerMap.put(bucketId, organizer);
-      // initialize a timer that periodically closes the hoplog writer if the 
-      // time for rollover has passed. It also has the responsibility to fix 
the files.  
-      if (this.region.getHDFSWriteOnly() && 
-          hoplogCloseTimer == null) {
-        hoplogCloseTimer = new SystemTimer(hdfsRegionDirector.
-            getCache().getDistributedSystem(), true);
-        
-        // schedule the task to fix the files that were not closed properly 
-        // last time. 
-        hoplogCloseTimer.scheduleAtFixedRate(new CloseTmpHoplogsTimerTask(this), 
-            1000, FILE_ROLLOVER_TASK_INTERVAL * 1000);
-        
-        if (logger.isDebugEnabled()) {
-          logger.debug("{}Schedulng hoplog rollover timer with interval "+ FILE_ROLLOVER_TASK_INTERVAL + 
-              " for hoplog organizer for " + region.getFullPath()
-              + ":" + bucketId + " " + organizer, logPrefix);
-        }
-      }
-      
-      if (logger.isDebugEnabled()) {
-        logger.debug("{}Constructed hoplog organizer for " + region.getFullPath()
-            + ":" + bucketId + " " + organizer, logPrefix);
-      }
-      return (HoplogOrganizer<T>) organizer;
-    }
-    
-    public synchronized <T extends PersistedEventImpl> void addOrganizer(
-        int bucketId, HoplogOrganizer<T> organizer) {
-      if (bucketOrganizerMap.containsKey(bucketId)) {
-        throw new IllegalArgumentException();
-      }
-      if (logger.isDebugEnabled()) {
-        logger.debug("{}added pre constructed organizer " + region.getFullPath()
-            + ":" + bucketId + " " + organizer, logPrefix);
-      }
-      bucketOrganizerMap.put(bucketId, organizer);
-    }
-
-    public void close() {
-      closed = true;
-      
-      if (this.region.getHDFSWriteOnly() && 
-          hoplogCloseTimer != null) {
-        hoplogCloseTimer.cancel();
-        hoplogCloseTimer = null;
-      }
-      for (int bucket : bucketOrganizerMap.keySet()) {
-        close(bucket);
-      }
-    }
-    
-    public boolean isClosed() {
-      return closed;
-    }
-
-    public synchronized void close(int bucketId) {
-      try {
-        HoplogOrganizer organizer = bucketOrganizerMap.remove(bucketId);
-        if (organizer != null) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("{}Closing hoplog organizer for " + region.getFullPath() + ":" + 
-                bucketId + " " + organizer, logPrefix);
-          }
-          organizer.close();
-        }
-      } catch (IOException e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug(logPrefix + "Error closing hoplog organizer for " + region.getFullPath() + ":" + bucketId, e);
-        }
-      }
-      //TODO abort compaction and flush requests for this region
-    }
-    
-    public static String getRegionFolder(String regionPath) {
-      String folder = regionPath;
-      //Change any underscore into a double underscore
-      folder = folder.replace("_", "__");
-      //get rid of the leading slash
-      folder = folder.replaceFirst("^/", "");
-      //replace slashes with underscores
-      folder = folder.replace('/', '_');
-      return folder;
-    }
-
-    public String getRegionFolder() {
-      return getRegionFolder(region.getFullPath());
-    }
-
-    public HoplogListener getListener() {
-      return listener;
-    }
-
-    public HDFSStoreImpl getStore() {
-      return store;
-    }
-
-    public LocalRegion getRegion() {
-      return region;
-    }
-    
-    public SortedOplogStatistics getHdfsStats() {
-      return hdfsStats;
-    }
-    
-    public Collection<HoplogOrganizer> getBucketOrganizers(){
-      return this.bucketOrganizerMap.values();
-    }
-
-    /**
-     * get the HoplogOrganizers only for the given set of buckets
-     */
-    public Collection<HoplogOrganizer> getBucketOrganizers(Set<Integer> buckets){
-      Set<HoplogOrganizer> result = new HashSet<HoplogOrganizer>();
-      for (Integer bucketId : buckets) {
-        result.add(this.bucketOrganizerMap.get(bucketId));
-      }
-      return result;
-    }
-
-    /**
-     * Delete all files from HDFS for this region. This method
-     * should be called after all members have destroyed their
-     * region in gemfire, so there should be no threads accessing
-     * these files.
-     * @throws IOException 
-     */
-    public void destroyData() throws IOException {
-      //Make sure everything is shut down and closed.
-      close();
-      if (store == null) {
-        return;
-      }
-      Path regionPath = new Path(store.getHomeDir(), getRegionFolder());
-      
-      //Delete all files in HDFS.
-      FileSystem fs = getStore().getFileSystem();
-      if(!fs.delete(regionPath, true)) {
-        if(fs.exists(regionPath)) {
-          throw new IOException("Unable to delete " + regionPath);
-        }
-      }
-    }
-
-    public void performMaintenance() throws IOException {
-      Collection<HoplogOrganizer> buckets = getBucketOrganizers();
-      for (HoplogOrganizer bucket : buckets) {
-        bucket.performMaintenance();
-      }
-    }
-  }
-  
-  private class JanitorTask implements Runnable {
-    boolean terminated = false;
-    @Override
-    public void run() {
-      if (terminated) {
-        return;
-      }
-      fineLog("Executing HDFS Region janitor task", null);
-      
-      Collection<HdfsRegionManager> regions = regionManagerMap.values();
-      for (HdfsRegionManager region : regions) {
-        fineLog("Maintaining region:" + region.getRegionFolder(), null);
-        try {
-          region.performMaintenance();
-        } catch (Throwable e) {
-          logger.info(LocalizedMessage.create(LocalizedStrings.HOPLOG_IO_ERROR , region.getRegionFolder()));
-          logger.info(LocalizedMessage.create(LocalizedStrings.ONE_ARG, e.getMessage()));
-          fineLog(null, e);
-        }
-      }
-    }
-
-    public void terminate() {
-      terminated = true;
-    }
-  }
-  
-  protected static void fineLog(String message, Throwable e) {
-    if(logger.isDebugEnabled()) {
-      logger.debug(message, e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
deleted file mode 100644
index 880ef3e..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-
-/**
- * HDFSStoreDirector is created for managing all instances of HDFSStoreImpl.
- *
- */
-public final class HDFSStoreDirector {
-  private final ConcurrentHashMap<String, HDFSStoreImpl> storeMap = new ConcurrentHashMap<String, HDFSStoreImpl>();
-
-  // singleton instance
-  private static volatile HDFSStoreDirector instance;
-  
-  private HDFSStoreDirector() {
-
-  }
-  
-  public static final HDFSStoreDirector getInstance() {
-    if (instance == null) {
-      synchronized (HDFSStoreDirector.class)  {
-        if (instance == null)
-          instance = new HDFSStoreDirector();
-      }
-    }
-    return instance;
-  }
-
-  // Called when the region is created.
-  public final void addHDFSStore(HDFSStoreImpl hdfsStore){
-    this.storeMap.put(hdfsStore.getName(), hdfsStore); 
-  }
-  
-  public final HDFSStoreImpl getHDFSStore(String hdfsStoreName) {
-    return this.storeMap.get(hdfsStoreName);
-  }
-  
-  public final void removeHDFSStore(String hdfsStoreName) {
-    this.storeMap.remove(hdfsStoreName);
-  } 
-  
-  public void closeHDFSStores() {
-    Iterator<HDFSStoreImpl> it = this.storeMap.values().iterator();
-    while (it.hasNext()) {
-      HDFSStoreImpl hsi = it.next();
-      hsi.close();
-    }
-    this.storeMap.clear();
-  }
-
-   public ArrayList<HDFSStoreImpl> getAllHDFSStores() {
-    ArrayList<HDFSStoreImpl> hdfsStores = new ArrayList<HDFSStoreImpl>();
-    hdfsStores.addAll(this.storeMap.values());
-    return hdfsStores;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
deleted file mode 100644
index cbb35cb..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-
-import com.gemstone.gemfire.cache.CacheClosedException;
-import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
-import com.gemstone.gemfire.internal.HeapDataOutputStream;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-/**
- * Manages unsorted Hoplog files for a bucket (Streaming Ingest option). An instance per bucket 
- * will exist in each PR
- * 
- *
- */
-public class HDFSUnsortedHoplogOrganizer extends AbstractHoplogOrganizer<UnsortedHoplogPersistedEvent> {
-  public static final String HOPLOG_REGEX = HOPLOG_NAME_REGEX + "("
-      + SEQ_HOPLOG_EXTENSION + "|" + TEMP_HOPLOG_EXTENSION + ")";
-  public static final Pattern HOPLOG_PATTERN = Pattern.compile(HOPLOG_REGEX);
-  protected static String TMP_FILE_NAME_REGEX = HOPLOG_NAME_REGEX + SEQ_HOPLOG_EXTENSION + TEMP_HOPLOG_EXTENSION + "$";
-  protected static final Pattern patternForTmpHoplog = Pattern.compile(TMP_FILE_NAME_REGEX);
-  
-   volatile private HoplogWriter writer;
-   volatile private Hoplog currentHoplog;
-   
-   volatile private long lastFlushTime = System.currentTimeMillis();
-   
-   volatile private boolean abortFlush = false;
-   private FileSystem fileSystem;
-   
-   public HDFSUnsortedHoplogOrganizer(HdfsRegionManager region, int bucketId) throws IOException{
-    super(region, bucketId);
-    writer = null;
-    sequence = new AtomicInteger(0);
-
-    fileSystem = store.getFileSystem();
-    if (! fileSystem.exists(bucketPath)) {
-      return;
-    }
-    
-    FileStatus validHoplogs[] = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() {
-      @Override
-      public boolean accept(Path file) {
-        // All valid hoplog files must match the regex
-        Matcher matcher = HOPLOG_PATTERN.matcher(file.getName());
-        return matcher.matches();
-      }
-    });
-
-    if (validHoplogs != null && validHoplogs.length > 0) {
-      for (FileStatus file : validHoplogs) {
-        // account for the disk used by this file
-        incrementDiskUsage(file.getLen());
-      }
-    }
-
-  }
-  
-    @Override
-    public void close() throws IOException {
-      super.close();
-      if (logger.isDebugEnabled())
-        logger.debug("{}Closing the hoplog organizer and the open files", logPrefix);
-      // abort the flush so that we can immediately call the close current writer. 
-      abortFlush = true;
-      synchronizedCloseWriter(true, 0, 0);
-    }
-    
-    
-    /**
-     * Flushes the data to HDFS. 
-     * Synchronization ensures that the writer is not closed when flush is happening.
-     * To abort the flush, abortFlush needs to be set.  
-     * @throws ForceReattemptException 
-     */
-     @Override
-    public synchronized void flush(Iterator<? extends QueuedPersistentEvent> bufferIter, final int count)
-        throws IOException, ForceReattemptException {
-      assert bufferIter != null;
-      
-      if (abortFlush)
-        throw new CacheClosedException("Either the region has been cleared " +
-            "or closed. Aborting the ongoing flush operation.");
-      if (logger.isDebugEnabled())
-        logger.debug("{}Initializing flush operation", logPrefix);
-      
-      // variables for updating stats
-      long start = stats.getFlush().begin();
-      int byteCount = 0;
-      if (writer == null) {
-        // Hoplogs of sequence files are always created with a 0 sequence number
-        currentHoplog = getTmpSortedOplog(0, SEQ_HOPLOG_EXTENSION);
-        try {
-          writer = this.store.getSingletonWriter().runSerially(new Callable<Hoplog.HoplogWriter>() {
-            @Override
-            public HoplogWriter call() throws Exception {
-              return currentHoplog.createWriter(count);
-            }
-          });
-        } catch (Exception e) {
-          if (e instanceof IOException) {
-            throw (IOException)e;
-          }
-          throw new IOException(e);
-        }
-      }
-      long timeSinceLastFlush = (System.currentTimeMillis() - lastFlushTime)/1000 ;
-      
-      try {
-        /**MergeGemXDHDFSToGFE changed the following statement as the code of HeapDataOutputStream is not merged */
-        //HeapDataOutputStream out = new HeapDataOutputStream();
-        while (bufferIter.hasNext()) {
-          HeapDataOutputStream out = new HeapDataOutputStream(1024, null);
-          if (abortFlush) {
-            stats.getFlush().end(byteCount, start);
-            throw new CacheClosedException("Either the region has been cleared " +
-                       "or closed. Aborting the ongoing flush operation.");
-          }
-          QueuedPersistentEvent item = bufferIter.next();
-          item.toHoplogEventBytes(out);
-          byte[] valueBytes = out.toByteArray();
-          writer.append(item.getRawKey(), valueBytes);
-          // add key length and value length to stats byte counter
-          byteCount += (item.getRawKey().length + valueBytes.length);
-          /**MergeGemXDHDFSToGFE how to clear for reuse. Leaving it for Darrel to merge this change*/
-          //out.clearForReuse();
-        }
-        // ping secondaries before making the file a legitimate file to ensure 
-        // that in case of split brain, no other vm has taken up as primary. #50110.
-        if (!abortFlush)
-          pingSecondaries();
-        // append completed. If the file is to be rolled over, 
-        // close writer and rename the file to a legitimate name.
-        // Else, sync the already written data with HDFS nodes. 
-        int maxFileSize = this.store.getWriteOnlyFileRolloverSize() * 1024 * 1024;
-        int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval();
-        if (writer.getCurrentSize() >= maxFileSize || 
-            timeSinceLastFlush >= fileRolloverInterval) {
-          closeCurrentWriter();
-        }
-        else {
-          // if flush is not aborted, hsync the batch. It ensures that 
-          // the batch has reached HDFS and we can discard it. 
-          if (!abortFlush)
-            writer.hsync();
-        }
-      } catch (IOException e) {
-        stats.getFlush().error(start);
-        // as there is an exception, it can probably be a file specific problem.
-        // close the current file to avoid any file specific issues next time  
-        closeCurrentWriter();
-        // throw the exception so that the async queue will dispatch the same batch again
-        throw e;
-      } 
-      
-      stats.getFlush().end(byteCount, start);
-    }
-    
-    /**
-     * Synchronization ensures that the writer is not closed when flush is happening.
-     */
-    synchronized void synchronizedCloseWriter(boolean forceClose, 
-        long timeSinceLastFlush, int minsizeforrollover) throws IOException { 
-      long writerSize = 0;
-      if (writer != null){
-        writerSize = writer.getCurrentSize();
-      }
-      
-      if (writerSize < (minsizeforrollover * 1024L))
-        return;
-      
-      int maxFileSize = this.store.getWriteOnlyFileRolloverSize() * 1024 * 1024;
-      int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval();
-      if (writerSize >= maxFileSize || 
-          timeSinceLastFlush >= fileRolloverInterval || forceClose) {
-        closeCurrentWriter();
-      }
-    }
-        
-    
-    /**
-     * Closes the current writer so that next time a new hoplog can 
-     * be created. Also, fixes any tmp hoplogs. 
-     * 
-     * @throws IOException
-     */
-    void closeCurrentWriter() throws IOException {
-      
-      if (writer != null) {
-        // If this organizer is closing, it is ok to ignore exceptions here
-        // because CloseTmpHoplogsTimerTask
-        // on another member may have already renamed the hoplog
-        // fixes bug 49141
-        boolean isClosing = abortFlush;
-        try {
-          incrementDiskUsage(writer.getCurrentSize());
-        } catch (IOException e) {
-          if (!isClosing) {
-            throw e;
-          }
-        }
-        if (logger.isDebugEnabled())
-          logger.debug("{}Closing hoplog " + currentHoplog.getFileName(), 
logPrefix);
-        try{
-          writer.close();
-          makeLegitimate(currentHoplog);
-        } catch (IOException e) {
-          if (!isClosing) {
-            logger.warn(LocalizedStrings.HOPLOG_FLUSH_OPERATION_FAILED, e);
-            throw e;
-          }
-        } finally {
-          writer = null;
-          lastFlushTime = System.currentTimeMillis();
-        }
-      }
-      else
-        lastFlushTime = System.currentTimeMillis();
-    }
-
-    @Override
-    public void clear() throws IOException {
-      boolean prevAbortFlushFlag = abortFlush;
-      // abort the flush so that we can immediately close the current writer.
-      abortFlush = true;
-      
-      // Close if there is any existing writer. 
-      try {
-        synchronizedCloseWriter(true, 0, 0);
-      } catch (IOException e) {
-        logger.warn(LocalizedStrings.HOPLOG_CLOSE_FAILED, e);
-      }
-      
-      // reenable the aborted flush
-      abortFlush = prevAbortFlushFlag;
-      
-      // Mark the hoplogs for deletion
-      markHoplogsForDeletion();
-      
-    }
-  
-    @Override
-    public void performMaintenance() {
-      // TODO remove the timer for tmp file conversion. Use this instead
-    }
-
-    @Override
-    public Future<CompactionStatus> forceCompaction(boolean isMajor) {
-      return null;
-    }
-
-    @Override
-    protected Hoplog getHoplog(Path hoplogPath) throws IOException {
-      Hoplog so = new SequenceFileHoplog(fileSystem, hoplogPath, stats);
-      return so;
-    }
-  
-  /**
-   * Fixes the size of hoplogs that were not closed properly last time. 
-   * Such hoplogs are *.tmphop files. Identify them, open and close them to
-   * fix their size, and then rename them to *.hop.
-   * 
-   * @throws IOException
-   * @throws ForceReattemptException 
-   */
-  void identifyAndFixTmpHoplogs(FileSystem fs) throws IOException, ForceReattemptException {
-    if (logger.isDebugEnabled())
-      logger.debug("{}Fixing temporary hoplogs", logPrefix);
-    
-    // A different filesystem is passed to this function for the following reason:
-    // For HDFS, if a file wasn't closed properly last time,
-    // while calling FileSystem.append for this file, FSNamesystem.startFileInternal ->
-    // FSNamesystem.recoverLeaseInternal gets called.
-    // That function throws AlreadyBeingCreatedException if there is an open handle
-    // to any other file created using the same FileSystem object. This is a bug and is being tracked at:
-    // https://issues.apache.org/jira/browse/HDFS-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
-    //
-    // The fix for this bug is not yet part of Pivotal HD. So to overcome the bug,
-    // we create a new file system for the timer task so that it does not encounter the bug.
-    
-    FileStatus tmpHoplogs[] = FSUtils.listStatus(fs, fs.makeQualified(bucketPath), new PathFilter() {
-      @Override
-      public boolean accept(Path file) {
-        // All valid hoplog files must match the regex
-        Matcher matcher = patternForTmpHoplog.matcher(file.getName());
-        return matcher.matches();
-      }
-    });
-    
-    if (tmpHoplogs == null || tmpHoplogs.length == 0) {
-      if (logger.isDebugEnabled())
-        logger.debug("{}No files to fix", logPrefix);
-      return;
-    }
-    // ping secondaries so that in case of split brain, no other vm has taken up
-    // as primary. #50110. 
-    pingSecondaries();
-    if (logger.isDebugEnabled())
-      logger.debug("{}Files to fix " + tmpHoplogs.length, logPrefix);
-
-    String currentHoplogName = null;
-    // get the current hoplog name. We need to ignore current hoplog while fixing.
-    if (currentHoplog != null) {
-      currentHoplogName = currentHoplog.getFileName();
-    }
-    
-    for (int i = 0; i < tmpHoplogs.length; i++) {
-      // Skip directories
-      if (tmpHoplogs[i].isDirectory()) {
-        continue;
-      }
-
-      final Path p = tmpHoplogs[i].getPath();
-      
-      if (tmpHoplogs[i].getPath().getName().equals(currentHoplogName)){
-        if (logger.isDebugEnabled())
-          logger.debug("Skipping current file: " + 
tmpHoplogs[i].getPath().getName(), logPrefix);
-        continue;
-      } 
-      
-      SequenceFileHoplog hoplog = new SequenceFileHoplog(fs, p, stats);
-      try {
-        makeLegitimate(hoplog);
-        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + p + " was a temporary " +
-            "hoplog because the node managing it wasn't shutdown properly last time. Fixed the hoplog name."));
-      } catch (IOException e) {
-        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + p + " is still a temporary " +
-            "hoplog because the node managing it wasn't shutdown properly last time. Failed to " +
-            "change the hoplog name because an exception was thrown while fixing it. " + e));
-      }
-    }
-  }
-  
-  private FileStatus[] getExpiredHoplogs() throws IOException {
-    FileStatus files[] = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() {
-      @Override
-      public boolean accept(Path file) {
-        // All expired hoplogs end with the expire extension and must match the valid file regex
-        String fileName = file.getName();
-        if (! fileName.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
-          return false;
-        }
-        return true;
-      }
-    });
-    return files;
-  }
-  /**
-   * Locks the sorted oplogs collection, removes the oplog and renames it for deletion later
-   * @throws IOException 
-   */
-  private void markHoplogsForDeletion() throws IOException {
-    
-    ArrayList<IOException> errors = new ArrayList<IOException>();
-    FileStatus validHoplogs[] = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() {
-      @Override
-      public boolean accept(Path file) {
-        // All valid hoplog files must match the regex
-        Matcher matcher = HOPLOG_PATTERN.matcher(file.getName());
-        return matcher.matches();
-      }
-    });
-    
-    FileStatus[] expired = getExpiredHoplogs();
-    validHoplogs = filterValidHoplogs(validHoplogs, expired);
-
-    if (validHoplogs == null || validHoplogs.length == 0) {
-      return;
-    }
-    for (FileStatus fileStatus : validHoplogs) {
-      try {
-        addExpiryMarkerForAFile(getHoplog(fileStatus.getPath()));
-      } catch (IOException e) {
-        // even if there is an IO error continue removing other hoplogs and
-        // notify at the end
-        errors.add(e);
-      }
-    }
-    
-    if (!errors.isEmpty()) {
-      for (IOException e : errors) {
-        logger.warn(LocalizedStrings.HOPLOG_HOPLOG_REMOVE_FAILED, e);
-      }
-    }
-  }
-  
-  @Override
-  public Compactor getCompactor() {
-    throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
-  }
-  
-  @Override
-  public HoplogIterator<byte[], UnsortedHoplogPersistedEvent> scan(
-      long startOffset, long length) throws IOException {
-    throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
-  }
-
-  public long getLastFlushTime() {
-    return this.lastFlushTime;
-  }
-  
-  public long getfileRolloverInterval() {
-    int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval();
-    return fileRolloverInterval;
-  }
-
-  @Override
-  public long getLastMajorCompactionTimestamp() {
-    throw new UnsupportedOperationException();
-  }
-
-}
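
For readers tracing the removed organizer, the rollover policy in flush() and synchronizedCloseWriter() reduces to a size-or-age check: the current hoplog is closed and renamed once its size reaches the configured write-only rollover size (MB) or once the time since the last flush reaches the rollover interval (seconds); otherwise the batch is only hsync'd. The sketch below restates that check in isolation; the RolloverPolicy and shouldRollOver names are illustrative only and are not part of the Geode API.

// Minimal, illustrative restatement of the write-only hoplog rollover check.
// Only the thresholds mirror HDFSStore.getWriteOnlyFileRolloverSize() (MB)
// and getWriteOnlyFileRolloverInterval() (seconds); everything else is assumed.
public final class RolloverPolicy {
  private final long maxFileSizeBytes;        // rollover size, converted from MB
  private final long rolloverIntervalSeconds; // rollover interval in seconds

  public RolloverPolicy(int rolloverSizeMb, int rolloverIntervalSeconds) {
    this.maxFileSizeBytes = rolloverSizeMb * 1024L * 1024L;
    this.rolloverIntervalSeconds = rolloverIntervalSeconds;
  }

  /** Returns true when the current writer should be closed and the file renamed. */
  public boolean shouldRollOver(long currentFileSizeBytes, long secondsSinceLastFlush) {
    return currentFileSizeBytes >= maxFileSizeBytes
        || secondsSinceLastFlush >= rolloverIntervalSeconds;
  }
}

Keeping the two thresholds in a small value object like this makes the decision easy to unit test without a live writer or FileSystem.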

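Similarly, identifyAndFixTmpHoplogs() above is essentially a filter-and-rename pass: list the bucket directory with a PathFilter that matches *.tmphop names, skip directories and the hoplog currently being written, and rename every survivor to its legitimate *.hop name. A stripped-down version using only public Hadoop FileSystem APIs might look like the following; the TmpHoplogFixer class, the fixTmpFiles method, and the regex are assumptions for illustration and do not reproduce Geode's internal TMP_FILE_NAME_REGEX or makeLegitimate() exactly.

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public final class TmpHoplogFixer {
  // Illustrative pattern: anything ending in ".hop.tmphop". The real
  // TMP_FILE_NAME_REGEX is built from the hoplog naming constants.
  private static final Pattern TMP_PATTERN = Pattern.compile(".*\\.hop\\.tmphop$");

  /** Renames every *.tmphop file under bucketPath to its *.hop name. */
  public static void fixTmpFiles(FileSystem fs, Path bucketPath) throws IOException {
    FileStatus[] tmpFiles = fs.listStatus(bucketPath, new PathFilter() {
      @Override
      public boolean accept(Path file) {
        return TMP_PATTERN.matcher(file.getName()).matches();
      }
    });
    if (tmpFiles == null) {
      return;
    }
    for (FileStatus status : tmpFiles) {
      if (status.isDirectory()) {
        continue; // directories can never be temporary hoplogs
      }
      Path tmp = status.getPath();
      // drop the ".tmphop" suffix to recover the legitimate hoplog name
      String fixedName = tmp.getName().substring(0, tmp.getName().length() - ".tmphop".length());
      fs.rename(tmp, new Path(tmp.getParent(), fixedName));
    }
  }
}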