http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java deleted file mode 100644 index cdf7452..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package com.gemstone.gemfire.cache.hdfs.internal.hoplog;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver.AsyncFlushResult;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.i18n.LogWriterI18n;
import com.gemstone.gemfire.internal.InternalEntity;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;

/**
 * Region function that flushes the HDFS event queues of the primary buckets hosted on the
 * executing member. {@link #flushQueue(PartitionedRegion, int)} is the caller-side entry point:
 * it executes this function on the region and retries for the buckets that did not report a
 * successful flush.
 */
public class HDFSFlushQueueFunction implements Function, InternalEntity {
  /** Upper bound on the number of attempts made by {@link #flushQueue}; defaults to 3. */
  private static final int MAX_RETRIES = Integer.getInteger("gemfireXD.maxFlushQueueRetries", 3);
  /** When set, the debug messages of this class are emitted even if debug logging is off. */
  private static final boolean VERBOSE = Boolean.getBoolean("hdfsFlushQueueFunction.VERBOSE");
  private static final Logger logger = LogService.getLogger();
  private static final String ID = HDFSFlushQueueFunction.class.getName();

  /**
   * Flushes the HDFS queues of all buckets of {@code pr}, retrying up to {@code MAX_RETRIES}
   * times for buckets that were not reported as flushed.
   *
   * @param pr the partitioned region whose bucket queues are flushed
   * @param maxWaitTime overall time budget in seconds; {@code 0} disables the time limit (only
   *        the retry limit applies)
   * @throws FunctionException if some buckets are still unflushed once the retries or the time
   *         budget are exhausted ({@code pr.checkReadiness()} is invoked first and may raise its
   *         own exception)
   */
  public static void flushQueue(PartitionedRegion pr, int maxWaitTime) {

    Set<Integer> buckets = new HashSet<Integer>(pr.getRegionAdvisor().getBucketSet());

    // Convert seconds to milliseconds using long arithmetic. The previous int multiplication
    // overflowed for waits above Integer.MAX_VALUE / 1000 seconds; the wrapped value could be
    // negative, which made this method give up before the first attempt.
    long maxWaitMillis = TimeUnit.SECONDS.toMillis(maxWaitTime);
    long start = System.currentTimeMillis();

    int retries = 0;
    long remaining = 0;
    while (retries++ < MAX_RETRIES && (remaining = waitTime(start, maxWaitMillis)) > 0) {
      if (logger.isDebugEnabled() || VERBOSE) {
        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing buckets " + buckets
            + ", attempt = " + retries
            + ", remaining = " + remaining));
      }

      HDFSFlushQueueArgs args = new HDFSFlushQueueArgs(buckets, remaining);

      HDFSFlushQueueResultCollector rc = new HDFSFlushQueueResultCollector(buckets);
      AbstractExecution exec = (AbstractExecution) FunctionService
          .onRegion(pr)
          .withArgs(args)
          .withCollector(rc);
      exec.setWaitOnExceptionFlag(true);

      try {
        exec.execute(ID);
        if (rc.getResult()) {
          if (logger.isDebugEnabled() || VERBOSE) {
            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushed all buckets successfully"));
          }
          return;
        }
      } catch (FunctionException e) {
        // best effort: fall through and retry the buckets that have not been flushed yet
        if (logger.isDebugEnabled() || VERBOSE) {
          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing queue"), e);
        }
      }

      // only the buckets that did not report success are retried
      buckets.removeAll(rc.getSuccessfulBuckets());
      for (int bucketId : buckets) {
        remaining = waitTime(start, maxWaitMillis);
        if (logger.isDebugEnabled() || VERBOSE) {
          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + bucketId));
        }
        // remaining is a long now; clamp it so the narrowing cast cannot wrap
        pr.getNodeForBucketWrite(bucketId,
            new PartitionedRegion.RetryTimeKeeper((int) Math.min(remaining, Integer.MAX_VALUE)));
      }
    }

    pr.checkReadiness();
    throw new FunctionException("Unable to flush the following buckets: " + buckets);
  }

  /**
   * Returns the milliseconds left of a time budget.
   *
   * @param start start of the budget, in {@code System.currentTimeMillis()} time
   * @param max size of the budget in milliseconds; {@code 0} means "no limit"
   * @return {@code Integer.MAX_VALUE} when {@code max} is 0, otherwise the remaining time, which
   *         is zero or negative once the budget is used up
   */
  private static long waitTime(long start, long max) {
    if (max == 0) {
      return Integer.MAX_VALUE;
    }
    return start + max - System.currentTimeMillis();
  }

  /**
   * Starts a flush for every requested bucket that is a local primary, waits for each flush
   * within the time budget carried by the arguments and sends one {@code FlushStatus} per bucket
   * that flushed successfully, followed by the terminating {@code FlushStatus.last()}.
   */
  @Override
  public void execute(FunctionContext context) {
    RegionFunctionContext rfc = (RegionFunctionContext) context;
    PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();

    HDFSFlushQueueArgs args = (HDFSFlushQueueArgs) rfc.getArguments();
    // work on a copy so the set held by the arguments is not modified
    Set<Integer> buckets = new HashSet<Integer>(args.getBuckets());
    buckets.retainAll(pr.getDataStore().getAllLocalPrimaryBucketIds());

    // kick off all flushes first, then wait for them below
    Map<Integer, AsyncFlushResult> flushes = new HashMap<Integer, AsyncFlushResult>();
    for (int bucketId : buckets) {
      try {
        HDFSBucketRegionQueue brq = getQueue(pr, bucketId);
        if (brq != null) {
          if (logger.isDebugEnabled() || VERBOSE) {
            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing bucket " + bucketId));
          }
          flushes.put(bucketId, brq.flush());
        }
      } catch (ForceReattemptException e) {
        // the bucket is simply not reported as flushed; the caller decides whether to retry
        if (logger.isDebugEnabled() || VERBOSE) {
          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing bucket " + bucketId), e);
        }
      }
    }

    try {
      long start = System.currentTimeMillis();
      for (Map.Entry<Integer, AsyncFlushResult> flush : flushes.entrySet()) {
        long remaining = waitTime(start, args.getMaxWaitTime());
        if (logger.isDebugEnabled() || VERBOSE) {
          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + flush.getKey()
              + " to complete flushing, remaining = " + remaining));
        }

        if (flush.getValue().waitForFlush(remaining, TimeUnit.MILLISECONDS)) {
          if (logger.isDebugEnabled() || VERBOSE) {
            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Bucket " + flush.getKey() + " flushed successfully"));
          }
          rfc.getResultSender().sendResult(new FlushStatus(flush.getKey()));
        }
      }
    } catch (InterruptedException e) {
      // stop waiting, keep the interrupt status and still send the final result below
      Thread.currentThread().interrupt();
    }

    if (logger.isDebugEnabled() || VERBOSE) {
      logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Sending final flush result"));
    }
    rfc.getResultSender().lastResult(FlushStatus.last());
  }

  /**
   * Looks up the HDFS queue of one bucket.
   *
   * @return the bucket's queue, or {@code null} when the region's gateway sender has no event
   *         processor
   */
  private HDFSBucketRegionQueue getQueue(PartitionedRegion pr, int bucketId)
      throws ForceReattemptException {
    AsyncEventQueueImpl aeq = pr.getHDFSEventQueue();
    AbstractGatewaySender gw = (AbstractGatewaySender) aeq.getSender();
    AbstractGatewaySenderEventProcessor ep = gw.getEventProcessor();
    if (ep == null) {
      return null;
    }

    ConcurrentParallelGatewaySenderQueue queue = (ConcurrentParallelGatewaySenderQueue) ep.getQueue();
    return queue.getBucketRegionQueue(pr, bucketId);
  }

  @Override
  public String getId() {
    return ID;
  }

  @Override
  public boolean hasResult() {
    return true;
  }

  @Override
  public boolean optimizeForWrite() {
    return true;
  }

  @Override
  public boolean isHA() {
    return false;
  }

  /**
   * Collects the {@code FlushStatus} results of one execution and reports whether every
   * expected bucket was flushed.
   */
  public static class HDFSFlushQueueResultCollector implements LocalResultCollector<Object, Boolean> {
    /** released by {@link #endResults()} */
    private final CountDownLatch complete;
    private final Set<Integer> expectedBuckets;
    /** guarded by its own monitor */
    private final Set<Integer> successfulBuckets;

    private volatile ReplyProcessor21 processor;

    /**
     * @param expectedBuckets the buckets that must all report success; the set is kept by
     *        reference, not copied
     */
    public HDFSFlushQueueResultCollector(Set<Integer> expectedBuckets) {
      this.expectedBuckets = expectedBuckets;

      complete = new CountDownLatch(1);
      successfulBuckets = new HashSet<Integer>();
    }

    /** @return a snapshot of the buckets that have reported a successful flush so far */
    public Set<Integer> getSuccessfulBuckets() {
      synchronized (successfulBuckets) {
        return new HashSet<Integer>(successfulBuckets);
      }
    }

    /**
     * Blocks until {@link #endResults()} has been called.
     *
     * @return {@code true} if the successful buckets equal the expected buckets
     * @throws FunctionException if the waiting thread is interrupted (the interrupt status is
     *         restored and the cache's cancel criterion is consulted first)
     */
    @Override
    public Boolean getResult() throws FunctionException {
      try {
        complete.await();
        synchronized (successfulBuckets) {
          // NOTE(review): this uses the distributed-system logger rather than the log4j logger
          // used by the rest of the class - confirm getLoggerI18n() cannot return null here.
          LogWriterI18n logger = InternalDistributedSystem.getLoggerI18n();
          if (logger.fineEnabled() || VERBOSE) {
            logger.info(LocalizedStrings.DEBUG, "Expected buckets: " + expectedBuckets);
            logger.info(LocalizedStrings.DEBUG, "Successful buckets: " + successfulBuckets);
          }
          return expectedBuckets.equals(successfulBuckets);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
        throw new FunctionException(e);
      }
    }

    /** Not supported by this collector. */
    @Override
    public Boolean getResult(long timeout, TimeUnit unit)
        throws FunctionException, InterruptedException {
      throw new UnsupportedOperationException();
    }

    /** Records the bucket of every non-terminal {@code FlushStatus}; other results are ignored. */
    @Override
    public synchronized void addResult(DistributedMember memberID, Object result) {
      if (result instanceof FlushStatus) {
        FlushStatus status = (FlushStatus) result;
        if (!status.isLast()) {
          synchronized (successfulBuckets) {
            successfulBuckets.add(status.getBucketId());
          }
        }
      }
    }

    @Override
    public void endResults() {
      complete.countDown();
    }

    @Override
    public void clearResults() {
    }

    @Override
    public void setProcessor(ReplyProcessor21 processor) {
      this.processor = processor;
    }

    @Override
    public ReplyProcessor21 getProcessor() {
      return processor;
    }

    @Override
    public void setException(Throwable exception) {
      // TODO Auto-generated method stub

    }

  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java deleted file mode 100644 index ec0f9ff..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package com.gemstone.gemfire.cache.hdfs.internal.hoplog;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.internal.VersionedDataSerializable;
import com.gemstone.gemfire.internal.Version;

/**
 * Arguments passed to the HDFSForceCompactionFunction: the buckets to compact, whether the
 * compaction is a major one, and how long to wait for it.
 */
@SuppressWarnings("serial")
public class HDFSForceCompactionArgs implements VersionedDataSerializable {

  // never reassigned, so it is declared final
  private static final Version[] serializationVersions = new Version[]{ Version.GFE_81 };

  /** bucket ids to compact; {@code null} until set or deserialized */
  private HashSet<Integer> buckets;

  private boolean isMajor;

  /** 0 requests a synchronous compaction, see {@link #isSynchronous()} */
  private int maxWaitTime;

  /** No-argument constructor; fields are filled in later by {@link #fromData(DataInput)}. */
  public HDFSForceCompactionArgs() {
  }

  /**
   * @param buckets bucket ids to compact; copied, must not be {@code null}
   * @param isMajor {@code true} for a major compaction
   * @param maxWaitTime wait time, 0 for a synchronous compaction; must not be {@code null}
   *        because it is unboxed
   */
  public HDFSForceCompactionArgs(Set<Integer> buckets, boolean isMajor, Integer maxWaitTime) {
    this.buckets = new HashSet<Integer>(buckets);
    this.isMajor = isMajor;
    this.maxWaitTime = maxWaitTime;
  }

  /** Writes buckets, isMajor and maxWaitTime, in that order. */
  @Override
  public void toData(DataOutput out) throws IOException {
    DataSerializer.writeHashSet(buckets, out);
    out.writeBoolean(isMajor);
    out.writeInt(maxWaitTime);
  }

  /** Reads the fields in the order written by {@link #toData(DataOutput)}. */
  @Override
  public void fromData(DataInput in) throws IOException,
      ClassNotFoundException {
    this.buckets = DataSerializer.readHashSet(in);
    this.isMajor = in.readBoolean();
    this.maxWaitTime = in.readInt();
  }

  @Override
  public Version[] getSerializationVersions() {
    return serializationVersions;
  }

  /** @return the internal bucket set (not a copy) */
  public Set<Integer> getBuckets() {
    return buckets;
  }

  public void setBuckets(Set<Integer> buckets) {
    this.buckets = new HashSet<Integer>(buckets);
  }

  public boolean isMajor() {
    return isMajor;
  }

  public void setMajor(boolean isMajor) {
    this.isMajor = isMajor;
  }

  /** @return {@code true} when the wait time is 0 */
  public boolean isSynchronous() {
    return maxWaitTime == 0;
  }

  public int getMaxWaitTime() {
    return this.maxWaitTime;
  }

  @Override
  public String toString() {
    return getClass().getCanonicalName() + "@" + System.identityHashCode(this)
        + " buckets:" + buckets
        + " isMajor:" + isMajor
        + " maxWaitTime:" + maxWaitTime;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java deleted file mode 100644 index d26ac1b..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package com.gemstone.gemfire.cache.hdfs.internal.hoplog;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.internal.InternalEntity;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.logging.LogService;

/**
 * Function responsible for forcing a compaction on all members
 * of the system
 *
 */
@SuppressWarnings("serial")
public class HDFSForceCompactionFunction implements Function, InternalEntity {

  public static final int FORCE_COMPACTION_MAX_RETRIES = Integer.getInteger("gemfireXD.maxCompactionRetries", 3);

  /** bucket id carried by the terminating result; it does not denote a real bucket */
  public static final int BUCKET_ID_FOR_LAST_RESULT = -1;

  public static final String ID = "HDFSForceCompactionFunction";

  private static final Logger logger = LogService.getLogger();

  /**
   * Compacts the requested buckets that are local primaries and sends one
   * {@code CompactionStatus} per bucket, followed by a terminating status for
   * {@link #BUCKET_ID_FOR_LAST_RESULT}.
   * <p>
   * In synchronous mode every compaction is awaited without a time limit; buckets whose
   * compaction failed are reported with status {@code false}. In asynchronous mode the
   * compactions are awaited for at most the configured wait time (milliseconds) in total;
   * buckets that were not completed in that time are reported with status {@code true}.
   */
  @Override
  public void execute(FunctionContext context) {
    if (context.isPossibleDuplicate()) {
      // do not re-execute the function, another function
      // targeting the failed buckets will be invoked
      context.getResultSender().lastResult(new CompactionStatus(BUCKET_ID_FOR_LAST_RESULT, false));
      return;
    }
    RegionFunctionContext rfc = (RegionFunctionContext) context;
    PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
    HDFSForceCompactionArgs args = (HDFSForceCompactionArgs) rfc.getArguments();
    // copying avoids race when the function coordinator also runs the function locally
    Set<Integer> buckets = new HashSet<Integer>(args.getBuckets());
    buckets.retainAll(pr.getDataStore().getAllLocalPrimaryBucketIds());

    List<Future<CompactionStatus>> futures = pr.forceLocalHDFSCompaction(buckets, args.isMajor(), 0);
    final boolean synchronous = args.isSynchronous();
    long waitFor = args.getMaxWaitTime();
    // An interrupt must not abort the remaining waits, so it is remembered here and the
    // thread's interrupt status is restored once all results have been sent.
    boolean interrupted = false;
    try {
      for (Future<CompactionStatus> future : futures) {
        // TODO use a CompletionService instead
        if (!synchronous && waitFor <= 0) {
          break;
        }
        long start = System.currentTimeMillis();
        try {
          CompactionStatus status = synchronous ? future.get() : future.get(waitFor, TimeUnit.MILLISECONDS);
          buckets.remove(status.getBucketId());
          if (logger.isDebugEnabled()) {
            logger.debug("HDFS: ForceCompaction sending result:"+status);
          }
          context.getResultSender().sendResult(status);
        } catch (InterruptedException e) {
          // send a list of failed buckets after waiting for all buckets
          interrupted = true;
        } catch (ExecutionException e) {
          // send a list of failed buckets after waiting for all buckets
          if (logger.isDebugEnabled()) {
            logger.debug("HDFS: ForceCompaction failed for a bucket", e);
          }
        } catch (TimeoutException e) {
          // do not wait for other buckets to complete
          break;
        } finally {
          // charge the time against the budget on every path, not only on success
          waitFor -= System.currentTimeMillis() - start;
        }
      }
      // for asynchronous invocation, the status is true for buckets that we did not wait for
      boolean status = !synchronous;
      for (Integer bucketId : buckets) {
        if (logger.isDebugEnabled()) {
          logger.debug("HDFS: ForceCompaction sending result for bucket:"+bucketId);
        }
        context.getResultSender().sendResult(new CompactionStatus(bucketId, status));
      }
      if (logger.isDebugEnabled()) {
        logger.debug("HDFS: ForceCompaction sending last result");
      }
      context.getResultSender().lastResult(new CompactionStatus(BUCKET_ID_FOR_LAST_RESULT, true));
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  @Override
  public String getId() {
    return ID;
  }

  @Override
  public boolean hasResult() {
    return true;
  }

  @Override
  public boolean optimizeForWrite() {
    // run compaction on primary members
    return true;
  }

  @Override
  public boolean isHA() {
    // so that we can target re-execution on failed buckets
    return true;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java deleted file mode 100644 index ee5e4aa..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package com.gemstone.gemfire.cache.hdfs.internal.hoplog;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;

/**
 * Collects the {@code CompactionStatus} replies of a force-compaction execution and tells the
 * caller whether a retry is needed.
 * <p>
 * Access to the reply list is synchronized, matching HDFSFlushQueueResultCollector.
 * NOTE(review): this assumes results can be added from more than one thread - confirm against
 * the function execution framework.
 */
public class HDFSForceCompactionResultCollector implements LocalResultCollector<Object, List<CompactionStatus>> {

  /** list of received replies; guarded by its own monitor */
  private final List<CompactionStatus> reply = new ArrayList<CompactionStatus>();

  /** latch to block the caller of getResult() */
  private final CountDownLatch waitForResults = new CountDownLatch(1);

  /** boolean to indicate if clearResults() was called to indicate a failure */
  private volatile boolean shouldRetry;

  private volatile ReplyProcessor21 processor;

  /**
   * Blocks until {@link #endResults()} or {@link #clearResults()} has been called.
   *
   * @return a snapshot of the replies received so far
   * @throws FunctionException if the waiting thread is interrupted (the interrupt status is
   *         restored and the cache's cancel criterion is consulted first)
   */
  @Override
  public List<CompactionStatus> getResult() throws FunctionException {
    try {
      waitForResults.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
      throw new FunctionException(e);
    }
    synchronized (reply) {
      return new ArrayList<CompactionStatus>(reply);
    }
  }

  /** Not supported by this collector. */
  @Override
  public List<CompactionStatus> getResult(long timeout, TimeUnit unit)
      throws FunctionException, InterruptedException {
    throw new UnsupportedOperationException();
  }

  /**
   * Stores every {@code CompactionStatus} except the terminating one (bucket id
   * {@code HDFSForceCompactionFunction.BUCKET_ID_FOR_LAST_RESULT}); other results are ignored.
   */
  @Override
  public void addResult(DistributedMember memberID,
      Object resultOfSingleExecution) {
    if (resultOfSingleExecution instanceof CompactionStatus) {
      CompactionStatus status = (CompactionStatus) resultOfSingleExecution;
      if (status.getBucketId() != HDFSForceCompactionFunction.BUCKET_ID_FOR_LAST_RESULT) {
        synchronized (reply) {
          reply.add(status);
        }
      }
    }
  }

  @Override
  public void endResults() {
    waitForResults.countDown();
  }

  /** Marks the execution as needing a retry and releases getResult(); replies are kept. */
  @Override
  public void clearResults() {
    this.shouldRetry = true;
    waitForResults.countDown();
  }

  /**
   * @return true if retry should be attempted
   */
  public boolean shouldRetry() {
    return this.shouldRetry || !getFailedBucketIds().isEmpty();
  }

  /** @return ids of the buckets whose reply carries a {@code false} status */
  private Set<Integer> getFailedBucketIds() {
    Set<Integer> result = new HashSet<Integer>();
    synchronized (reply) {
      for (CompactionStatus status : reply) {
        if (!status.isStatus()) {
          result.add(status.getBucketId());
        }
      }
    }
    return result;
  }

  /** @return ids of the buckets whose reply carries a {@code true} status */
  public Set<Integer> getSuccessfulBucketIds() {
    Set<Integer> result = new HashSet<Integer>();
    synchronized (reply) {
      for (CompactionStatus status : reply) {
        if (status.isStatus()) {
          result.add(status.getBucketId());
        }
      }
    }
    return result;
  }

  @Override
  public void setProcessor(ReplyProcessor21 processor) {
    this.processor = processor;
  }

  @Override
  public ReplyProcessor21 getProcessor() {
    return this.processor;
  }

  @Override
  public void setException(Throwable exception) {
    // TODO Auto-generated method stub

  }
}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java deleted file 
mode 100644 index 789fe4d..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package com.gemstone.gemfire.cache.hdfs.internal.hoplog;

import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.internal.InternalEntity;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;

/**
 * Region function that reports the executing member's last major HDFS compaction value,
 * documented by the original authors as the oldest timestamp among the major compacted buckets
 * on the members.
 */
@SuppressWarnings("serial")
public class HDFSLastCompactionTimeFunction extends FunctionAdapter implements InternalEntity {

  /** Identifier under which this function is registered and executed. */
  public static final String ID = "HDFSLastCompactionTimeFunction";

  @Override
  public String getId() {
    return ID;
  }

  @Override
  public boolean optimizeForWrite() {
    return true;
  }

  @Override
  public boolean isHA() {
    return true;
  }

  /**
   * Sends the region's {@code lastLocalMajorHDFSCompaction()} value as the single, final result.
   */
  @Override
  public void execute(FunctionContext context) {
    RegionFunctionContext regionContext = (RegionFunctionContext) context;
    PartitionedRegion region = (PartitionedRegion) regionContext.getDataSet();
    regionContext.getResultSender().lastResult(region.lastLocalMajorHDFSCompaction());
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java deleted file mode 100644 index 6d70dce..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java +++ /dev/null @@ -1,480 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import com.gemstone.gemfire.StatisticsFactory; -import com.gemstone.gemfire.cache.GemFireCache; -import com.gemstone.gemfire.cache.hdfs.HDFSStore; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl; -import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl; -import com.gemstone.gemfire.i18n.LogWriterI18n; -import com.gemstone.gemfire.internal.SystemTimer; -import com.gemstone.gemfire.internal.cache.LocalRegion; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; - -import org.apache.logging.log4j.Logger; - -/** - * Cache for hoplog organizers associated with buckets of a region. The director creates an - * instance of organizer on first get request. It does not read HDFS in advance. Creation of - * organizer depends on File system initialization that takes outside this class. This class also - * provides utility methods to monitor usage and manage bucket sets. - * - */ -public class HDFSRegionDirector { - /* - * Maps each region name to its listener and store objects. 
This map must be populated before file - * organizers of a bucket can be created - */ - private final ConcurrentHashMap<String, HdfsRegionManager> regionManagerMap; - - /** - * regions of this Gemfire cache are managed by this director. TODO this - * should be final and be provided at the time of creation of this instance or - * through a cache directory - */ - private GemFireCache cache; - - // singleton instance - private static HDFSRegionDirector instance; - - final ScheduledExecutorService janitor; - private JanitorTask janitorTask; - - private static final Logger logger = LogService.getLogger(); - protected final static String logPrefix = "<" + "RegionDirector" + "> "; - - - private HDFSRegionDirector() { - regionManagerMap = new ConcurrentHashMap<String, HDFSRegionDirector.HdfsRegionManager>(); - janitor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r, "HDFSRegionJanitor"); - thread.setDaemon(true); - return thread; - } - }); - - long interval = Long.getLong(HoplogConfig.JANITOR_INTERVAL_SECS, - HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT); - - janitorTask = new JanitorTask(); - janitor.scheduleWithFixedDelay(janitorTask, interval, interval, - TimeUnit.SECONDS); - } - - public synchronized static HDFSRegionDirector getInstance() { - if (instance == null) { - instance = new HDFSRegionDirector(); - } - return instance; - } - - public HDFSRegionDirector setCache(GemFireCache cache) { - this.cache = cache; - return this; - } - - public GemFireCache getCache() { - return this.cache; - } - /** - * Caches listener, store object and list of organizers associated with the region associated with - * a region. 
Subsequently, these objects will be used each time an organizer is created - */ - public synchronized HdfsRegionManager manageRegion(LocalRegion region, String storeName, - HoplogListener listener) { - - HdfsRegionManager manager = regionManagerMap.get(region.getFullPath()); - if (manager != null) { - // this is an attempt to re-register a region. Assuming this was required - // to modify listener or hdfs store impl associated with the region. Hence - // will clear the region first. - - clear(region.getFullPath()); - } - - HDFSStoreImpl store = HDFSStoreDirector.getInstance().getHDFSStore(storeName); - manager = new HdfsRegionManager(region, store, listener, getStatsFactory(), this); - regionManagerMap.put(region.getFullPath(), manager); - - if (logger.isDebugEnabled()) { - logger.debug("{}Now managing region " + region.getFullPath(), logPrefix); - } - - return manager; - } - - /** - * Find the regions that are part of a particular HDFS store. - */ - public Collection<String> getRegionsInStore(HDFSStore store) { - TreeSet<String> regions = new TreeSet<String>(); - for(Map.Entry<String, HdfsRegionManager> entry : regionManagerMap.entrySet()) { - if(entry.getValue().getStore().equals(store)) { - regions.add(entry.getKey()); - } - } - return regions; - } - - public int getBucketCount(String regionPath) { - HdfsRegionManager manager = regionManagerMap.get(regionPath); - if (manager == null) { - throw new IllegalStateException("Region not initialized"); - } - - return manager.bucketOrganizerMap.size(); - } - - public void closeWritersForRegion(String regionPath, int minSizeForFileRollover) throws IOException { - regionManagerMap.get(regionPath).closeWriters(minSizeForFileRollover); - } - /** - * removes and closes all {@link HoplogOrganizer} of this region. This call is expected with - * a PR disowns a region. 
- */ - public synchronized void clear(String regionPath) { - HdfsRegionManager manager = regionManagerMap.remove(regionPath); - if (manager != null) { - if (logger.isDebugEnabled()) { - logger.debug("{}Closing hoplog region manager for " + regionPath, logPrefix); - } - manager.close(); - } - } - - /** - * Closes all region managers, organizers and hoplogs. This method should be - * called before closing the cache to gracefully release all resources - */ - public static synchronized void reset() { - if (instance == null) { - // nothing to reset - return; - } - - instance.janitor.shutdownNow(); - - for (String region : instance.regionManagerMap.keySet()) { - instance.clear(region); - } - instance.cache = null; - instance = null; - } - - /** - * Terminates current janitor task and schedules a new. The rate of the new - * task is based on the value of system property at that time - */ - public static synchronized void resetJanitor() { - instance.janitorTask.terminate(); - instance.janitorTask = instance.new JanitorTask(); - long interval = Long.getLong(HoplogConfig.JANITOR_INTERVAL_SECS, - HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT); - instance.janitor.scheduleWithFixedDelay(instance.janitorTask, 0, interval, - TimeUnit.SECONDS); - } - - /** - * @param regionPath name of region for which stats object is desired - * @return {@link SortedOplogStatistics} instance associated with hdfs region - * name. Null if region is not managed by director - */ - public synchronized SortedOplogStatistics getHdfsRegionStats(String regionPath) { - HdfsRegionManager manager = regionManagerMap.get(regionPath); - return manager == null ? null : manager.getHdfsStats(); - } - - private StatisticsFactory getStatsFactory() { - return cache.getDistributedSystem(); - } - - /** - * A helper class to manage region and its organizers - */ - public static class HdfsRegionManager { - // name and store configuration of the region whose buckets are managed by this director. 
- private LocalRegion region; - private HDFSStoreImpl store; - private HoplogListener listener; - private volatile boolean closed = false; - private final int FILE_ROLLOVER_TASK_INTERVAL = Integer.parseInt - (System.getProperty("gemfire.HDFSRegionDirector.FILE_ROLLOVER_TASK_INTERVAL_SECONDS", "60")); - - private SystemTimer hoplogCloseTimer = null; - - // instance of hdfs statistics object for this hdfs based region. This - // object will collect usage and performance related statistics. - private final SortedOplogStatistics hdfsStats; - - /* - * An instance of organizer is created for each bucket of regionName region residing on this - * node. This member maps bucket id with its corresponding organizer instance. A lock is used to - * manage concurrent writes to the map. - */ - private ConcurrentMap<Integer, HoplogOrganizer> bucketOrganizerMap; - - private HDFSRegionDirector hdfsRegionDirector; - - /** - * @param listener - * listener of change events like file creation and deletion - * @param hdfsRegionDirector - */ - HdfsRegionManager(LocalRegion region, HDFSStoreImpl store, - HoplogListener listener, StatisticsFactory statsFactory, HDFSRegionDirector hdfsRegionDirector) { - bucketOrganizerMap = new ConcurrentHashMap<Integer, HoplogOrganizer>(); - this.region = region; - this.listener = listener; - this.store = store; - this.hdfsStats = new SortedOplogStatistics(statsFactory, "HDFSRegionStatistics", region.getFullPath()); - this.hdfsRegionDirector = hdfsRegionDirector; - } - - public void closeWriters(int minSizeForFileRollover) throws IOException { - final long startTime = System.currentTimeMillis(); - long elapsedTime = 0; - - Collection<HoplogOrganizer> organizers = bucketOrganizerMap.values(); - - for (HoplogOrganizer organizer : organizers) { - - try { - this.getRegion().checkReadiness(); - } catch (Exception e) { - break; - } - - ((HDFSUnsortedHoplogOrganizer)organizer).synchronizedCloseWriter(true, 0, - minSizeForFileRollover); - } - - } - - public 
synchronized <T extends PersistedEventImpl> HoplogOrganizer<T> create(int bucketId) throws IOException { - assert !bucketOrganizerMap.containsKey(bucketId); - - HoplogOrganizer<?> organizer = region.getHDFSWriteOnly() - ? new HDFSUnsortedHoplogOrganizer(this, bucketId) - : new HdfsSortedOplogOrganizer(this, bucketId); - - bucketOrganizerMap.put(bucketId, organizer); - // initialize a timer that periodically closes the hoplog writer if the - // time for rollover has passed. It also has the responsibility to fix the files. - if (this.region.getHDFSWriteOnly() && - hoplogCloseTimer == null) { - hoplogCloseTimer = new SystemTimer(hdfsRegionDirector. - getCache().getDistributedSystem(), true); - - // schedule the task to fix the files that were not closed properly - // last time. - hoplogCloseTimer.scheduleAtFixedRate(new CloseTmpHoplogsTimerTask(this), - 1000, FILE_ROLLOVER_TASK_INTERVAL * 1000); - - if (logger.isDebugEnabled()) { - logger.debug("{}Schedulng hoplog rollover timer with interval "+ FILE_ROLLOVER_TASK_INTERVAL + - " for hoplog organizer for " + region.getFullPath() - + ":" + bucketId + " " + organizer, logPrefix); - } - } - - if (logger.isDebugEnabled()) { - logger.debug("{}Constructed hoplog organizer for " + region.getFullPath() - + ":" + bucketId + " " + organizer, logPrefix); - } - return (HoplogOrganizer<T>) organizer; - } - - public synchronized <T extends PersistedEventImpl> void addOrganizer( - int bucketId, HoplogOrganizer<T> organizer) { - if (bucketOrganizerMap.containsKey(bucketId)) { - throw new IllegalArgumentException(); - } - if (logger.isDebugEnabled()) { - logger.debug("{}added pre constructed organizer " + region.getFullPath() - + ":" + bucketId + " " + organizer, logPrefix); - } - bucketOrganizerMap.put(bucketId, organizer); - } - - public void close() { - closed = true; - - if (this.region.getHDFSWriteOnly() && - hoplogCloseTimer != null) { - hoplogCloseTimer.cancel(); - hoplogCloseTimer = null; - } - for (int bucket : 
bucketOrganizerMap.keySet()) { - close(bucket); - } - } - - public boolean isClosed() { - return closed; - } - - public synchronized void close(int bucketId) { - try { - HoplogOrganizer organizer = bucketOrganizerMap.remove(bucketId); - if (organizer != null) { - if (logger.isDebugEnabled()) { - logger.debug("{}Closing hoplog organizer for " + region.getFullPath() + ":" + - bucketId + " " + organizer, logPrefix); - } - organizer.close(); - } - } catch (IOException e) { - if (logger.isDebugEnabled()) { - logger.debug(logPrefix + "Error closing hoplog organizer for " + region.getFullPath() + ":" + bucketId, e); - } - } - //TODO abort compaction and flush requests for this region - } - - public static String getRegionFolder(String regionPath) { - String folder = regionPath; - //Change any underscore into a double underscore - folder = folder.replace("_", "__"); - //get rid of the leading slash - folder = folder.replaceFirst("^/", ""); - //replace slashes with underscores - folder = folder.replace('/', '_'); - return folder; - } - - public String getRegionFolder() { - return getRegionFolder(region.getFullPath()); - } - - public HoplogListener getListener() { - return listener; - } - - public HDFSStoreImpl getStore() { - return store; - } - - public LocalRegion getRegion() { - return region; - } - - public SortedOplogStatistics getHdfsStats() { - return hdfsStats; - } - - public Collection<HoplogOrganizer> getBucketOrganizers(){ - return this.bucketOrganizerMap.values(); - } - - /** - * get the HoplogOrganizers only for the given set of buckets - */ - public Collection<HoplogOrganizer> getBucketOrganizers(Set<Integer> buckets){ - Set<HoplogOrganizer> result = new HashSet<HoplogOrganizer>(); - for (Integer bucketId : buckets) { - result.add(this.bucketOrganizerMap.get(bucketId)); - } - return result; - } - - /** - * Delete all files from HDFS for this region. 
This method - * should be called after all members have destroyed their - * region in gemfire, so there should be no threads accessing - * these files. - * @throws IOException - */ - public void destroyData() throws IOException { - //Make sure everything is shut down and closed. - close(); - if (store == null) { - return; - } - Path regionPath = new Path(store.getHomeDir(), getRegionFolder()); - - //Delete all files in HDFS. - FileSystem fs = getStore().getFileSystem(); - if(!fs.delete(regionPath, true)) { - if(fs.exists(regionPath)) { - throw new IOException("Unable to delete " + regionPath); - } - } - } - - public void performMaintenance() throws IOException { - Collection<HoplogOrganizer> buckets = getBucketOrganizers(); - for (HoplogOrganizer bucket : buckets) { - bucket.performMaintenance(); - } - } - } - - private class JanitorTask implements Runnable { - boolean terminated = false; - @Override - public void run() { - if (terminated) { - return; - } - fineLog("Executing HDFS Region janitor task", null); - - Collection<HdfsRegionManager> regions = regionManagerMap.values(); - for (HdfsRegionManager region : regions) { - fineLog("Maintaining region:" + region.getRegionFolder(), null); - try { - region.performMaintenance(); - } catch (Throwable e) { - logger.info(LocalizedMessage.create(LocalizedStrings.HOPLOG_IO_ERROR , region.getRegionFolder())); - logger.info(LocalizedMessage.create(LocalizedStrings.ONE_ARG, e.getMessage())); - fineLog(null, e); - } - } - } - - public void terminate() { - terminated = true; - } - } - - protected static void fineLog(String message, Throwable e) { - if(logger.isDebugEnabled()) { - logger.debug(message, e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java deleted file mode 100644 index 880ef3e..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.gemstone.gemfire.cache.hdfs.internal.hoplog; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.concurrent.ConcurrentHashMap; - - -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl; - -/** - * HDFSStoreDirector is created for managing all instances of HDFSStoreImpl. 
- * - */ -public final class HDFSStoreDirector { - private final ConcurrentHashMap<String, HDFSStoreImpl> storeMap = new ConcurrentHashMap<String, HDFSStoreImpl>(); - - // singleton instance - private static volatile HDFSStoreDirector instance; - - private HDFSStoreDirector() { - - } - - public static final HDFSStoreDirector getInstance() { - if (instance == null) { - synchronized (HDFSStoreDirector.class) { - if (instance == null) - instance = new HDFSStoreDirector(); - } - } - return instance; - } - - // Called when the region is created. - public final void addHDFSStore(HDFSStoreImpl hdfsStore){ - this.storeMap.put(hdfsStore.getName(), hdfsStore); - } - - public final HDFSStoreImpl getHDFSStore(String hdfsStoreName) { - return this.storeMap.get(hdfsStoreName); - } - - public final void removeHDFSStore(String hdfsStoreName) { - this.storeMap.remove(hdfsStoreName); - } - - public void closeHDFSStores() { - Iterator<HDFSStoreImpl> it = this.storeMap.values().iterator(); - while (it.hasNext()) { - HDFSStoreImpl hsi = it.next(); - hsi.close(); - } - this.storeMap.clear(); - } - - public ArrayList<HDFSStoreImpl> getAllHDFSStores() { - ArrayList<HDFSStoreImpl> hdfsStores = new ArrayList<HDFSStoreImpl>(); - hdfsStores.addAll(this.storeMap.values()); - return hdfsStores; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java deleted file mode 100644 index cbb35cb..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java +++ /dev/null @@ -1,447 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.gemstone.gemfire.cache.hdfs.internal.hoplog; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; - -import com.gemstone.gemfire.cache.CacheClosedException; -import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent; -import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter; -import com.gemstone.gemfire.internal.HeapDataOutputStream; -import com.gemstone.gemfire.internal.cache.ForceReattemptException; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; -import org.apache.hadoop.hbase.util.FSUtils; - -/** - * Manages unsorted Hoplog 
files for a bucket (Streaming Ingest option). An instance per bucket - * will exist in each PR - * - * - */ -public class HDFSUnsortedHoplogOrganizer extends AbstractHoplogOrganizer<UnsortedHoplogPersistedEvent> { - public static final String HOPLOG_REGEX = HOPLOG_NAME_REGEX + "(" - + SEQ_HOPLOG_EXTENSION + "|" + TEMP_HOPLOG_EXTENSION + ")"; - public static final Pattern HOPLOG_PATTERN = Pattern.compile(HOPLOG_REGEX); - protected static String TMP_FILE_NAME_REGEX = HOPLOG_NAME_REGEX + SEQ_HOPLOG_EXTENSION + TEMP_HOPLOG_EXTENSION + "$"; - protected static final Pattern patternForTmpHoplog = Pattern.compile(TMP_FILE_NAME_REGEX); - - volatile private HoplogWriter writer; - volatile private Hoplog currentHoplog; - - volatile private long lastFlushTime = System.currentTimeMillis(); - - volatile private boolean abortFlush = false; - private FileSystem fileSystem; - - public HDFSUnsortedHoplogOrganizer(HdfsRegionManager region, int bucketId) throws IOException{ - super(region, bucketId); - writer = null; - sequence = new AtomicInteger(0); - - fileSystem = store.getFileSystem(); - if (! fileSystem.exists(bucketPath)) { - return; - } - - FileStatus validHoplogs[] = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() { - @Override - public boolean accept(Path file) { - // All valid hoplog files must match the regex - Matcher matcher = HOPLOG_PATTERN.matcher(file.getName()); - return matcher.matches(); - } - }); - - if (validHoplogs != null && validHoplogs.length > 0) { - for (FileStatus file : validHoplogs) { - // account for the disk used by this file - incrementDiskUsage(file.getLen()); - } - } - - } - - @Override - public void close() throws IOException { - super.close(); - if (logger.isDebugEnabled()) - logger.debug("{}Closing the hoplog organizer and the open files", logPrefix); - // abort the flush so that we can immediately call the close current writer. - abortFlush = true; - synchronizedCloseWriter(true, 0, 0); - } - - - /** - * Flushes the data to HDFS. 
- * Synchronization ensures that the writer is not closed when flush is happening. - * To abort the flush, abortFlush needs to be set. - * @throws ForceReattemptException - */ - @Override - public synchronized void flush(Iterator<? extends QueuedPersistentEvent> bufferIter, final int count) - throws IOException, ForceReattemptException { - assert bufferIter != null; - - if (abortFlush) - throw new CacheClosedException("Either the region has been cleared " + - "or closed. Aborting the ongoing flush operation."); - if (logger.isDebugEnabled()) - logger.debug("{}Initializing flush operation", logPrefix); - - // variables for updating stats - long start = stats.getFlush().begin(); - int byteCount = 0; - if (writer == null) { - // Hoplogs of sequence files are always created with a 0 sequence number - currentHoplog = getTmpSortedOplog(0, SEQ_HOPLOG_EXTENSION); - try { - writer = this.store.getSingletonWriter().runSerially(new Callable<Hoplog.HoplogWriter>() { - @Override - public HoplogWriter call() throws Exception { - return currentHoplog.createWriter(count); - } - }); - } catch (Exception e) { - if (e instanceof IOException) { - throw (IOException)e; - } - throw new IOException(e); - } - } - long timeSinceLastFlush = (System.currentTimeMillis() - lastFlushTime)/1000 ; - - try { - /**MergeGemXDHDFSToGFE changed the following statement as the code of HeapDataOutputStream is not merged */ - //HeapDataOutputStream out = new HeapDataOutputStream(); - while (bufferIter.hasNext()) { - HeapDataOutputStream out = new HeapDataOutputStream(1024, null); - if (abortFlush) { - stats.getFlush().end(byteCount, start); - throw new CacheClosedException("Either the region has been cleared " + - "or closed. 
Aborting the ongoing flush operation."); - } - QueuedPersistentEvent item = bufferIter.next(); - item.toHoplogEventBytes(out); - byte[] valueBytes = out.toByteArray(); - writer.append(item.getRawKey(), valueBytes); - // add key length and value length to stats byte counter - byteCount += (item.getRawKey().length + valueBytes.length); - /**MergeGemXDHDFSToGFE how to clear for reuse. Leaving it for Darrel to merge this change*/ - //out.clearForReuse(); - } - // ping secondaries before making the file a legitimate file to ensure - // that in case of split brain, no other vm has taken up as primary. #50110. - if (!abortFlush) - pingSecondaries(); - // append completed. If the file is to be rolled over, - // close writer and rename the file to a legitimate name. - // Else, sync the already written data with HDFS nodes. - int maxFileSize = this.store.getWriteOnlyFileRolloverSize() * 1024 * 1024; - int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval(); - if (writer.getCurrentSize() >= maxFileSize || - timeSinceLastFlush >= fileRolloverInterval) { - closeCurrentWriter(); - } - else { - // if flush is not aborted, hsync the batch. It ensures that - // the batch has reached HDFS and we can discard it. - if (!abortFlush) - writer.hsync(); - } - } catch (IOException e) { - stats.getFlush().error(start); - // as there is an exception, it can be probably be a file specific problem. - // close the current file to avoid any file specific issues next time - closeCurrentWriter(); - // throw the exception so that async queue will dispatch the same batch again - throw e; - } - - stats.getFlush().end(byteCount, start); - } - - /** - * Synchronization ensures that the writer is not closed when flush is happening. 
- */ - synchronized void synchronizedCloseWriter(boolean forceClose, - long timeSinceLastFlush, int minsizeforrollover) throws IOException { - long writerSize = 0; - if (writer != null){ - writerSize = writer.getCurrentSize(); - } - - if (writerSize < (minsizeforrollover * 1024L)) - return; - - int maxFileSize = this.store.getWriteOnlyFileRolloverSize() * 1024 * 1024; - int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval(); - if (writerSize >= maxFileSize || - timeSinceLastFlush >= fileRolloverInterval || forceClose) { - closeCurrentWriter(); - } - } - - - /** - * Closes the current writer so that next time a new hoplog can - * be created. Also, fixes any tmp hoplogs. - * - * @throws IOException - */ - void closeCurrentWriter() throws IOException { - - if (writer != null) { - // If this organizer is closing, it is ok to ignore exceptions here - // because CloseTmpHoplogsTimerTask - // on another member may have already renamed the hoplog - // fixes bug 49141 - boolean isClosing = abortFlush; - try { - incrementDiskUsage(writer.getCurrentSize()); - } catch (IOException e) { - if (!isClosing) { - throw e; - } - } - if (logger.isDebugEnabled()) - logger.debug("{}Closing hoplog " + currentHoplog.getFileName(), logPrefix); - try{ - writer.close(); - makeLegitimate(currentHoplog); - } catch (IOException e) { - if (!isClosing) { - logger.warn(LocalizedStrings.HOPLOG_FLUSH_OPERATION_FAILED, e); - throw e; - } - } finally { - writer = null; - lastFlushTime = System.currentTimeMillis(); - } - } - else - lastFlushTime = System.currentTimeMillis(); - } - - @Override - public void clear() throws IOException { - boolean prevAbortFlushFlag = abortFlush; - // abort the flush so that we can immediately call the close current writer. - abortFlush = true; - - // Close if there is any existing writer. 
- try { - synchronizedCloseWriter(true, 0, 0); - } catch (IOException e) { - logger.warn(LocalizedStrings.HOPLOG_CLOSE_FAILED, e); - } - - // reenable the aborted flush - abortFlush = prevAbortFlushFlag; - - // Mark the hoplogs for deletion - markHoplogsForDeletion(); - - } - - @Override - public void performMaintenance() { - // TODO remove the timer for tmp file conversion. Use this instead - } - - @Override - public Future<CompactionStatus> forceCompaction(boolean isMajor) { - return null; - } - - @Override - protected Hoplog getHoplog(Path hoplogPath) throws IOException { - Hoplog so = new SequenceFileHoplog(fileSystem, hoplogPath, stats); - return so; - } - - /** - * Fixes the size of hoplogs that were not closed properly last time. - * Such hoplogs are *.tmphop files. Identify them and open them and close - * them, this fixes the size. After doing this rename them to *.hop. - * - * @throws IOException - * @throws ForceReattemptException - */ - void identifyAndFixTmpHoplogs(FileSystem fs) throws IOException, ForceReattemptException { - if (logger.isDebugEnabled()) - logger.debug("{}Fixing temporary hoplogs", logPrefix); - - // A different filesystem is passed to this function for the following reason: - // For HDFS, if a file wasn't closed properly last time, - // while calling FileSystem.append for this file, FSNamesystem.startFileInternal-> - // FSNamesystem.recoverLeaseInternal function gets called. - // This function throws AlreadyBeingCreatedException if there is an open handle, to any other file, - // created using the same FileSystem object. This is a bug and is being tracked at: - // https://issues.apache.org/jira/browse/HDFS-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel - // - // The fix for this bug is not yet part of Pivotal HD. So to overcome the bug, - // we create a new file system for the timer task so that it does not encounter the bug. 
- - FileStatus tmpHoplogs[] = FSUtils.listStatus(fs, fs.makeQualified(bucketPath), new PathFilter() { - @Override - public boolean accept(Path file) { - // All valid hoplog files must match the regex - Matcher matcher = patternForTmpHoplog.matcher(file.getName()); - return matcher.matches(); - } - }); - - if (tmpHoplogs == null || tmpHoplogs.length == 0) { - if (logger.isDebugEnabled()) - logger.debug("{}No files to fix", logPrefix); - return; - } - // ping secondaries so that in case of split brain, no other vm has taken up - // as primary. #50110. - pingSecondaries(); - if (logger.isDebugEnabled()) - logger.debug("{}Files to fix " + tmpHoplogs.length, logPrefix); - - String currentHoplogName = null; - // get the current hoplog name. We need to ignore current hoplog while fixing. - if (currentHoplog != null) { - currentHoplogName = currentHoplog.getFileName(); - } - - for (int i = 0; i < tmpHoplogs.length; i++) { - // Skip directories - if (tmpHoplogs[i].isDirectory()) { - continue; - } - - final Path p = tmpHoplogs[i].getPath(); - - if (tmpHoplogs[i].getPath().getName().equals(currentHoplogName)){ - if (logger.isDebugEnabled()) - logger.debug("Skipping current file: " + tmpHoplogs[i].getPath().getName(), logPrefix); - continue; - } - - SequenceFileHoplog hoplog = new SequenceFileHoplog(fs, p, stats); - try { - makeLegitimate(hoplog); - logger.info (LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + p + " was a temporary " + - "hoplog because the node managing it wasn't shutdown properly last time. Fixed the hoplog name.")); - } catch (IOException e) { - logger.info (LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + p + " is still a temporary " + - "hoplog because the node managing it wasn't shutdown properly last time. Failed to " + - "change the hoplog name because an exception was thrown while fixing it. 
" + e)); - } - } - } - - private FileStatus[] getExpiredHoplogs() throws IOException { - FileStatus files[] = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() { - @Override - public boolean accept(Path file) { - // All expired hoplog end with expire extension and must match the valid file regex - String fileName = file.getName(); - if (! fileName.endsWith(EXPIRED_HOPLOG_EXTENSION)) { - return false; - } - return true; - } - }); - return files; - } - /** - * locks sorted oplogs collection, removes oplog and renames for deletion later - * @throws IOException - */ - private void markHoplogsForDeletion() throws IOException { - - ArrayList<IOException> errors = new ArrayList<IOException>(); - FileStatus validHoplogs[] = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() { - @Override - public boolean accept(Path file) { - // All valid hoplog files must match the regex - Matcher matcher = HOPLOG_PATTERN.matcher(file.getName()); - return matcher.matches(); - } - }); - - FileStatus[] expired = getExpiredHoplogs(); - validHoplogs = filterValidHoplogs(validHoplogs, expired); - - if (validHoplogs == null || validHoplogs.length == 0) { - return; - } - for (FileStatus fileStatus : validHoplogs) { - try { - addExpiryMarkerForAFile(getHoplog(fileStatus.getPath())); - } catch (IOException e) { - // even if there is an IO error continue removing other hoplogs and - // notify at the end - errors.add(e); - } - } - - if (!errors.isEmpty()) { - for (IOException e : errors) { - logger.warn(LocalizedStrings.HOPLOG_HOPLOG_REMOVE_FAILED, e); - } - } - } - - @Override - public Compactor getCompactor() { - throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName()); - } - - @Override - public HoplogIterator<byte[], UnsortedHoplogPersistedEvent> scan( - long startOffset, long length) throws IOException { - throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName()); - } - - public long 
getLastFlushTime() { - return this.lastFlushTime; - } - - public long getfileRolloverInterval(){ - int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval(); - return fileRolloverInterval; - } - - @Override - public long getLastMajorCompactionTimestamp() { - throw new UnsupportedOperationException(); - } - -}