http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java deleted file mode 100644 index 52470d0..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/DelegatingSerializedComparator.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator; - -/** - * Delegates object comparisons to one or more embedded comparators. - * - */ -public interface DelegatingSerializedComparator extends SerializedComparator { - /** - * Injects the embedded comparators. 
- * @param comparators the comparators for delegation - */ - void setComparators(SerializedComparator[] comparators); - - /** - * Returns the embedded comparators. - * @return the comparators - */ - SerializedComparator[] getComparators(); -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java deleted file mode 100644 index fdf3852..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/HFileStoreStatistics.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import static com.gemstone.gemfire.distributed.internal.DistributionStats.getStatTime; - -import com.gemstone.gemfire.StatisticDescriptor; -import com.gemstone.gemfire.Statistics; -import com.gemstone.gemfire.StatisticsFactory; -import com.gemstone.gemfire.StatisticsType; -import com.gemstone.gemfire.StatisticsTypeFactory; -import com.gemstone.gemfire.internal.DummyStatisticsFactory; -import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl; - -public class HFileStoreStatistics { - private final Statistics stats; - - private final CacheOperation blockCache; - - public HFileStoreStatistics(String typeName, String name) { - this(new DummyStatisticsFactory(), typeName, name); - } - - public HFileStoreStatistics(StatisticsFactory factory, String typeName, String name) { - StatisticsTypeFactory tf = StatisticsTypeFactoryImpl.singleton(); - - StatisticDescriptor bcMisses = tf.createLongCounter("blockCacheMisses", "The total number of block cache misses", "misses"); - StatisticDescriptor bcHits = tf.createLongCounter("blockCacheHits", "The total number of block cache hits", "hits"); - StatisticDescriptor bcCached = tf.createLongGauge("blocksCached", "The current number of cached blocks", "blocks"); - StatisticDescriptor bcBytesCached = tf.createLongGauge("blockBytesCached", "The current number of bytes cached", "bytes"); - StatisticDescriptor bcBytesEvicted = tf.createLongCounter("blockBytesEvicted", "The total number of bytes cached", "bytes"); - - - StatisticsType type = tf.createType(typeName, - "Statistics about structured I/O operations for a region", new StatisticDescriptor[] { - bcMisses, bcHits, bcCached, bcBytesCached, bcBytesEvicted - }); - - blockCache = new CacheOperation(bcMisses.getId(), bcHits.getId(), bcCached.getId(), bcBytesCached.getId(), bcBytesEvicted.getId()); - - - stats = factory.createAtomicStatistics(type, name); - } - - public void close() { - stats.close(); - } - - 
public Statistics getStats() { - return stats; - } - - public CacheOperation getBlockCache() { - return blockCache; - } - - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("blockCache = {").append(blockCache).append("}\n"); - - return sb.toString(); - } - - public class TimedOperation { - protected final int countId; - protected final int inProgressId; - protected final int timeId; - private final int errorsId; - - public TimedOperation(int count, int inProgress, int time, int errors) { - this.countId = count; - this.inProgressId = inProgress; - this.timeId = time; - this.errorsId = errors; - } - - public long begin() { - stats.incLong(inProgressId, 1); - return getStatTime(); - } - - public long end(long start) { - stats.incLong(inProgressId, -1); - stats.incLong(countId, 1); - stats.incLong(timeId, getStatTime() - start); - return getStatTime(); - } - - public void error(long start) { - end(start); - stats.incLong(errorsId, 1); - } - - public long getCount() { - return stats.getLong(countId); - } - - public long getInProgress() { - return stats.getLong(inProgressId); - } - - public long getTime() { - return stats.getLong(timeId); - } - - public long getErrors() { - return stats.getLong(errorsId); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("count=").append(getCount()); - sb.append(";inProgress=").append(getInProgress()); - sb.append(";errors=").append(getErrors()); - sb.append(";time=").append(getTime()); - - return sb.toString(); - } - } - - public class CacheOperation { - private final int missesId; - private final int hitsId; - private final int cachedId; - private final int bytesCachedId; - private final int bytesEvictedId; - - public CacheOperation(int missesId, int hitsId, int cachedId, - int bytesCachedId, int bytesEvictedId) { - this.missesId = missesId; - this.hitsId = hitsId; - this.cachedId = cachedId; - this.bytesCachedId = bytesCachedId; - 
this.bytesEvictedId = bytesEvictedId; - } - - public void store(long bytes) { - stats.incLong(cachedId, 1); - stats.incLong(bytesCachedId, bytes); - } - - public void evict(long bytes) { - stats.incLong(cachedId, -1); - stats.incLong(bytesCachedId, -bytes); - stats.incLong(bytesEvictedId, bytes); - } - - public void hit() { - stats.incLong(hitsId, 1); - } - - public void miss() { - stats.incLong(missesId, 1); - } - - public long getMisses() { - return stats.getLong(missesId); - } - - public long getHits() { - return stats.getLong(hitsId); - } - - public long getCached() { - return stats.getLong(cachedId); - } - - public long getBytesCached() { - return stats.getLong(bytesCachedId); - } - - public long getBytesEvicted() { - return stats.getLong(bytesEvictedId); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("misses=").append(getMisses()); - sb.append(";hits=").append(getHits()); - sb.append(";cached=").append(getCached()); - sb.append(";bytesCached=").append(getBytesCached()); - sb.append(";bytesEvicted=").append(getBytesEvicted()); - - return sb.toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java deleted file mode 100644 index df7e1ac..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/KeyValueIterator.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
/**
 * An {@link Iterator} over keys that additionally exposes the key/value pair
 * at the iterator's current position. Implementors supply access to the
 * current key and value as well as the means to advance to the next pair.
 *
 * @param <K> the key type
 * @param <V> the value type
 */
public interface KeyValueIterator<K, V> extends Iterator<K> {
  /**
   * Returns the key at the current position.
   *
   * @return the key
   */
  K key();

  /**
   * Returns the value at the current position.
   *
   * @return the value
   */
  V value();
}
- */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import static com.gemstone.gemfire.distributed.internal.DistributionStats.getStatTime; - -import com.gemstone.gemfire.StatisticDescriptor; -import com.gemstone.gemfire.Statistics; -import com.gemstone.gemfire.StatisticsFactory; -import com.gemstone.gemfire.StatisticsType; -import com.gemstone.gemfire.StatisticsTypeFactory; -import com.gemstone.gemfire.internal.DummyStatisticsFactory; -import com.gemstone.gemfire.internal.StatisticsTypeFactoryImpl; - -public class SortedOplogStatistics { - private final Statistics stats; - - private final IOOperation read; - private final ScanOperation scan; - private final IOOperation write; - private final IOOperation put; - private final IOOperation flush; - private final IOOperation minorCompaction; - private final IOOperation majorCompaction; - private final BloomOperation bloom; - private final TimedOperation clear; - private final TimedOperation destroy; - - private final IOOperation blockRead; - private final CacheOperation blockCache; - - private final int activeFilesId; - private final int inactiveFilesId; - private final int activeReadersId; - - private final int storeUsageBytesId; - - public SortedOplogStatistics(String typeName, String name) { - this(new DummyStatisticsFactory(), typeName, name); - } - - public SortedOplogStatistics(StatisticsFactory factory, String typeName, String name) { - StatisticsTypeFactory tf = StatisticsTypeFactoryImpl.singleton(); - - StatisticDescriptor readCount = tf.createLongCounter("reads", "The total number of read operations", "ops"); - StatisticDescriptor readInProgress = tf.createLongGauge("readsInProgress", "The number of read operations in progress", "ops"); - StatisticDescriptor readTime = tf.createLongCounter("readTime", "The total time spent reading from disk", "nanoseconds"); - StatisticDescriptor readBytes = tf.createLongCounter("readBytes", "The total number of bytes read from disk", "bytes"); - 
StatisticDescriptor readErrors = tf.createLongCounter("readErrors", "The total number of read errors", "errors"); - - StatisticDescriptor scanCount = tf.createLongCounter("scans", "The total number of scan operations", "ops"); - StatisticDescriptor scanInProgress = tf.createLongGauge("scansInProgress", "The number of scan operations in progress", "ops"); - StatisticDescriptor scanTime = tf.createLongCounter("scanTime", "The total time scanner was operational", "nanoseconds"); - StatisticDescriptor scanBytes = tf.createLongCounter("scanBytes", "The total number of bytes scanned from disk", "bytes"); - StatisticDescriptor scanErrors = tf.createLongCounter("scanErrors", "The total number of scan errors", "errors"); - StatisticDescriptor scanIterations = tf.createLongCounter("scanIterations", "The total number of scan iterations", "ops"); - StatisticDescriptor scanIterationTime = tf.createLongCounter("scanIterationTime", "The total time spent scanning from persistence layer", "nanoseconds"); - - StatisticDescriptor writeCount = tf.createLongCounter("writes", "The total number of write operations", "ops"); - StatisticDescriptor writeInProgress = tf.createLongGauge("writesInProgress", "The number of write operations in progress", "ops"); - StatisticDescriptor writeTime = tf.createLongCounter("writeTime", "The total time spent writing to disk", "nanoseconds"); - StatisticDescriptor writeBytes = tf.createLongCounter("writeBytes", "The total number of bytes written to disk", "bytes"); - StatisticDescriptor writeErrors = tf.createLongCounter("writeErrors", "The total number of write errors", "errors"); - - StatisticDescriptor putCount = tf.createLongCounter("puts", "The total number of put operations", "ops"); - StatisticDescriptor putInProgress = tf.createLongGauge("putsInProgress", "The number of put operations in progress", "ops"); - StatisticDescriptor putTime = tf.createLongCounter("putTime", "The total time spent in put calls", "nanoseconds"); - StatisticDescriptor 
putBytes = tf.createLongCounter("putBytes", "The total number of bytes put", "bytes"); - StatisticDescriptor putErrors = tf.createLongCounter("putErrors", "The total number of put errors", "errors"); - - StatisticDescriptor flushCount = tf.createLongCounter("flushes", "The total number of flush operations", "ops"); - StatisticDescriptor flushInProgress = tf.createLongGauge("flushesInProgress", "The number of flush operations in progress", "ops"); - StatisticDescriptor flushTime = tf.createLongCounter("flushTime", "The total time spent flushing to disk", "nanoseconds"); - StatisticDescriptor flushBytes = tf.createLongCounter("flushBytes", "The total number of bytes flushed to disk", "bytes"); - StatisticDescriptor flushErrors = tf.createLongCounter("flushErrors", "The total number of flush errors", "errors"); - - StatisticDescriptor minorCompactionCount = tf.createLongCounter("minorCompactions", "The total number of minor compaction operations", "ops"); - StatisticDescriptor minorCompactionInProgress = tf.createLongGauge("minorCompactionsInProgress", "The number of minor compaction operations in progress", "ops"); - StatisticDescriptor minorCompactionTime = tf.createLongCounter("minorCompactionTime", "The total time spent in minor compactions", "nanoseconds"); - StatisticDescriptor minorCompactionBytes = tf.createLongCounter("minorCompactionBytes", "The total number of bytes collected during minor compactions", "bytes"); - StatisticDescriptor minorCompactionErrors = tf.createLongCounter("minorCompactionErrors", "The total number of minor compaction errors", "errors"); - - StatisticDescriptor majorCompactionCount = tf.createLongCounter("majorCompactions", "The total number of major compaction operations", "ops"); - StatisticDescriptor majorCompactionInProgress = tf.createLongGauge("majorCompactionsInProgress", "The number of major compaction operations in progress", "ops"); - StatisticDescriptor majorCompactionTime = tf.createLongCounter("majorCompactionTime", "The 
total time spent in major compactions", "nanoseconds"); - StatisticDescriptor majorCompactionBytes = tf.createLongCounter("majorCompactionBytes", "The total number of bytes collected during major compactions", "bytes"); - StatisticDescriptor majorCompactionErrors = tf.createLongCounter("majorCompactionErrors", "The total number of major compaction errors", "errors"); - - StatisticDescriptor bloomCount = tf.createLongCounter("bloomFilterCheck", "The total number of Bloom Filter checks", "ops"); - StatisticDescriptor bloomInProgress = tf.createLongGauge("bloomFilterChecksInProgress", "The number of Bloom Filter checks in progress", "ops"); - StatisticDescriptor bloomTime = tf.createLongCounter("bloomFilterCheckTime", "The total time spent checking the Bloom Filter", "nanoseconds"); - StatisticDescriptor bloomErrors = tf.createLongCounter("bloomFilterErrors", "The total number of Bloom Filter errors", "errors"); - StatisticDescriptor bloomFalsePositive = tf.createLongCounter("bloomFilterFalsePositives", "The total number of Bloom Filter false positives", "false positives"); - - StatisticDescriptor clearCount = tf.createLongCounter("clears", "The total number of clear operations", "ops"); - StatisticDescriptor clearInProgress = tf.createLongGauge("clearsInProgress", "The number of clear operations in progress", "ops"); - StatisticDescriptor clearTime = tf.createLongCounter("clearTime", "The total time spent in clear operations", "nanoseconds"); - StatisticDescriptor clearErrors = tf.createLongGauge("clearErrors", "The total number of clear errors", "errors"); - - StatisticDescriptor destroyCount = tf.createLongCounter("destroys", "The total number of destroy operations", "ops"); - StatisticDescriptor destroyInProgress = tf.createLongGauge("destroysInProgress", "The number of destroy operations in progress", "ops"); - StatisticDescriptor destroyTime = tf.createLongCounter("destroyTime", "The total time spent in destroy operations", "nanoseconds"); - StatisticDescriptor 
destroyErrors = tf.createLongGauge("destroyErrors", "The total number of destroy errors", "errors"); - - StatisticDescriptor brCount = tf.createLongCounter("blockReads", "The total number of block read operations", "ops"); - StatisticDescriptor brInProgress = tf.createLongGauge("blockReadsInProgress", "The number of block read operations in progress", "ops"); - StatisticDescriptor brTime = tf.createLongCounter("blockReadTime", "The total time spent reading blocks from disk", "nanoseconds"); - StatisticDescriptor brBytes = tf.createLongCounter("blockReadBytes", "The total number of block bytes read from disk", "bytes"); - StatisticDescriptor brErrors = tf.createLongCounter("blockReadErrors", "The total number of block read errors", "errors"); - - StatisticDescriptor bcMisses = tf.createLongCounter("blockCacheMisses", "The total number of block cache misses", "misses"); - StatisticDescriptor bcHits = tf.createLongCounter("blockCacheHits", "The total number of block cache hits", "hits"); - StatisticDescriptor bcCached = tf.createLongGauge("blocksCached", "The current number of cached blocks", "blocks"); - StatisticDescriptor bcBytesCached = tf.createLongGauge("blockBytesCached", "The current number of bytes cached", "bytes"); - StatisticDescriptor bcBytesEvicted = tf.createLongCounter("blockBytesEvicted", "The total number of bytes cached", "bytes"); - - StatisticDescriptor activeFileCount = tf.createLongGauge("activeFileCount", "The total number of active files", "files"); - StatisticDescriptor inactiveFileCount = tf.createLongGauge("inactiveFileCount", "The total number of inactive files", "files"); - StatisticDescriptor activeReaderCount = tf.createLongGauge("activeReaderCount", "The total number of active file readers", "files"); - - StatisticDescriptor storeUsageBytes = tf.createLongGauge("storeUsageBytes", "The total volume occupied on persistent store", "bytes"); - - StatisticsType type = tf.createType(typeName, - "Statistics about structured I/O operations for 
a region", new StatisticDescriptor[] { - readCount, readInProgress, readTime, readBytes, readErrors, - scanCount, scanInProgress, scanTime, scanBytes, scanErrors, scanIterations, scanIterationTime, - writeCount, writeInProgress, writeTime, writeBytes, writeErrors, - putCount, putInProgress, putTime, putBytes, putErrors, - flushCount, flushInProgress, flushTime, flushBytes, flushErrors, - minorCompactionCount, minorCompactionInProgress, minorCompactionTime, minorCompactionBytes, minorCompactionErrors, - majorCompactionCount, majorCompactionInProgress, majorCompactionTime, majorCompactionBytes, majorCompactionErrors, - bloomCount, bloomInProgress, bloomTime, bloomErrors, bloomFalsePositive, - clearCount, clearInProgress, clearTime, clearErrors, - destroyCount, destroyInProgress, destroyTime, destroyErrors, - brCount, brInProgress, brTime, brBytes, brErrors, - bcMisses, bcHits, bcCached, bcBytesCached, bcBytesEvicted, - activeFileCount, inactiveFileCount, activeReaderCount, storeUsageBytes - }); - - read = new IOOperation(readCount.getId(), readInProgress.getId(), readTime.getId(), readBytes.getId(), readErrors.getId()); - scan = new ScanOperation(scanCount.getId(), scanInProgress.getId(), scanTime.getId(), scanBytes.getId(), scanErrors.getId(), scanIterations.getId(), scanIterationTime.getId()); - write = new IOOperation(writeCount.getId(), writeInProgress.getId(), writeTime.getId(), writeBytes.getId(), writeErrors.getId()); - put = new IOOperation(putCount.getId(), putInProgress.getId(), putTime.getId(), putBytes.getId(), putErrors.getId()); - flush = new IOOperation(flushCount.getId(), flushInProgress.getId(), flushTime.getId(), flushBytes.getId(), flushErrors.getId()); - minorCompaction = new IOOperation(minorCompactionCount.getId(), minorCompactionInProgress.getId(), minorCompactionTime.getId(), minorCompactionBytes.getId(), minorCompactionErrors.getId()); - majorCompaction = new IOOperation(majorCompactionCount.getId(), majorCompactionInProgress.getId(), 
majorCompactionTime.getId(), majorCompactionBytes.getId(), majorCompactionErrors.getId()); - bloom = new BloomOperation(bloomCount.getId(), bloomInProgress.getId(), bloomTime.getId(), bloomErrors.getId(), bloomFalsePositive.getId()); - clear = new TimedOperation(clearCount.getId(), clearInProgress.getId(), clearTime.getId(), clearErrors.getId()); - destroy = new TimedOperation(destroyCount.getId(), destroyInProgress.getId(), destroyTime.getId(), destroyErrors.getId()); - - blockRead = new IOOperation(brCount.getId(), brInProgress.getId(), brTime.getId(), brBytes.getId(), brErrors.getId()); - blockCache = new CacheOperation(bcMisses.getId(), bcHits.getId(), bcCached.getId(), bcBytesCached.getId(), bcBytesEvicted.getId()); - - activeFilesId = activeFileCount.getId(); - inactiveFilesId = inactiveFileCount.getId(); - activeReadersId = activeReaderCount.getId(); - storeUsageBytesId = storeUsageBytes.getId(); - - stats = factory.createAtomicStatistics(type, name); - } - - public void close() { - stats.close(); - } - - public Statistics getStats() { - return stats; - } - - public IOOperation getRead() { - return read; - } - - public ScanOperation getScan() { - return scan; - } - - public IOOperation getWrite() { - return write; - } - - public IOOperation getPut() { - return put; - } - - public IOOperation getFlush() { - return flush; - } - - public IOOperation getMinorCompaction() { - return minorCompaction; - } - - public IOOperation getMajorCompaction() { - return majorCompaction; - } - - public BloomOperation getBloom() { - return bloom; - } - - public TimedOperation getClear() { - return clear; - } - - public TimedOperation getDestroy() { - return destroy; - } - - public IOOperation getBlockRead() { - return blockRead; - } - - public CacheOperation getBlockCache() { - return blockCache; - } - - public long getActiveFileCount() { - return stats.getLong(activeFilesId); - } - - public long getInactiveFileCount() { - return stats.getLong(inactiveFilesId); - } - - public 
long getActiveReaderCount() { - return stats.getLong(activeReadersId); - } - - public void incActiveFiles(int amt) { - stats.incLong(activeFilesId, amt); - assert stats.getLong(activeFilesId) >= 0; - } - - public void incInactiveFiles(int amt) { - stats.incLong(inactiveFilesId, amt); - assert stats.getLong(inactiveFilesId) >= 0; - } - - public void incActiveReaders(int amt) { - stats.incLong(activeReadersId, amt); - assert stats.getLong(activeReadersId) >= 0; - } - - public long getStoreUsageBytes() { - return stats.getLong(storeUsageBytesId); - } - - public void incStoreUsageBytes(long amt) { - stats.incLong(storeUsageBytesId, amt); - assert stats.getLong(storeUsageBytesId) >= 0; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("read = {").append(read).append("}\n"); - sb.append("scan = {").append(scan).append("}\n"); - sb.append("write = {").append(write).append("}\n"); - sb.append("put = {").append(put).append("}\n"); - sb.append("flush = {").append(flush).append("}\n"); - sb.append("minorCompaction = {").append(minorCompaction).append("}\n"); - sb.append("majorCompaction = {").append(majorCompaction).append("}\n"); - sb.append("bloom = {").append(bloom).append("}\n"); - sb.append("clear = {").append(clear).append("}\n"); - sb.append("destroy = {").append(destroy).append("}\n"); - sb.append("blockRead = {").append(blockRead).append("}\n"); - sb.append("blockCache = {").append(blockCache).append("}\n"); - sb.append("activeFiles = ").append(stats.getLong(activeFilesId)).append("\n"); - sb.append("inactiveFiles = ").append(stats.getLong(inactiveFilesId)).append("\n"); - sb.append("activeReaders = ").append(stats.getLong(activeReadersId)).append("\n"); - sb.append("storeUsageBytes = ").append(stats.getLong(storeUsageBytesId)).append("\n"); - - return sb.toString(); - } - - public class TimedOperation { - protected final int countId; - protected final int inProgressId; - protected final int timeId; - private final 
int errorsId; - - public TimedOperation(int count, int inProgress, int time, int errors) { - this.countId = count; - this.inProgressId = inProgress; - this.timeId = time; - this.errorsId = errors; - } - - public long begin() { - stats.incLong(inProgressId, 1); - return getStatTime(); - } - - public long end(long start) { - stats.incLong(inProgressId, -1); - stats.incLong(countId, 1); - stats.incLong(timeId, getStatTime() - start); - return getStatTime(); - } - - public void error(long start) { - end(start); - stats.incLong(errorsId, 1); - } - - public long getCount() { - return stats.getLong(countId); - } - - public long getInProgress() { - return stats.getLong(inProgressId); - } - - public long getTime() { - return stats.getLong(timeId); - } - - public long getErrors() { - return stats.getLong(errorsId); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("count=").append(getCount()); - sb.append(";inProgress=").append(getInProgress()); - sb.append(";errors=").append(getErrors()); - sb.append(";time=").append(getTime()); - - return sb.toString(); - } - } - - public class IOOperation extends TimedOperation { - protected final int bytesId; - - public IOOperation(int count, int inProgress, int time, int bytes, int errors) { - super(count, inProgress, time, errors); - this.bytesId = bytes; - } - - public long end(long bytes, long start) { - stats.incLong(bytesId, bytes); - return super.end(start); - } - - public long getBytes() { - return stats.getLong(bytesId); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(super.toString()); - sb.append(";bytes=").append(getBytes()); - - return sb.toString(); - } - } - - public class ScanOperation extends IOOperation { - private final int iterationsId; - private final int iterationTimeId; - - public ScanOperation(int count, int inProgress, int time, int bytes, int errors, int iterCount, int iterTime) { - super(count, inProgress, time, bytes, 
errors); - iterationsId = iterCount; - iterationTimeId = iterTime; - } - - public long beginIteration() { - return getStatTime(); - } - - public void endIteration(long bytes, long start){ - stats.incLong(iterationsId, 1); - stats.incLong(bytesId, bytes); - stats.incLong(iterationTimeId, getStatTime() - start); - } - - public long getIterations() { - return stats.getLong(iterationsId); - } - - public long getIterationTime() { - return stats.getLong(iterationTimeId); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(super.toString()); - sb.append(";iterations=").append(getIterations()); - sb.append(";iterationTime=").append(getIterationTime()); - - return sb.toString(); - } - } - - public class BloomOperation extends TimedOperation { - private final int falsePositiveId; - - public BloomOperation(int count, int inProgress, int time, int errors, int falsePositive) { - super(count, inProgress, time, errors); - this.falsePositiveId = falsePositive; - } - - public void falsePositive() { - stats.incLong(falsePositiveId, 1); - } - - public long getFalsePositives() { - return stats.getLong(falsePositiveId); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(super.toString()); - sb.append(";falsePositives=").append(getFalsePositives()); - - return sb.toString(); - } - } - - public class CacheOperation { - private final int missesId; - private final int hitsId; - private final int cachedId; - private final int bytesCachedId; - private final int bytesEvictedId; - - public CacheOperation(int missesId, int hitsId, int cachedId, - int bytesCachedId, int bytesEvictedId) { - this.missesId = missesId; - this.hitsId = hitsId; - this.cachedId = cachedId; - this.bytesCachedId = bytesCachedId; - this.bytesEvictedId = bytesEvictedId; - } - - public void store(long bytes) { - stats.incLong(cachedId, 1); - stats.incLong(bytesCachedId, bytes); - } - - public void evict(long bytes) { - stats.incLong(cachedId, -1); - 
stats.incLong(bytesCachedId, -bytes); - stats.incLong(bytesEvictedId, bytes); - } - - public void hit() { - stats.incLong(hitsId, 1); - } - - public void miss() { - stats.incLong(missesId, 1); - } - - public long getMisses() { - return stats.getLong(missesId); - } - - public long getHits() { - return stats.getLong(hitsId); - } - - public long getCached() { - return stats.getLong(cachedId); - } - - public long getBytesCached() { - return stats.getLong(bytesCachedId); - } - - public long getBytesEvicted() { - return stats.getLong(bytesEvictedId); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("misses=").append(getMisses()); - sb.append(";hits=").append(getHits()); - sb.append(";cached=").append(getCached()); - sb.append(";bytesCached=").append(getBytesCached()); - sb.append(";bytesEvicted=").append(getBytesEvicted()); - - return sb.toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java deleted file mode 100644 index 1042e22..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedReader.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.io.Closeable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Comparator; -import org.apache.hadoop.io.RawComparator; - -/** - * Defines a means to read sorted data including performing range scans. - * - * @param <V> type of value returned by the sorted reader - * - */ -public interface SortedReader<V> extends Closeable { - /** - * Defines the names of additional data that may be associated with a sorted - * reader. - */ - public enum Metadata { - /** identifies the disk store associated with the soplog, optional */ - DISK_STORE, - - /** identifies the RVV data, optional */ - RVV; - - /** - * Converts the metadata name to bytes. - * @return the bytes - */ - public byte[] bytes() { - return ("gemfire." + name()).getBytes(); - } - } - - /** - * Filters data based on metadata values. - */ - public interface MetadataFilter { - /** - * Returns the name this filter acts upon. - * @return the name - */ - Metadata getName(); - - /** - * Returns true if the metadata value passes the filter. - * @param value the value to check; may be null if the metadata value does - * not exist or has not been assigned yet - * @return true if accepted - */ - boolean accept(byte[] value); - } - - /** - * Allows comparisons between serialized objects. - */ - public interface SerializedComparator extends RawComparator<byte[]> { - } - - /** - * Allows sorted iteration through a set of keys and values. 
- */ - public interface SortedIterator<V> extends KeyValueIterator<ByteBuffer, V> { - /** - * Closes the iterator and frees any retained resources. - */ - public abstract void close(); - } - - /** - * Defines the statistics available on a sorted file. - */ - public interface SortedStatistics { - /** - * Returns the number of keys in the file. - * @return the key count - */ - long keyCount(); - - /** - * Returns the first key in the file. - * @return the first key - */ - byte[] firstKey(); - - /** - * Returns the last key in the file. - * @return the last key - */ - byte[] lastKey(); - - /** - * Returns the average key size in bytes. - * @return the average key size - */ - double avgKeySize(); - - /** - * Returns the average value size in bytes. - * @return the average value size - */ - double avgValueSize(); - - /** - * Frees any resources held by for statistics generation. - */ - void close(); - } - - /** - * Returns true if the bloom filter might contain the supplied key. The - * nature of the bloom filter is such that false positives are allowed, but - * false negatives cannot occur. - * - * @param key the key to test - * @return true if the key might be present - * @throws IOException read error - */ - boolean mightContain(byte[] key) throws IOException; - - /** - * Returns the value associated with the given key. - * - * @param key the key - * @return the value, or null if the key is not found - * @throws IOException read error - */ - V read(byte[] key) throws IOException; - - /** - * Iterates from the first key in the file to the requested key. - * @param to the ending key - * @param inclusive true if the ending key is included in the iteration - * @return the sorted iterator - * @throws IOException scan error - */ - SortedIterator<V> head(byte[] to, boolean inclusive) throws IOException; - - /** - * Iterates from the requested key to the last key in the file. 
- * @param from the starting key - * @param inclusive true if the starting key should be included in the iteration - * @return the sorted iterator - * @throws IOException scan error - */ - SortedIterator<V> tail(byte[] from, boolean inclusive) throws IOException; - - /** - * Iterators over the entire contents of the sorted file. - * - * @return the sorted iterator - * @throws IOException scan error - */ - SortedIterator<V> scan() throws IOException; - - /** - * Scans the available keys and allows iteration over the interval [from, to) - * where the starting key is included and the ending key is excluded from - * the results. - * - * @param from the start key - * @param to the end key - * @return the sorted iterator - * @throws IOException scan error - */ - SortedIterator<V> scan(byte[] from, byte[] to) throws IOException; - - /** - * Scans the keys and returns an iterator over the interval [equalTo, equalTo]. - * - * @param equalTo the key to match - * @return the sorted iterator - * @throws IOException scan error - */ - SortedIterator<V> scan(byte[] equalTo) throws IOException; - - /** - * Scans the keys and allows iteration between the given keys. - * - * @param from the start key - * @param fromInclusive true if the start key is included in the scan - * @param to the end key - * @param toInclusive true if the end key is included in the scan - * @return the sorted iterator - * @throws IOException scan error - */ - SortedIterator<V> scan(byte[] from, boolean fromInclusive, - byte[] to, boolean toInclusive) throws IOException; - - /** - * Scans the keys and allows iteration between the given keys after applying - * the metdata filter and the order flag. These parameters override values - * configured using <code>withAscending</code> or <code>withFilter</code>. 
- * - * @param from the start key - * @param fromInclusive true if the start key is included in the scan - * @param to the end key - * @param toInclusive true if the end key is included in the scan - * @param ascending true if ascending - * @param filter filters data based on metadata values - * @return the sorted iterator - * @throws IOException scan error - */ - SortedIterator<V> scan( - byte[] from, boolean fromInclusive, - byte[] to, boolean toInclusive, - boolean ascending, - MetadataFilter filter) throws IOException; - - /** - * Changes the iteration order of subsequent operations. - * - * @param ascending true if ascending order (default) - * @return the reader - */ - SortedReader<V> withAscending(boolean ascending); - - /** - * Applies a metadata filter to subsequent operations. - * - * @param filter the filter to apply - * @return the reader - */ - SortedReader<V> withFilter(MetadataFilter filter); - - /** - * Returns the comparator used for sorting keys. - * @return the comparator - */ - SerializedComparator getComparator(); - - /** - * Returns the statistics regarding the keys present in the sorted file. 
- * @return the statistics - * @throws IOException unable retrieve statistics - */ - SortedStatistics getStatistics() throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java deleted file mode 100644 index 2934f07..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/TrackedReference.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Tracks the usage of a reference. 
- * - * - * @param <T> the reference type - */ -public final class TrackedReference<T> { - /** the referent */ - private final T ref; - - /** the number of uses */ - private final AtomicInteger uses; - - /** list of users using this reference. Mainly for debugging */ - final ConcurrentHashMap<String, AtomicInteger> users; - - /** - * Decrements the use count of each reference. - * @param refs the references to decrement - */ - public static <T> void decrementAll(Iterable<TrackedReference<T>> refs) { - for (TrackedReference<?> tr : refs) { - tr.decrement(); - } - } - - public TrackedReference(T ref) { - this.ref = ref; - uses = new AtomicInteger(0); - users = new ConcurrentHashMap<String, AtomicInteger>(); - } - - /** - * Returns the referent. - * @return the referent - */ - public final T get() { - return ref; - } - - /** - * Returns the current count. - * @return the current uses - */ - public int uses() { - return uses.get(); - } - - /** - * Returns true if the reference is in use. - * @return true if used - */ - public boolean inUse() { - return uses() > 0; - } - - /** - * Increments the use count and returns the reference. - * @return the reference - */ - public T getAndIncrement() { - increment(); - return ref; - } - - /** - * Increments the use counter and returns the current count. - * @return the current uses - */ - public int increment() { - return increment(null); - } - - /** - * Increments the use counter and returns the current count. - * @return the current uses - */ - public int increment(String user) { - int val = uses.incrementAndGet(); - if (user != null) { - AtomicInteger counter = users.get(user); - if (counter == null) { - counter = new AtomicInteger(); - users.putIfAbsent(user, counter); - counter = users.get(user); - } - counter.incrementAndGet(); - } - assert val >= 1; - - return val; - } - - /** - * Decrements the use counter and returns the current count. 
- * @return the current uses - */ - public int decrement() { - return decrement(null); - } - - /** - * Decrements the use counter and returns the current count. - * @return the current uses - */ - public int decrement(String user) { - int val = uses.decrementAndGet(); - assert val >= 0; - if (user != null) { - AtomicInteger counter = users.get(user); - if (counter != null) { - counter.decrementAndGet(); - } - } - - return val; - } - - @Override - public String toString() { - if (users != null) { - StringBuffer sb = new StringBuffer(); - sb.append(ref.toString()).append(": ").append(uses()); - for (Entry<String, AtomicInteger> user : users.entrySet()) { - sb.append(" ").append(user.getKey()).append(":").append(user.getValue().intValue()); - } - return sb.toString(); - } - return uses() + ": " + ref.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java index e6c07d9..ca7818a 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java @@ -1145,7 +1145,7 @@ public abstract class BaseCommand implements Command { VersionTagHolder versionHolder = new VersionTagHolder(); ClientProxyMembershipID id = servConn == null ? 
null : servConn.getProxyID(); // From Get70.getValueAndIsObject() - Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true, false); + Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true); VersionTag vt = versionHolder.getVersionTag(); updateValues(values, entryKey, data, vt); @@ -1252,7 +1252,7 @@ public abstract class BaseCommand implements Command { } ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); - data = region.get(key, null, true, true, true, id, versionHolder, true, false); + data = region.get(key, null, true, true, true, id, versionHolder, true); versionTag = versionHolder.getVersionTag(); updateValues(values, key, data, versionTag); @@ -1345,7 +1345,7 @@ public abstract class BaseCommand implements Command { key = it.next(); versionHolder = new VersionTagHolder(); - Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true, false); + Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true); updateValues(values, key, value, versionHolder.getVersionTag()); @@ -1548,7 +1548,7 @@ public abstract class BaseCommand implements Command { ClientProxyMembershipID id = servConn == null ? 
null : servConn .getProxyID(); data = region.get(key, null, true, true, true, id, versionHolder, - true, false); + true); versionTag = versionHolder.getVersionTag(); updateValues(values, key, data, versionTag); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java index 7898b3c..55047c7 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java @@ -24,7 +24,6 @@ import com.gemstone.gemfire.cache.client.internal.GetOp; import com.gemstone.gemfire.cache.operations.GetOperationContext; import com.gemstone.gemfire.cache.operations.internal.GetOperationContextImpl; import com.gemstone.gemfire.distributed.internal.DistributionStats; -import com.gemstone.gemfire.internal.Assert; import com.gemstone.gemfire.internal.cache.CachedDeserializable; import com.gemstone.gemfire.internal.cache.EntryEventImpl; import com.gemstone.gemfire.internal.cache.LocalRegion; @@ -305,7 +304,7 @@ public class Get70 extends BaseCommand { // } else { ClientProxyMembershipID id = servConn == null ? 
null : servConn.getProxyID(); VersionTagHolder versionHolder = new VersionTagHolder(); - data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, versionHolder, true, true /*allowReadFromHDFS*/); + data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, versionHolder, true); // } versionTag = versionHolder.getVersionTag(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java index 2a617a8..69d54a1 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java @@ -242,7 +242,7 @@ public class Request extends BaseCommand { boolean isObject = true; ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); - Object data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, null, false, true/*allowReadFromHDFS*/); + Object data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, null, false); // If the value in the VM is a CachedDeserializable, // get its value. 
If it is Token.REMOVED, Token.DESTROYED, http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java index e896649..90522b2 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/ClientTXRegionStub.java @@ -67,8 +67,8 @@ public class ClientTXRegionStub implements TXRegionStub { public Object findObject(KeyInfo keyInfo, boolean isCreate, - boolean generateCallbacks, Object value, boolean preferCD, - ClientProxyMembershipID requestingClient, EntryEventImpl event, boolean allowReadFromHDFS) { + boolean generateCallbacks, Object value, boolean preferCD, + ClientProxyMembershipID requestingClient, EntryEventImpl event) { return proxy.get(keyInfo.getKey(), keyInfo.getCallbackArg(), event); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java index 7c7df53..1637c4a 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/DistributedTXRegionStub.java @@ -17,12 +17,10 @@ package com.gemstone.gemfire.internal.cache.tx; import java.util.Collections; -import java.util.Map; import 
com.gemstone.gemfire.cache.CacheException; import com.gemstone.gemfire.cache.EntryNotFoundException; import com.gemstone.gemfire.cache.RegionDestroyedException; -import com.gemstone.gemfire.cache.RemoteTransactionException; import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException; import com.gemstone.gemfire.cache.TransactionDataNotColocatedException; import com.gemstone.gemfire.cache.TransactionException; @@ -32,7 +30,6 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation; import com.gemstone.gemfire.internal.cache.DistributedRemoveAllOperation; import com.gemstone.gemfire.internal.cache.EntryEventImpl; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.KeyInfo; import com.gemstone.gemfire.internal.cache.LocalRegion; import com.gemstone.gemfire.internal.cache.PartitionedRegionException; @@ -54,7 +51,6 @@ import com.gemstone.gemfire.internal.cache.partitioned.RemoteSizeMessage; import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantReadWriteLock; public class DistributedTXRegionStub extends AbstractPeerTXRegionStub { @@ -159,9 +155,13 @@ public class DistributedTXRegionStub extends AbstractPeerTXRegionStub { } - public Object findObject(KeyInfo keyInfo, boolean isCreate, - boolean generateCallbacks, Object value, boolean preferCD, - ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean allowReadFromHDFS) { + public Object findObject(KeyInfo keyInfo, + boolean isCreate, + boolean generateCallbacks, + Object value, + boolean preferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent) { Object retVal = null; final Object 
key = keyInfo.getKey(); final Object callbackArgument = keyInfo.getCallbackArg(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java index 6723646..01b1ed8 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/PartitionedTXRegionStub.java @@ -275,15 +275,15 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { public Object findObject(KeyInfo keyInfo, boolean isCreate, - boolean generateCallbacks, Object value, boolean peferCD, - ClientProxyMembershipID requestingClient, - EntryEventImpl clientEvent, boolean allowReadFromHDFS) { + boolean generateCallbacks, Object value, boolean peferCD, + ClientProxyMembershipID requestingClient, + EntryEventImpl clientEvent) { Object retVal = null; final Object key = keyInfo.getKey(); final Object callbackArgument = keyInfo.getCallbackArg(); PartitionedRegion pr = (PartitionedRegion)region; try { - retVal = pr.getRemotely((InternalDistributedMember)state.getTarget(), keyInfo.getBucketId(), key, callbackArgument, peferCD, requestingClient, clientEvent, false, allowReadFromHDFS); + retVal = pr.getRemotely((InternalDistributedMember)state.getTarget(), keyInfo.getBucketId(), key, callbackArgument, peferCD, requestingClient, clientEvent, false); } catch (TransactionException e) { RuntimeException re = getTransactionException(keyInfo, e); re.initCause(e.getCause()); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java 
---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java index 482882f..f2859f1 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tx/TXRegionStub.java @@ -42,8 +42,8 @@ public interface TXRegionStub { boolean containsValueForKey(KeyInfo keyInfo); Object findObject(KeyInfo keyInfo, boolean isCreate, - boolean generateCallbacks, Object value, boolean preferCD, - ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean allowReadFromHDFS); + boolean generateCallbacks, Object value, boolean preferCD, + ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent); Object getEntryForIterator(KeyInfo keyInfo, boolean allowTombstone); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java index 94524bd..fe09d03 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java @@ -157,8 +157,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, protected boolean isBucketSorted; - protected boolean isHDFSQueue; - protected boolean isMetaQueue; private int parallelismForReplicatedRegion; @@ -260,7 +258,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, this.maxMemoryPerDispatcherQueue = 
this.queueMemory / this.dispatcherThreads; this.myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager().getDistributedSystemId(); this.serialNumber = DistributionAdvisor.createSerialNumber(); - this.isHDFSQueue = attrs.isHDFSQueue(); this.isMetaQueue = attrs.isMetaQueue(); if (!(this.cache instanceof CacheCreation)) { this.stopper = new Stopper(cache.getCancelCriterion()); @@ -269,8 +266,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, this.statistics = new GatewaySenderStats(cache.getDistributedSystem(), id); } - if (!attrs.isHDFSQueue()) - initializeEventIdIndex(); + initializeEventIdIndex(); } this.isBucketSorted = attrs.isBucketSorted(); } @@ -318,12 +314,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, cache.getDistributedSystem(), AsyncEventQueueImpl .getAsyncEventQueueIdFromSenderId(id)); } - if (!attrs.isHDFSQueue()) - initializeEventIdIndex(); + initializeEventIdIndex(); } this.isBucketSorted = attrs.isBucketSorted(); - this.isHDFSQueue = attrs.isHDFSQueue(); - + } public GatewaySenderAdvisor getSenderAdvisor() { @@ -482,10 +476,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, return this.isBucketSorted; } - public boolean getIsHDFSQueue() { - return this.isHDFSQueue; - } - public boolean getIsMetaQueue() { return this.isMetaQueue; } @@ -863,12 +853,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, return; } - if (getIsHDFSQueue() && event.getOperation().isEviction()) { - if (logger.isDebugEnabled()) - logger.debug("Eviction event not queued: " + event); - stats.incEventsNotQueued(); - return; - } // this filter is defined by Asif which exist in old wan too. new wan has // other GatewaEventFilter. Do we need to get rid of this filter. 
Cheetah is // not cinsidering this filter http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java index 025616d..1cef940 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAttributes.java @@ -30,7 +30,6 @@ import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; public class GatewaySenderAttributes { public static final boolean DEFAULT_IS_BUCKETSORTED = true; - public static final boolean DEFAULT_IS_HDFSQUEUE = false; public static final boolean DEFAULT_IS_META_QUEUE = false; @@ -82,7 +81,6 @@ public class GatewaySenderAttributes { public boolean isBucketSorted = GatewaySenderAttributes.DEFAULT_IS_BUCKETSORTED; - public boolean isHDFSQueue = GatewaySenderAttributes.DEFAULT_IS_HDFSQUEUE; public boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE; public int getSocketBufferSize() { @@ -191,9 +189,6 @@ public class GatewaySenderAttributes { public GatewayEventSubstitutionFilter getGatewayEventSubstitutionFilter() { return this.eventSubstitutionFilter; } - public boolean isHDFSQueue() { - return this.isHDFSQueue; - } public boolean isMetaQueue() { return this.isMetaQueue; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java index b63c7cb..07a3be5 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java @@ -36,9 +36,6 @@ import com.gemstone.gemfire.InternalGemFireException; import com.gemstone.gemfire.cache.CacheException; import com.gemstone.gemfire.cache.EntryEvent; import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue; import com.gemstone.gemfire.cache.wan.GatewayQueueEvent; import com.gemstone.gemfire.internal.cache.EntryEventImpl; import com.gemstone.gemfire.internal.cache.EnumListenerEvent; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java index 8524ccf..f995ba4 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java @@ -22,8 +22,6 @@ package 
com.gemstone.gemfire.internal.cache.wan.parallel; import com.gemstone.gemfire.cache.CacheException; import com.gemstone.gemfire.cache.CacheListener; import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl; import com.gemstone.gemfire.internal.cache.Conflatable; import com.gemstone.gemfire.internal.cache.DistributedRegion; import com.gemstone.gemfire.internal.cache.ForceReattemptException; @@ -188,11 +186,6 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue { getPGSProcessor( bucketId).notifyEventProcessorIfRequired(bucketId); } - public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region, - int bucketId) throws ForceReattemptException { - return getPGSProcessor(bucketId).getBucketRegionQueue(region, bucketId); - } - public void clear(PartitionedRegion pr, int bucketId) { getPGSProcessor(bucketId).clear(pr, bucketId); } @@ -207,11 +200,6 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue { getPGSProcessor(bucketId).conflateEvent(conflatableObject, bucketId, tailKey); } - public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey, - int bucketId) throws ForceReattemptException { - return getPGSProcessor(bucketId).get(region, regionKey, bucketId); - } - public void addShadowPartitionedRegionForUserRR(DistributedRegion userRegion) { for(int i =0; i< processors.length; i++){ processors[i].addShadowPartitionedRegionForUserRR(userRegion);; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java index 417ba13..11502af 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java @@ -28,9 +28,6 @@ import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.cache.CacheException; import com.gemstone.gemfire.cache.EntryEvent; import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue; import com.gemstone.gemfire.cache.wan.GatewayQueueEvent; import com.gemstone.gemfire.internal.cache.Conflatable; import com.gemstone.gemfire.internal.cache.DistributedRegion; @@ -104,10 +101,7 @@ public class ParallelGatewaySenderEventProcessor extends } ParallelGatewaySenderQueue queue; - if (sender.getIsHDFSQueue()) - queue = new HDFSParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher); - else - queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher); + queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher); queue.start(); this.queue = queue; @@ -145,12 +139,8 @@ public class ParallelGatewaySenderEventProcessor extends // while merging 42004, kept substituteValue as it is(it is barry's // change 42466). 
bucketID is merged with eventID.getBucketID - if (!sender.getIsHDFSQueue()) gatewayQueueEvent = new GatewaySenderEventImpl(operation, event, substituteValue, true, eventID.getBucketID()); - else - gatewayQueueEvent = new HDFSGatewayEventImpl(operation, - event, substituteValue, true, eventID.getBucketID()); if (getSender().beforeEnqueue(gatewayQueueEvent)) { long start = getSender().getStatistics().startTime(); @@ -208,16 +198,6 @@ public class ParallelGatewaySenderEventProcessor extends ((ParallelGatewaySenderQueue)this.queue).conflateEvent(conflatableObject, bucketId, tailKey); } - public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey, - int bucketId) throws ForceReattemptException { - return ((HDFSParallelGatewaySenderQueue)this.queue).get(region, regionKey, bucketId); - } - - public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region, - int bucketId) throws ForceReattemptException { - return ((HDFSParallelGatewaySenderQueue)this.queue).getBucketRegionQueue(region, bucketId); - } - public void addShadowPartitionedRegionForUserPR(PartitionedRegion pr) { // TODO Auto-generated method stub ((ParallelGatewaySenderQueue)this.queue).addShadowPartitionedRegionForUserPR(pr); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index b0b1a32..46ff263 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -492,7 +492,7 @@ public class 
ParallelGatewaySenderQueue implements RegionQueue { if (this.userRegionNameToshadowPRMap.containsKey(regionName)) return; - if(!isUsedForHDFS() && userPR.getDataPolicy().withPersistence() && !sender.isPersistenceEnabled()){ + if(userPR.getDataPolicy().withPersistence() && !sender.isPersistenceEnabled()){ throw new GatewaySenderException( LocalizedStrings.ParallelGatewaySenderQueue_NON_PERSISTENT_GATEWAY_SENDER_0_CAN_NOT_BE_ATTACHED_TO_PERSISTENT_REGION_1 .toLocalizedString(new Object[] { this.sender.getId(), @@ -552,7 +552,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue { } ParallelGatewaySenderQueueMetaRegion meta = metaRegionFactory.newMetataRegion(cache, - prQName, ra, sender, isUsedForHDFS()); + prQName, ra, sender); try { prQ = (PartitionedRegion)cache @@ -630,10 +630,6 @@ public class ParallelGatewaySenderQueue implements RegionQueue { bucketRegion.clear(); } } - protected boolean isUsedForHDFS() - { - return false; - } protected void afterRegionAdd (PartitionedRegion userPR) { } @@ -1857,18 +1853,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue { public ParallelGatewaySenderQueueMetaRegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion, GemFireCacheImpl cache, AbstractGatewaySender pgSender) { - this( regionName, attrs, parentRegion, cache, pgSender, false); - } - public ParallelGatewaySenderQueueMetaRegion(String regionName, - RegionAttributes attrs, LocalRegion parentRegion, - GemFireCacheImpl cache, AbstractGatewaySender pgSender, boolean isUsedForHDFS) { super(regionName, attrs, parentRegion, cache, new InternalRegionArguments().setDestroyLockFlag(true) .setRecreateFlag(false).setSnapshotInputStream(null) .setImageTarget(null) .setIsUsedForParallelGatewaySenderQueue(true) - .setParallelGatewaySender((AbstractGatewaySender)pgSender) - .setIsUsedForHDFSParallelGatewaySenderQueue(isUsedForHDFS)); + .setParallelGatewaySender((AbstractGatewaySender)pgSender)); this.sender = 
(AbstractGatewaySender)pgSender; } @@ -1925,9 +1915,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue { static class MetaRegionFactory { ParallelGatewaySenderQueueMetaRegion newMetataRegion( - GemFireCacheImpl cache, final String prQName, final RegionAttributes ra, AbstractGatewaySender sender, boolean isUsedForHDFS) { + GemFireCacheImpl cache, final String prQName, final RegionAttributes ra, AbstractGatewaySender sender) { ParallelGatewaySenderQueueMetaRegion meta = new ParallelGatewaySenderQueueMetaRegion( - prQName, ra, null, cache, sender, isUsedForHDFS); + prQName, ra, null, cache, sender); return meta; } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java index 77f9596..0015665 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/AsyncEventQueueCreation.java @@ -41,7 +41,6 @@ public class AsyncEventQueueCreation implements AsyncEventQueue { private int maxQueueMemory = 0; private boolean isParallel = false; private boolean isBucketSorted = false; - private boolean isHDFSQueue = false; private int dispatcherThreads = 1; private OrderPolicy orderPolicy = OrderPolicy.KEY; @@ -62,7 +61,6 @@ public class AsyncEventQueueCreation implements AsyncEventQueue { this.orderPolicy = senderAttrs.policy; this.asyncEventListener = eventListener; this.isBucketSorted = senderAttrs.isBucketSorted; - this.isHDFSQueue = senderAttrs.isHDFSQueue; this.gatewayEventSubstitutionFilter = senderAttrs.eventSubstitutionFilter; } @@ -213,11 
+211,4 @@ public class AsyncEventQueueCreation implements AsyncEventQueue { public void setBucketSorted(boolean isBucketSorted) { this.isBucketSorted = isBucketSorted; } - public boolean isHDFSQueue() { - return this.isHDFSQueue; - } - - public void setIsHDFSQueue(boolean isHDFSQueue) { - this.isHDFSQueue = isHDFSQueue; - } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java index 915bde9..d52d05e 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java @@ -91,11 +91,6 @@ import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.DistributedSystem; import com.gemstone.gemfire.i18n.LogWriterI18n; import com.gemstone.gemfire.internal.Assert; -import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl; -import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl; import com.gemstone.gemfire.internal.cache.CacheServerImpl; import com.gemstone.gemfire.internal.cache.CacheConfig; import com.gemstone.gemfire.internal.cache.CacheServerLauncher; @@ -198,8 +193,7 @@ public class CacheCreation implements InternalCache { * This is important for unit testing 44914. 
*/ protected final Map diskStores = new LinkedHashMap(); - protected final Map hdfsStores = new LinkedHashMap(); - + private final List<File> backups = new ArrayList<File>(); private CacheConfig cacheConfig = new CacheConfig(); @@ -513,13 +507,6 @@ public class CacheCreation implements InternalCache { } } - for(Iterator iter = this.hdfsStores.entrySet().iterator(); iter.hasNext(); ) { - Entry entry = (Entry) iter.next(); - HDFSStoreCreation hdfsStoreCreation = (HDFSStoreCreation) entry.getValue(); - HDFSStoreFactory storefactory = cache.createHDFSStoreFactory(hdfsStoreCreation); - storefactory.create((String) entry.getKey()); - } - cache.initializePdxRegistry(); @@ -530,19 +517,6 @@ public class CacheCreation implements InternalCache { (RegionAttributesCreation) getRegionAttributes(id); creation.inheritAttributes(cache, false); - // TODO: HDFS: HDFS store/queue will be mapped against region path and not - // the attribute id; don't really understand what this is trying to do - if (creation.getHDFSStoreName() != null) - { - HDFSStoreImpl store = cache.findHDFSStore(creation.getHDFSStoreName()); - if(store == null) { - HDFSIntegrationUtil.createDefaultAsyncQueueForHDFS((Cache)cache, creation.getHDFSWriteOnly(), id); - } - } - if (creation.getHDFSStoreName() != null && creation.getPartitionAttributes().getColocatedWith() == null) { - creation.addAsyncEventQueueId(HDFSStoreFactoryImpl.getEventQueueName(id)); - } - RegionAttributes attrs; // Don't let the RegionAttributesCreation escape to the user AttributesFactory factory = new AttributesFactory(creation); @@ -1421,17 +1395,6 @@ public class CacheCreation implements InternalCache { } @Override - public Collection<HDFSStoreImpl> getHDFSStores() { - return this.hdfsStores.values(); - } - - public void addHDFSStore(String name, HDFSStoreCreation hs) { - this.hdfsStores.put(name, hs); - } - - - - @Override public DistributedMember getMyId() { return null; } 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java index c6b0509..aa7d49a 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXml.java @@ -487,8 +487,6 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler { protected static final String PERSISTENT_REPLICATE_DP = "persistent-replicate"; protected static final String PARTITION_DP = "partition"; protected static final String PERSISTENT_PARTITION_DP = "persistent-partition"; - protected static final String HDFS_PARTITION_DP = "hdfs-partition"; - protected static final String HDFS_PERSISTENT_PARTITION_DP = "hdfs-persistent-partition"; /** The name of the <code>keep-alive-timeout</code> attribute */ protected static final String KEEP_ALIVE_TIMEOUT = "keep-alive-timeout"; @@ -765,35 +763,6 @@ public abstract class CacheXml implements EntityResolver2, ErrorHandler { public static final String ASYNC_EVENT_QUEUE = "async-event-queue"; protected static final String ASYNC_EVENT_QUEUE_IDS = "async-event-queue-ids"; - protected static final String HDFS_EVENT_QUEUE = "hdfs-event-queue"; - protected static final String HDFS_STORE_NAME = "hdfs-store-name"; - public static final String HDFS_STORE = "hdfs-store"; - protected static final String HDFS_HOME_DIR = "home-dir"; - protected static final String HDFS_READ_CACHE_SIZE = "read-cache-size"; - protected static final String HDFS_MAX_MEMORY = "max-memory"; - protected static final String HDFS_BATCH_SIZE = "batch-size"; - protected static final String HDFS_BATCH_INTERVAL = 
"batch-interval"; - protected static final String HDFS_DISPATCHER_THREADS = "dispatcher-threads"; - protected static final String HDFS_BUFFER_PERSISTENT = "buffer-persistent"; - protected static final String HDFS_SYNCHRONOUS_DISK_WRITE = "synchronous-disk-write"; - protected static final String HDFS_DISK_STORE = "disk-store"; - protected static final String HDFS_MAX_WRITE_ONLY_FILE_SIZE = "max-write-only-file-size"; - public static final String HDFS_WRITE_ONLY_FILE_ROLLOVER_INTERVAL = "write-only-file-rollover-interval"; - - protected static final String HDFS_NAMENODE_URL = "namenode-url"; - protected static final String HDFS_CLIENT_CONFIG_FILE = "hdfs-client-config-file"; - public static final String HDFS_PURGE_INTERVAL = "purge-interval"; - public static final String HDFS_MAJOR_COMPACTION = "major-compaction"; - public static final String HDFS_MAJOR_COMPACTION_INTERVAL = "major-compaction-interval"; - public static final String HDFS_MAJOR_COMPACTION_THREADS = "major-compaction-threads"; - public static final String HDFS_MINOR_COMPACTION = "minor-compaction"; - public static final String HDFS_MINOR_COMPACTION_THREADS = "minor-compaction-threads"; - - public static final String HDFS_TIME_FOR_FILE_ROLLOVER = "file-rollover-time-secs"; - - protected static final String HDFS_WRITE_ONLY = "hdfs-write-only"; - protected static final String HDFS_QUEUE_BATCH_SIZE = "batch-size-mb"; - /** The name of the <code>compressor</code> attribute */ protected static final String COMPRESSOR = "compressor"; /** The name of the <code>off-heap</code> attribute