DL-32: Fix Findbug warnings - Bump the version to 3.0.3 - Fix all the findbug warnings - Enable findbugs:check on travis ci
Author: Jon Derrick <[email protected]> Reviewers: Sijie Guo <[email protected]> Closes #19 from jderrickk/jd/fix_findbugs_error and squashes the following commits: c48c89c [Jon Derrick] Merge branch 'master' into jd/fix_findbugs_error d9b0425 [Jon Derrick] verify findbugs on travis ci 985501b [Jon Derrick] Fix findbug errors on all modules 18e8267 [Jon Derrick] Remove distributedlog-example ffa8361 [Jon Derrick] Fix findbugs in distributedlog-protocol Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/bb6990de Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/bb6990de Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/bb6990de Branch: refs/heads/master Commit: bb6990dee74cdcf158f494041f187f856344f4cf Parents: 93bdad0 Author: Jon Derrick <[email protected]> Authored: Thu Oct 13 00:47:11 2016 -0700 Committer: Sijie Guo <[email protected]> Committed: Thu Oct 13 00:47:11 2016 -0700 ---------------------------------------------------------------------- .travis.yml | 2 +- .../routing/ConsistentHashRoutingService.java | 13 +- .../distributedlog/BKAbstractLogWriter.java | 6 +- .../distributedlog/BKAsyncLogReaderDLSN.java | 15 +- .../distributedlog/BKAsyncLogWriter.java | 2 +- .../distributedlog/BKDistributedLogManager.java | 7 +- .../distributedlog/BKLogWriteHandler.java | 3 +- .../distributedlog/BKSyncLogReaderDLSN.java | 2 +- .../distributedlog/DistributedLogConstants.java | 2 +- .../distributedlog/LogSegmentMetadata.java | 102 ++++++------- .../com/twitter/distributedlog/MaxTxId.java | 2 +- .../distributedlog/function/VoidFunctions.java | 2 +- .../impl/ZKLogSegmentMetadataStore.java | 2 +- .../federated/FederatedZKLogMetadataStore.java | 24 ++- .../distributedlog/limiter/RateLimiter.java | 5 - .../rate/MovingAverageRateFactory.java | 4 +- .../readahead/ReadAheadWorker.java | 2 - .../tools/DistributedLogTool.java | 18 +-- .../twitter/distributedlog/util/DLUtils.java | 6 +- .../src/main/resources/findbugsExclude.xml | 7 +- distributedlog-example/bin/bk-cluster | 26 ---- distributedlog-example/bin/proxy-cluster | 33 ----- distributedlog-example/bin/proxy-writer | 35 ----- distributedlog-example/conf/bk_server.conf | 145 ------------------- distributedlog-example/conf/distributedlog.conf | 125 ---------------- distributedlog-example/conf/log4j.properties | 44 ------ .../conf/stream_config/example-stream_0.conf | 23 --- distributedlog-example/pom.xml | 136 ----------------- .../example/DistributedLogExample.java | 91 ------------ .../example/ProxyClusterEmulator.java | 82 ----------- .../example/TestProxyClusterEmulator.java | 61 -------- .../distributedlog/util/ProtocolUtils.java | 14 +- .../src/main/resources/findbugsExclude.xml | 17 +++ .../service/stream/BulkWriteOp.java | 1 + .../service/stream/StreamManagerImpl.java | 7 +- .../distributedlog/service/tools/ProxyTool.java | 1 - .../src/main/resources/findbugsExclude.xml | 6 + .../distributedlog-basic/pom.xml | 7 - .../basic/ConsoleProxyMultiWriter.java | 3 - .../basic/ConsoleProxyWriter.java | 3 - .../distributedlog/basic/ConsoleWriter.java | 3 - .../distributedlog-kafka/pom.xml | 7 - .../kafka/DLFutureRecordMetadata.java | 4 +- .../mapreduce/LogSegmentReader.java | 8 +- .../ConsoleProxyPartitionedMultiWriter.java | 2 - .../messaging/ConsoleProxyRRMultiWriter.java | 2 - .../src/main/resources/findbugsExclude.xml | 33 +++++ pom.xml | 3 +- 48 files changed, 184 insertions(+), 964 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index e5ee4db..fc464b3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,7 +28,7 @@ install: script: - travis_retry mvn clean apache-rat:check - - travis_wait 60 mvn package + - travis_wait 60 mvn package findbugs:check cache: directories: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java index acf71ae..8abd299 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java @@ -439,7 +439,8 @@ public class ConsistentHashRoutingService extends ServerSetRoutingService { MapDifference<Integer, SocketAddress> difference = Maps.difference(shardId2Address, newMap); left = difference.entriesOnlyOnLeft(); - for (Integer shard : left.keySet()) { + for (Map.Entry<Integer, SocketAddress> shardEntry : left.entrySet()) { + int shard = shardEntry.getKey(); if (shard >= 0) { SocketAddress host = shardId2Address.get(shard); if (null != host) { @@ -452,7 +453,7 @@ public class ConsistentHashRoutingService extends ServerSetRoutingService { } else { // shard id is negative - they are resolved from finagle name, which instances don't have shard id // in this case, if they are removed from serverset, we removed them directly - SocketAddress host = left.get(shard); + SocketAddress host = shardEntry.getValue(); if (null != host) { removeHostInternal(host, Optional.<Throwable>absent()); removedList.add(host); @@ -460,11 +461,11 @@ public class ConsistentHashRoutingService extends ServerSetRoutingService { } } // we need to find if any shards are replacing old shards - for (Integer shard : newMap.keySet()) { - SocketAddress oldHost = shardId2Address.get(shard); - SocketAddress newHost = newMap.get(shard); + for (Map.Entry<Integer, SocketAddress> shard : newMap.entrySet()) { + SocketAddress oldHost = shardId2Address.get(shard.getKey()); + SocketAddress newHost = shard.getValue(); if (!newHost.equals(oldHost)) { - join(shard, newHost, removedList); + join(shard.getKey(), newHost, removedList); joinedList.add(newHost); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java index 83167ab..b0cea24 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java @@ -356,8 +356,10 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab // skip scheduling if there is task that's already running // - if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) { - lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep); + synchronized (this) { + if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) { + lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep); + } } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java index ef055a0..7d3d53d 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java @@ -90,6 +90,7 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(); private final ScheduledExecutorService executorService; private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>(); + private final Object scheduleLock = new Object(); private final AtomicLong scheduleCount = new AtomicLong(0); final private Stopwatch scheduleDelayStopwatch; final private Stopwatch readNextDelayStopwatch; @@ -112,7 +113,7 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() { @Override public void run() { - synchronized (scheduleCount) { + synchronized (scheduleLock) { backgroundScheduleTask = null; } scheduleBackgroundRead(); @@ -485,7 +486,7 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti LOG.info("{}: Failed to cancel the background idle reader timeout task", bkLedgerManager.getFullyQualifiedName()); } - synchronized (scheduleCount) { + synchronized (scheduleLock) { if (null != backgroundScheduleTask) { backgroundScheduleTask.cancel(true); } @@ -508,7 +509,7 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti @Override public void run() { - synchronized(scheduleCount) { + synchronized(scheduleLock) { if (scheduleDelayStopwatch.isRunning()) { scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); } @@ -533,11 +534,11 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); return; } - } - if (disableProcessingReadRequests) { - LOG.info("Reader of {} is forced to stop processing read requests", bkLedgerManager.getFullyQualifiedName()); - return; + if (disableProcessingReadRequests) { + LOG.info("Reader of {} is forced to stop processing read requests", bkLedgerManager.getFullyQualifiedName()); + return; + } } // If the oldest pending promise is interrupted then we must mark http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java index ffa478a..f1594f9 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java @@ -458,7 +458,7 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri } } if (null == writerFuture) { - return Future.value(lastTxId); + return Future.value(getLastTxId()); } return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java index a5be03c..6a9d860 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java @@ -486,10 +486,11 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL } public void checkClosedOrInError(String operation) throws AlreadyClosedException { - if (null != closePromise) { - throw new AlreadyClosedException("Executing " + operation + " on already closed DistributedLogManager"); + synchronized (this) { + if (null != closePromise) { + throw new AlreadyClosedException("Executing " + operation + " on already closed DistributedLogManager"); + } } - if (null != writerBKC) { writerBKC.checkClosedOrInError(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java index d73c5e2..573679a 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java @@ -108,7 +108,6 @@ class BKLogWriteHandler extends BKLogHandler { protected final boolean sanityCheckTxnId; protected final boolean validateLogSegmentSequenceNumber; protected final int regionId; - protected volatile boolean closed = false; protected final RollingPolicy rollingPolicy; protected Future<? extends DistributedLock> lockFuture = null; protected final PermitLimiter writeLimiter; @@ -225,7 +224,7 @@ class BKLogWriteHandler extends BKLogHandler { // Rolling Policy if (conf.getLogSegmentRollingIntervalMinutes() > 0) { - rollingPolicy = new TimeBasedRollingPolicy(conf.getLogSegmentRollingIntervalMinutes() * 60 * 1000); + rollingPolicy = new TimeBasedRollingPolicy(conf.getLogSegmentRollingIntervalMinutes() * 60 * 1000L); } else { rollingPolicy = new SizeBasedRollingPolicy(conf.getMaxLogSegmentBytes()); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java index bd60856..cef5ddb 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java @@ -101,7 +101,7 @@ class BKSyncLogReaderDLSN implements LogReader, Runnable, FutureEventListener<Lo } } - private void setLastSeenDLSN(DLSN dlsn) { + private synchronized void setLastSeenDLSN(DLSN dlsn) { synchronized (sharedLock) { this.lastSeenDLSN = dlsn; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java index 8a6d824..5c50282 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java @@ -57,7 +57,7 @@ public class DistributedLogConstants { public static final String INPROGRESS_LOGSEGMENT_PREFIX = "inprogress"; public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs"; public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement"; - public static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8); + static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8); // An ACL that gives all permissions to node creators and read permissions only to everyone else. public static final List<ACL> EVERYONE_READ_CREATOR_ALL = http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java index 7fe9942..994b141 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/LogSegmentMetadata.java @@ -622,7 +622,7 @@ public class LogSegmentMetadata { static LogSegmentMetadata parseDataV1(String path, byte[] data, String[] parts) throws IOException { - long versionStatusCount = Long.valueOf(parts[0]); + long versionStatusCount = Long.parseLong(parts[0]); long version = versionStatusCount & METADATA_VERSION_MASK; assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE); @@ -637,8 +637,8 @@ public class LogSegmentMetadata { assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX); if (parts.length == 3) { - long ledgerId = Long.valueOf(parts[1]); - long txId = Long.valueOf(parts[2]); + long ledgerId = Long.parseLong(parts[1]); + long txId = Long.parseLong(parts[2]); return new LogSegmentMetadataBuilder(path, llmv, ledgerId, txId) .setRegionId(regionId) .setStatus(status) @@ -647,10 +647,10 @@ public class LogSegmentMetadata { long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT; assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE); - long ledgerId = Long.valueOf(parts[1]); - long firstTxId = Long.valueOf(parts[2]); - long lastTxId = Long.valueOf(parts[3]); - long completionTime = Long.valueOf(parts[4]); + long ledgerId = Long.parseLong(parts[1]); + long firstTxId = Long.parseLong(parts[2]); + long lastTxId = Long.parseLong(parts[3]); + long completionTime = Long.parseLong(parts[4]); return new LogSegmentMetadataBuilder(path, llmv, ledgerId, firstTxId) .setInprogress(false) .setLastTxId(lastTxId) @@ -667,7 +667,7 @@ public class LogSegmentMetadata { static LogSegmentMetadata parseDataV2(String path, byte[] data, String[] parts) throws IOException { - long versionStatusCount = Long.valueOf(parts[0]); + long versionStatusCount = Long.parseLong(parts[0]); long version = versionStatusCount & METADATA_VERSION_MASK; assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE); @@ -682,9 +682,9 @@ public class LogSegmentMetadata { assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX); if (parts.length == 4) { - long ledgerId = Long.valueOf(parts[1]); - long txId = Long.valueOf(parts[2]); - long logSegmentSequenceNumber = Long.valueOf(parts[3]); + long ledgerId = Long.parseLong(parts[1]); + long txId = Long.parseLong(parts[2]); + long logSegmentSequenceNumber = Long.parseLong(parts[3]); return new LogSegmentMetadataBuilder(path, llmv, ledgerId, txId) .setLogSegmentSequenceNo(logSegmentSequenceNumber) .setRegionId(regionId) @@ -694,13 +694,13 @@ public class LogSegmentMetadata { long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT; assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE); - long ledgerId = Long.valueOf(parts[1]); - long firstTxId = Long.valueOf(parts[2]); - long lastTxId = Long.valueOf(parts[3]); - long completionTime = Long.valueOf(parts[4]); - long logSegmentSequenceNumber = Long.valueOf(parts[5]); - long lastEntryId = Long.valueOf(parts[6]); - long lastSlotId = Long.valueOf(parts[7]); + long ledgerId = Long.parseLong(parts[1]); + long firstTxId = Long.parseLong(parts[2]); + long lastTxId = Long.parseLong(parts[3]); + long completionTime = Long.parseLong(parts[4]); + long logSegmentSequenceNumber = Long.parseLong(parts[5]); + long lastEntryId = Long.parseLong(parts[6]); + long lastSlotId = Long.parseLong(parts[7]); return new LogSegmentMetadataBuilder(path, llmv, ledgerId, firstTxId) .setInprogress(false) .setLastTxId(lastTxId) @@ -721,7 +721,7 @@ public class LogSegmentMetadata { static LogSegmentMetadata parseDataVersionsWithMinActiveDLSN(String path, byte[] data, String[] parts) throws IOException { - long versionStatusCount = Long.valueOf(parts[0]); + long versionStatusCount = Long.parseLong(parts[0]); long version = versionStatusCount & METADATA_VERSION_MASK; assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE); @@ -737,11 +737,11 @@ public class LogSegmentMetadata { assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX); if (parts.length == 6) { - long ledgerId = Long.valueOf(parts[1]); - long txId = Long.valueOf(parts[2]); - long logSegmentSequenceNumber = Long.valueOf(parts[3]); - long minActiveEntryId = Long.valueOf(parts[4]); - long minActiveSlotId = Long.valueOf(parts[5]); + long ledgerId = Long.parseLong(parts[1]); + long txId = Long.parseLong(parts[2]); + long logSegmentSequenceNumber = Long.parseLong(parts[3]); + long minActiveEntryId = Long.parseLong(parts[4]); + long minActiveSlotId = Long.parseLong(parts[5]); LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, ledgerId, txId) .setLogSegmentSequenceNo(logSegmentSequenceNumber) @@ -757,15 +757,15 @@ public class LogSegmentMetadata { long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT; assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE); - long ledgerId = Long.valueOf(parts[1]); - long firstTxId = Long.valueOf(parts[2]); - long lastTxId = Long.valueOf(parts[3]); - long completionTime = Long.valueOf(parts[4]); - long logSegmentSequenceNumber = Long.valueOf(parts[5]); - long lastEntryId = Long.valueOf(parts[6]); - long lastSlotId = Long.valueOf(parts[7]); - long minActiveEntryId = Long.valueOf(parts[8]); - long minActiveSlotId = Long.valueOf(parts[9]); + long ledgerId = Long.parseLong(parts[1]); + long firstTxId = Long.parseLong(parts[2]); + long lastTxId = Long.parseLong(parts[3]); + long completionTime = Long.parseLong(parts[4]); + long logSegmentSequenceNumber = Long.parseLong(parts[5]); + long lastEntryId = Long.parseLong(parts[6]); + long lastSlotId = Long.parseLong(parts[7]); + long minActiveEntryId = Long.parseLong(parts[8]); + long minActiveSlotId = Long.parseLong(parts[9]); LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, ledgerId, firstTxId) .setInprogress(false) .setLastTxId(lastTxId) @@ -791,7 +791,7 @@ public class LogSegmentMetadata { static LogSegmentMetadata parseDataVersionsWithSequenceId(String path, byte[] data, String[] parts) throws IOException { - long versionStatusCount = Long.valueOf(parts[0]); + long versionStatusCount = Long.parseLong(parts[0]); long version = versionStatusCount & METADATA_VERSION_MASK; assert (version >= Integer.MIN_VALUE && version <= Integer.MAX_VALUE); @@ -807,12 +807,12 @@ public class LogSegmentMetadata { assert (status >= 0 && status <= METADATA_STATUS_BIT_MAX); if (parts.length == 7) { - long ledgerId = Long.valueOf(parts[1]); - long txId = Long.valueOf(parts[2]); - long logSegmentSequenceNumber = Long.valueOf(parts[3]); - long minActiveEntryId = Long.valueOf(parts[4]); - long minActiveSlotId = Long.valueOf(parts[5]); - long startSequenceId = Long.valueOf(parts[6]); + long ledgerId = Long.parseLong(parts[1]); + long txId = Long.parseLong(parts[2]); + long logSegmentSequenceNumber = Long.parseLong(parts[3]); + long minActiveEntryId = Long.parseLong(parts[4]); + long minActiveSlotId = Long.parseLong(parts[5]); + long startSequenceId = Long.parseLong(parts[6]); LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, ledgerId, txId) .setLogSegmentSequenceNo(logSegmentSequenceNumber) @@ -827,16 +827,16 @@ public class LogSegmentMetadata { long recordCount = (versionStatusCount & LOGRECORD_COUNT_MASK) >> LOGRECORD_COUNT_SHIFT; assert (recordCount >= Integer.MIN_VALUE && recordCount <= Integer.MAX_VALUE); - long ledgerId = Long.valueOf(parts[1]); - long firstTxId = Long.valueOf(parts[2]); - long lastTxId = Long.valueOf(parts[3]); - long completionTime = Long.valueOf(parts[4]); - long logSegmentSequenceNumber = Long.valueOf(parts[5]); - long lastEntryId = Long.valueOf(parts[6]); - long lastSlotId = Long.valueOf(parts[7]); - long minActiveEntryId = Long.valueOf(parts[8]); - long minActiveSlotId = Long.valueOf(parts[9]); - long startSequenceId = Long.valueOf(parts[10]); + long ledgerId = Long.parseLong(parts[1]); + long firstTxId = Long.parseLong(parts[2]); + long lastTxId = Long.parseLong(parts[3]); + long completionTime = Long.parseLong(parts[4]); + long logSegmentSequenceNumber = Long.parseLong(parts[5]); + long lastEntryId = Long.parseLong(parts[6]); + long lastSlotId = Long.parseLong(parts[7]); + long minActiveEntryId = Long.parseLong(parts[8]); + long minActiveSlotId = Long.parseLong(parts[9]); + long startSequenceId = Long.parseLong(parts[10]); LogSegmentMetadataBuilder builder = new LogSegmentMetadataBuilder(path, llmv, ledgerId, firstTxId) .setInprogress(false) .setLastTxId(lastTxId) @@ -867,7 +867,7 @@ public class LogSegmentMetadata { String[] parts = new String(data, UTF_8).split(";"); long version; try { - version = Long.valueOf(parts[0]) & METADATA_VERSION_MASK; + version = Long.parseLong(parts[0]) & METADATA_VERSION_MASK; } catch (Exception exc) { throw new IOException("Invalid ledger entry, " + new String(data, UTF_8)); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java index ea301e2..c446a8b 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java @@ -89,7 +89,7 @@ class MaxTxId { } String txidStr = Long.toString(maxTxId); try { - Stat stat = zkc.get().setData(path, txidStr.getBytes("UTF-8"), -1); + zkc.get().setData(path, txidStr.getBytes("UTF-8"), -1); currentMax = maxTxId; } catch (Exception e) { LOG.error("Error writing new MaxTxId value {}", maxTxId, e); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java index 316a53f..e260482 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java @@ -23,7 +23,7 @@ import java.util.List; public class VoidFunctions { - public static AbstractFunction1<List<Void>, Void> LIST_TO_VOID_FUNC = + public static final AbstractFunction1<List<Void>, Void> LIST_TO_VOID_FUNC = new AbstractFunction1<List<Void>, Void>() { @Override public Void apply(List<Void> list) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java index 41b887e..c0796a1 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java @@ -326,7 +326,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch listenerSet.add(listener); if (!listeners.containsKey(logSegmentsPath)) { // listener set has been removed, add it back - listeners.putIfAbsent(logSegmentsPath, listenerSet); + listeners.put(logSegmentsPath, listenerSet); } } new ReadLogSegmentsTask(logSegmentsPath, this).run(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java index 9b5bea8..0a8f28b 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java @@ -206,9 +206,10 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log Set<URI> uris = FutureUtils.result(fetchSubNamespaces(this)); for (URI uri : uris) { SubNamespace subNs = new SubNamespace(uri); - subNamespaces.putIfAbsent(uri, subNs); - subNs.watch(); - logger.info("Watched sub namespace {}", uri); + if (null == subNamespaces.putIfAbsent(uri, subNs)) { + subNs.watch(); + logger.info("Watched sub namespace {}", uri); + } } logger.info("Federated ZK LogMetadataStore is initialized for {}", namespace); @@ -598,10 +599,19 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log } void setZkSubnamespacesVersion(int zkVersion) { - synchronized (zkSubnamespacesVersion) { - Integer oldVersion = zkSubnamespacesVersion.get(); - if (null == oldVersion || oldVersion < zkVersion) { - zkSubnamespacesVersion.set(zkVersion); + Integer oldVersion; + boolean done = false; + while (!done) { + oldVersion = zkSubnamespacesVersion.get(); + if (null == oldVersion) { + done = zkSubnamespacesVersion.compareAndSet(null, zkVersion); + continue; + } + if (oldVersion < zkVersion) { + done = zkSubnamespacesVersion.compareAndSet(oldVersion, zkVersion); + continue; + } else { + done = true; } } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java index 3a266d9..0cb1ebe 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java @@ -37,11 +37,6 @@ public interface RateLimiter { }; public static abstract class Builder { - protected int limit; - public Builder setLimit(int limit) { - this.limit = limit; - return this; - } public abstract RateLimiter build(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java index 1a04b4f..2f9869e 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/rate/MovingAverageRateFactory.java @@ -26,10 +26,12 @@ import java.util.concurrent.CopyOnWriteArrayList; import scala.runtime.BoxedUnit; public class MovingAverageRateFactory { + + private static final int DEFAULT_INTERVAL_SECS = 1; + private final Timer timer; private final TimerTask timerTask; private final CopyOnWriteArrayList<SampledMovingAverageRate> avgs; - private final int DEFAULT_INTERVAL_SECS = 1; public MovingAverageRateFactory(Timer timer) { this.avgs = new CopyOnWriteArrayList<SampledMovingAverageRate>(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java index c912178..a3fd239 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java @@ -1275,8 +1275,6 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As submit(new Runnable() { @Override public void run() { - long numEntries = endEntryId - startEntryId + 1; - // If readAheadSkipBrokenEntries is enabled and we hit a corrupt entry, log and // stat the issue and move forward. if (BKException.Code.DigestMatchException == rc && readAheadSkipBrokenEntries) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java index 91724c7..bcb7853 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java @@ -395,12 +395,8 @@ public class DistributedLogTool extends Tool { LedgerAllocatorUtils.createLedgerAllocatorPool(poolPath, 0, getConf(), getZooKeeperClient(), getBookKeeperClient(), allocationExecutor); - if (null == allocator) { - System.err.println("ERROR: use zk34 version to delete allocator pool : " + poolPath + " ."); - } else { - allocator.delete(); - System.out.println("Deleted allocator pool : " + poolPath + " ."); - } + allocator.delete(); + System.out.println("Deleted allocator pool : " + poolPath + " ."); } catch (IOException ioe) { System.err.println("Failed to delete allocator pool " + poolPath + " : " + ioe.getMessage()); } @@ -538,9 +534,7 @@ public class DistributedLogTool extends Tool { } for (Map.Entry<String, List<Pair<LogSegmentMetadata, List<String>>>> entry : corruptedCandidates.entrySet()) { System.out.println(entry.getKey() + " : \n"); - List<LogSegmentMetadata> segments = new ArrayList<LogSegmentMetadata>(entry.getValue().size()); for (Pair<LogSegmentMetadata, List<String>> pair : entry.getValue()) { - segments.add(pair.getLeft()); System.out.println("\t - " + pair.getLeft()); if (printInprogressOnly && dumpEntries) { int i = 0; @@ -2592,10 +2586,6 @@ public class DistributedLogTool extends Tool { options.addOption("b64", "base64", true, "Base64 encoded dlsn"); } - public void setBase64DLSN(String base64Dlsn) { - base64Dlsn = base64Dlsn; - } - protected void parseCommandLine(CommandLine cmdline) throws ParseException { if (cmdline.hasOption("b64")) { base64Dlsn = cmdline.getOptionValue("b64"); @@ -2622,10 +2612,6 @@ public class DistributedLogTool extends Tool { options.addOption("x", "hex", false, "Emit hex-encoded string DLSN instead of base 64"); } - public void setDLSN(DLSN dlsn) { - dlsn = dlsn; - } - protected void parseCommandLine(CommandLine cmdline) throws ParseException { if (cmdline.hasOption("dlsn")) { dlsn = parseDLSN(cmdline.getOptionValue("dlsn")); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java index 42c624a..803db90 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java @@ -165,7 +165,7 @@ public class DLUtils { */ public static long deserializeLogSegmentSequenceNumber(byte[] data) { String seqNoStr = new String(data, UTF_8); - return Long.valueOf(seqNoStr); + return Long.parseLong(seqNoStr); } /** @@ -189,7 +189,7 @@ public class DLUtils { */ public static long deserializeTransactionId(byte[] data) { String seqNoStr = new String(data, UTF_8); - return Long.valueOf(seqNoStr); + return Long.parseLong(seqNoStr); } /** @@ -222,6 +222,6 @@ public class DLUtils { * @return ledger id */ public static long bytes2LedgerId(byte[] data) { - return Long.valueOf(new String(data, UTF_8)); + return Long.parseLong(new String(data, UTF_8)); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-core/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/resources/findbugsExclude.xml b/distributedlog-core/src/main/resources/findbugsExclude.xml index 16d9c6c..684b827 100644 --- a/distributedlog-core/src/main/resources/findbugsExclude.xml +++ b/distributedlog-core/src/main/resources/findbugsExclude.xml @@ -22,13 +22,14 @@ </Match> <Match> <!-- it is safe to store external bytes reference here. //--> - <Class name="com.twitter.distributedlog.LogRecord" /> + <Class name="com.twitter.distributedlog.Entry$Builder" /> + <Method name="setData" /> <Bug pattern="EI_EXPOSE_REP2" /> </Match> <Match> <!-- it is safe to store external bytes reference here. //--> - <Class name="com.twitter.distributedlog.LogRecord" /> - <Method name="getPayload" /> + <Class name="com.twitter.distributedlog.Entry" /> + <Method name="getRawData" /> <Bug pattern="EI_EXPOSE_REP" /> </Match> <Match> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/bin/bk-cluster ---------------------------------------------------------------------- diff --git a/distributedlog-example/bin/bk-cluster b/distributedlog-example/bin/bk-cluster deleted file mode 100755 index 827153e..0000000 --- a/distributedlog-example/bin/bk-cluster +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash -# -#/** -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ - -BASEDIR=$(dirname "$0") -DISTRIBUTEDLOG_ROOT="${BASEDIR}/../.." - -cd ${DISTRIBUTEDLOG_ROOT} &&\ -mvn clean install -pl distributedlog-core -am -DskipTests &&\ -distributedlog-core/bin/dlog local 7000 http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/bin/proxy-cluster ---------------------------------------------------------------------- diff --git a/distributedlog-example/bin/proxy-cluster b/distributedlog-example/bin/proxy-cluster deleted file mode 100755 index cc16944..0000000 --- a/distributedlog-example/bin/proxy-cluster +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash -# -#/** -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ - -BASEDIR=$(dirname "$0") -DISTRIBUTEDLOG_ROOT="${BASEDIR}/../.." - -cd ${DISTRIBUTEDLOG_ROOT} &&\ -mvn clean install -Ptwitter-ostrich-provider -pl distributedlog-example -am -DskipTests &&\ -BUILT_JAR=`ls distributedlog-example/target/distributedlog-*.jar 2> /dev/null | egrep -v 'tests|javadoc|sources' | tail -1` && -java -cp .:distributedlog-example/lib/*:$BUILT_JAR \ - -Dlog4j.configuration=distributedlog-example/conf/log4j.properties \ - -DstatsHttpPort=9000 -DstatsExport=true \ - -Dserver_shard=0 \ - com.twitter.distributedlog.example.ProxyClusterEmulator \ - --port 8000 \ - --thriftmux http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/bin/proxy-writer ---------------------------------------------------------------------- diff --git a/distributedlog-example/bin/proxy-writer b/distributedlog-example/bin/proxy-writer deleted file mode 100755 index 5723cb0..0000000 --- a/distributedlog-example/bin/proxy-writer +++ /dev/null @@ -1,35 +0,0 @@ -#!/bin/bash -# -#/** -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ - -BASEDIR=$(dirname "$0") -DISTRIBUTEDLOG_ROOT="${BASEDIR}/../.." - -cd ${DISTRIBUTEDLOG_ROOT} &&\ -mvn clean install -P twitter-ostrich-provider -pl distributedlog-benchmark -am -DskipTests &&\ -BUILT_JAR=`ls distributedlog-benchmark/target/distributedlog-*.jar 2> /dev/null | egrep -v 'tests|javadoc|sources' | tail -1` && -java -cp .:distributedlog-benchmark/lib/*:$BUILT_JAR \ - -Dlog4j.configuration=distributedlog-example/conf/log4j.properties \ - -DstatsHttpPort=9001 -DstatsExport=true \ - com.twitter.distributedlog.benchmark.Benchmarker \ - --mode write --shard 0 --finagle-name 'inet!127.0.0.1:8000' \ - --duration 100000 --rate 10 --concurrency 1 \ - --start-stream-id 0 --end-stream-id 1 \ - --streamprefix example-stream \ - --provider org.apache.bookkeeper.stats.twitter.ostrich.OstrichProvider http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/conf/bk_server.conf ---------------------------------------------------------------------- diff --git a/distributedlog-example/conf/bk_server.conf b/distributedlog-example/conf/bk_server.conf deleted file mode 100644 index 3e2f44b..0000000 --- a/distributedlog-example/conf/bk_server.conf +++ /dev/null @@ -1,145 +0,0 @@ -#/** -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ - -## Bookie settings - -# Max file size of entry logger, in bytes -# A new entry log file will be created when the old one reaches the file size limitation -logSizeLimit=1073741823 - -# Max file size of journal file, in mega bytes -# A new journal file will be created when the old one reaches the file size limitation -# -journalMaxSizeMB=2048 - -# Max number of old journal file to kept -# Keep a number of old journal files would help data recovery in specia case -# -journalMaxBackups=5 - -# How long the interval to trigger next garbage collection, in milliseconds -# Since garbage collection is running in background, too frequent gc -# will heart performance. It is better to give a higher number of gc -# interval if there is enough disk capacity. -# -# gc per 20 minutes (even there is nothing to gc, it would scan entry log files -# to get ledgers mapping for next gc cycle. this would help if we have pretty high -# write volume) -gcWaitTime=1200000 -# do minor compaction per 1 hours -minorCompactionInterval=3600 -minorCompactionThreshold=0.2 -# disable major compaction -majorCompactionInterval=0 -# reduce major compaction threshold to a low value to prevent bad force compaction behavior -majorCompactionThreshold=0.3 -# disk usage -diskUsageThreshold=0.97 -# increase warn threshold to avoid bad force compaction behavior -diskUsageWarnThreshold=0.96 - -# How long the interval to flush ledger index pages to disk, in milliseconds -# Flushing index files will introduce much random disk I/O. -# If separating journal dir and ledger dirs each on different devices, -# flushing would not affect performance. But if putting journal dir -# and ledger dirs on same device, performance degrade significantly -# on too frequent flushing. You can consider increment flush interval -# to get better performance, but you need to pay more time on bookie -# server restart after failure. -# -flushInterval=1000 - -# ZooKeeper client session timeout in milliseconds -# Bookie server will exit if it received SESSION_EXPIRED because it -# was partitioned off from ZooKeeper for more than the session timeout -# JVM garbage collection, disk I/O will cause SESSION_EXPIRED. -# Increment this value could help avoiding this issue -zkTimeout=60000 - -## NIO Server settings - -# This settings is used to enabled/disabled Nagle's algorithm, which is a means of -# improving the efficiency of TCP/IP networks by reducing the number of packets -# that need to be sent over the network. -# If you are sending many small messages, such that more than one can fit in -# a single IP packet, setting server.tcpnodelay to false to enable Nagle algorithm -# can provide better performance. -# Default value is true. -# -serverTcpNoDelay=true - -## ledger cache settings - -# Max number of ledger index files could be opened in bookie server -# If number of ledger index files reaches this limitation, bookie -# server started to swap some ledgers from memory to disk. -# Too frequent swap will affect performance. You can tune this number -# to gain performance according your requirements. -openFileLimit=20000 - -# Size of a index page in ledger cache, in bytes -# A larger index page can improve performance writing page to disk, -# which is efficent when you have small number of ledgers and these -# ledgers have similar number of entries. -# If you have large number of ledgers and each ledger has fewer entries, -# smaller index page would improve memory usage. -pageSize=8192 - -# How many index pages provided in ledger cache -# If number of index pages reaches this limitation, bookie server -# starts to swap some ledgers from memory to disk. You can increment -# this value when you found swap became more frequent. But make sure -# pageLimit*pageSize should not more than JVM max memory limitation, -# otherwise you would got OutOfMemoryException. -# In general, incrementing pageLimit, using smaller index page would -# gain bettern performance in lager number of ledgers with fewer entries case -# If pageLimit is -1, bookie server will use 1/3 of JVM memory to compute -# the limitation of number of index pages. -pageLimit=131072 - -#If all ledger directories configured are full, then support only read requests for clients. -#If "readOnlyModeEnabled=true" then on all ledger disks full, bookie will be converted -#to read-only mode and serve only read requests. Otherwise the bookie will be shutdown. -readOnlyModeEnabled=true - -# Bookie Journal Settings -writeBufferSizeBytes=524288 -journalFlushWhenQueueEmpty=false -journalRemoveFromPageCache=true -journalAdaptiveGroupWrites=true -journalMaxGroupWaitMSec=6 -journalBufferedEntriesThreshold=180 -journalBufferedWritesThreshold=262144 -journalMaxGroupedEntriesToCommit=200 -journalPreAllocSizeMB=4 -journalFlushWhenQueueEmpty=true - -# Sorted Ledger Storage Settings -sortedLedgerStorageEnabled=true -skipListSizeLimit=67108864 -skipListArenaChunkSize=2097152 -skipListArenaMaxAllocSize=131072 -fileInfoCacheInitialCapacity=10000 -fileInfoMaxIdleTime=3600 - -# Bookie Threads Settings -numAddWorkerThreads=24 -numJournalCallbackThreads=48 -numReadWorkerThreads=72 -numLongPollWorkerThreads=72 - http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/conf/distributedlog.conf ---------------------------------------------------------------------- diff --git a/distributedlog-example/conf/distributedlog.conf b/distributedlog-example/conf/distributedlog.conf deleted file mode 100644 index dac71ac..0000000 --- a/distributedlog-example/conf/distributedlog.conf +++ /dev/null @@ -1,125 +0,0 @@ -#/** -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ - -######################## -# ZooKeeper Client Settings -######################## - -# zookeeper settings -zkSessionTimeoutSeconds=30 -zkNumRetries=0 -zkRetryStartBackoffMillis=100 -zkRetryMaxBackoffMillis=200 -# bkc zookeeper settings -bkcZKSessionTimeoutSeconds=60 -bkcZKNumRetries=20 -bkcZKRetryStartBackoffMillis=100 -bkcZKRetryMaxBackoffMillis=200 - -######################## -# BookKeeper Client Settings -######################## - -# bookkeeper client timeouts -bkcWriteTimeoutSeconds=10 -bkcReadTimeoutSeconds=1 -bkcNumWorkerThreads=16 -# bkcNumIOThreads=16 -bkc.numChannelsPerBookie=1 -bkc.enableTaskExecutionStats=true -bkc.connectTimeoutMillis=1000 -bkc.enablePerHostStats=true - -######################## -# DL Settings -######################## - -# lock timeout -lockTimeoutSeconds=0 -# dl worker threads -numWorkerThreads=16 - -### Recovery Related Settings - -# recover log segments in background -recoverLogSegmentsInBackground=true -# disable max id in proxy -maxIdSanityCheck=true -# use allocator pool for proxy -enableLedgerAllocatorPool=false -# ledger allocator pool size -ledgerAllocatorPoolCoreSize=20 -# check stream exists or not -createStreamIfNotExists=true -# encode dc id in version -encodeDCIDInVersion=true -# logSegmentNameVersion -logSegmentNameVersion=1 - -### Write Performance Related Settings - -# ensemble size -ensemble-size=3 -write-quorum-size=3 -ack-quorum-size=2 -bkc.ensemblePlacementPolicy=org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy -bkc.delayEnsembleChange=true - -# sync settings -# buffer size is large because when we rewrite we perform a very large write to persist -# all queue state at once (up to max queue memory size, ex. 16MB). the write will be -# throttled if it takes too long, which can hurt performance, so important to optimize -# for this case. -output-buffer-size=512000 -enableImmediateFlush=false -periodicFlushFrequencyMilliSeconds=6 -logFlushTimeoutSeconds=120 - -### Ledger Rolling Related Settings - -# retention policy -retention-size=0 -# rolling ledgers (disable time rolling/enable size rolling) -rolling-interval=0 - -# max logsegment bytes=2GB -# much larger than max journal size, effectively never roll and let drpc do it -maxLogSegmentBytes=2147483648 - -# rolling concurrency -logSegmentRollingConcurrency=1 -# disable sanityCheckDelete -sanityCheckDelete=false -ledgerAllocatorPoolName=drpc-alloc-pool - -### Readahead settings - -enableReadAhead=true -ReadAheadBatchSize=10 -ReadAheadMaxEntries=100 -ReadAheadWaitTime=10 - -### Rate limit - -rpsSoftWriteLimit=1 -rpsHardWriteLimit=5 -rpsHardServiceLimit=15 - -### Config - -dynamicConfigReloadIntervalSec=5 http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/distributedlog-example/conf/log4j.properties b/distributedlog-example/conf/log4j.properties deleted file mode 100644 index d8bf9fe..0000000 --- a/distributedlog-example/conf/log4j.properties +++ /dev/null @@ -1,44 +0,0 @@ -#/** -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ - -# -# DistributedLog Logging Configuration -# - -dlog.root.logger=DEBUG, stderr -dlog.log.dir=. -dlog.log.file=bookkeeper-server.log - -log4j.rootLogger=${dlog.root.logger} - -log4j.logger.org.apache.zookeeper=WARN -log4j.logger.org.apache.bookkeeper=WARN - -log4j.appender.R=org.apache.log4j.RollingFileAppender -log4j.appender.R.Threshold=INFO -log4j.appender.R.File=${dlog.log.dir}/${dlog.log.file} -log4j.appender.R.MaxFileSize=200MB -log4j.appender.R.MaxBackupIndex=7 -log4j.appender.R.layout=org.apache.log4j.PatternLayout -log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n - -log4j.appender.stderr=org.apache.log4j.ConsoleAppender -log4j.appender.stderr.Target=System.err -log4j.appender.stderr.Threshold=DEBUG -log4j.appender.stderr.layout=org.apache.log4j.PatternLayout -log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/conf/stream_config/example-stream_0.conf ---------------------------------------------------------------------- diff --git a/distributedlog-example/conf/stream_config/example-stream_0.conf b/distributedlog-example/conf/stream_config/example-stream_0.conf deleted file mode 100644 index 647c870..0000000 --- a/distributedlog-example/conf/stream_config/example-stream_0.conf +++ /dev/null @@ -1,23 +0,0 @@ -#/** -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ - -output-buffer-size=512000 -periodicFlushFrequencyMilliSeconds=6 -retention-size=0 -rpsSoftWriteLimit=1 -rpsHardWriteLimit=20 http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/pom.xml ---------------------------------------------------------------------- diff --git a/distributedlog-example/pom.xml b/distributedlog-example/pom.xml deleted file mode 100644 index e0867cf..0000000 --- a/distributedlog-example/pom.xml +++ /dev/null @@ -1,136 +0,0 @@ -<?xml version="1.0"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>com.twitter</groupId> - <artifactId>distributedlog</artifactId> - <version>0.4.0-incubating-SNAPSHOT</version> - </parent> - <artifactId>distributedlog-example</artifactId> - <name>Apache DistributedLog :: Example</name> - <dependencies> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>distributedlog-client</artifactId> - <version>${project.parent.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <version>3.5.1-alpha</version> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>distributedlog-core</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>distributedlog-service</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>finagle-ostrich4_2.11</artifactId> - <version>${birdcage.sha}</version> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>finagle-thriftmux_2.11</artifactId> - <version>${birdcage.sha}</version> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>distributedlog-core</artifactId> - <version>${project.parent.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>1.6.4</version> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>1.6.4</version> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.8.1</version> - <scope>test</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - <configuration> - <excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/lib</outputDirectory> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - <profiles> - <profile> - <id>twitter-ostrich-provider</id> - <dependencies> - <dependency> - <groupId>org.apache.bookkeeper.stats</groupId> - <artifactId>twitter-ostrich-provider</artifactId> - <version>${bookkeeper.version}</version> - <exclusions> - <exclusion> - <groupId>com.twitter</groupId> - <artifactId>ostrich_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>com.twitter</groupId> - <artifactId>ostrich_2.9.2</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - </profile> - </profiles> -</project> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/src/main/java/com/twitter/distributedlog/example/DistributedLogExample.java ---------------------------------------------------------------------- diff --git a/distributedlog-example/src/main/java/com/twitter/distributedlog/example/DistributedLogExample.java b/distributedlog-example/src/main/java/com/twitter/distributedlog/example/DistributedLogExample.java deleted file mode 100644 index 1b1474b..0000000 --- a/distributedlog-example/src/main/java/com/twitter/distributedlog/example/DistributedLogExample.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.example; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.LogReader; -import com.twitter.distributedlog.LogRecord; -import com.twitter.distributedlog.LogWriter; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; - -import java.net.URI; - -import static com.google.common.base.Charsets.UTF_8; - -public class DistributedLogExample { - - private static byte[] generatePayload(String prefix, long txn) { - return String.format("%s-%d", prefix, txn).getBytes(UTF_8); - } - - public static void main(String[] args) throws Exception { - if (args.length < 1) { - System.err.println("Usage: DistributedLogExample <uri>"); - System.exit(-1); - } - URI uri = URI.create(args[0]); - // Create a distributedlog configuration - DistributedLogConfiguration conf = - new DistributedLogConfiguration() - .setLogSegmentRollingIntervalMinutes(60) // interval to roll log segment - .setRetentionPeriodHours(1) // retention period - .setWriteQuorumSize(2) // 2 replicas - .setAckQuorumSize(2) // 2 replicas - .setEnsembleSize(3); // how many hosts to store a log segment - // Create a distributedlog - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() - .conf(conf) - .uri(uri) - .build(); - - DistributedLogManager unpartitionedDLM = - namespace.openLog("unpartitioned-example"); - System.out.println("Create unpartitioned stream : unpartitioned-example"); - LogWriter unpartitionedWriter = unpartitionedDLM.startLogSegmentNonPartitioned(); - for (long i = 1; i <= 10; i++) { - LogRecord record = new LogRecord(i, generatePayload("unpartitioned-example", i)); - unpartitionedWriter.write(record); - } - unpartitionedWriter.close(); - System.out.println("Write 10 records into unpartitioned stream."); - LogReader unpartitionedReader = unpartitionedDLM.getInputStream(1); - System.out.println("Read unpartitioned stream : unpartitioned-example"); - LogRecord unpartitionedRecord = unpartitionedReader.readNext(false); - while (null != unpartitionedRecord) { - System.out.println(String.format("txn %d : %s", - unpartitionedRecord.getTransactionId(), new String(unpartitionedRecord.getPayload(), "UTF-8"))); - unpartitionedRecord = unpartitionedReader.readNext(false); - } - unpartitionedReader.close(); - System.out.println("Read unpartitioned stream done."); - System.out.println("Read unpartitioned stream : unpartitioned-example from txn 5"); - LogReader unpartitionedReader2 = unpartitionedDLM.getInputStream(5); - LogRecord unpartitionedRecord2 = unpartitionedReader2.readNext(false); - while (null != unpartitionedRecord2) { - System.out.println(String.format("txn %d : %s", - unpartitionedRecord2.getTransactionId(), new String(unpartitionedRecord2.getPayload(), "UTF-8"))); - unpartitionedRecord2 = unpartitionedReader2.readNext(false); - } - unpartitionedReader2.close(); - System.out.println("Read unpartitioned stream done."); - unpartitionedDLM.delete(); - unpartitionedDLM.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/src/main/java/com/twitter/distributedlog/example/ProxyClusterEmulator.java ---------------------------------------------------------------------- diff --git a/distributedlog-example/src/main/java/com/twitter/distributedlog/example/ProxyClusterEmulator.java b/distributedlog-example/src/main/java/com/twitter/distributedlog/example/ProxyClusterEmulator.java deleted file mode 100644 index c845cbe..0000000 --- a/distributedlog-example/src/main/java/com/twitter/distributedlog/example/ProxyClusterEmulator.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.example; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.service.DistributedLogCluster; -import com.twitter.distributedlog.service.DistributedLogServerApp; - -import java.util.Arrays; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Main class for DistributedLogCluster emulator - */ -public class ProxyClusterEmulator { - static final Logger LOG = LoggerFactory.getLogger(ProxyClusterEmulator.class); - - private final DistributedLogCluster dlCluster; - private final String[] args; - - public ProxyClusterEmulator(String[] args) throws Exception { - DistributedLogConfiguration conf = new DistributedLogConfiguration(); - conf.setImmediateFlushEnabled(true); - conf.setOutputBufferSize(0); - conf.setPeriodicFlushFrequencyMilliSeconds(0); - conf.setLockTimeout(0); - this.dlCluster = DistributedLogCluster.newBuilder() - .numBookies(3) - .shouldStartZK(true) - .zkServers("127.0.0.1") - .shouldStartProxy(false) // We'll start it separately so we can pass args. - .dlConf(conf) - .build(); - this.args = args; - } - - public void start() throws Exception { - dlCluster.start(); - - // Run the server with bl cluster info. - String[] extendedArgs = new String[args.length + 2]; - System.arraycopy(args, 0, extendedArgs, 0, args.length); - extendedArgs[extendedArgs.length - 2] = "-u"; - extendedArgs[extendedArgs.length - 1] = dlCluster.getUri().toString(); - LOG.debug("Using args {}", Arrays.toString(extendedArgs)); - DistributedLogServerApp.main(extendedArgs); - } - - public void stop() throws Exception { - dlCluster.stop(); - } - - public static void main(String[] args) throws Exception { - ProxyClusterEmulator emulator = null; - try { - emulator = new ProxyClusterEmulator(args); - emulator.start(); - } catch (Exception ex) { - if (null != emulator) { - emulator.stop(); - } - System.out.println("Exception occurred running emulator " + ex); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-example/src/test/java/com/twitter/distributedlog/example/TestProxyClusterEmulator.java ---------------------------------------------------------------------- diff --git a/distributedlog-example/src/test/java/com/twitter/distributedlog/example/TestProxyClusterEmulator.java b/distributedlog-example/src/test/java/com/twitter/distributedlog/example/TestProxyClusterEmulator.java deleted file mode 100644 index 4897eb4..0000000 --- a/distributedlog-example/src/test/java/com/twitter/distributedlog/example/TestProxyClusterEmulator.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.example; - -import java.net.BindException; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - -public class TestProxyClusterEmulator { - static final Logger logger = LoggerFactory.getLogger(TestProxyClusterEmulator.class); - - @Test(timeout = 60000) - public void testStartup() throws Exception { - final ProxyClusterEmulator emulator = new ProxyClusterEmulator(new String[] {"-port", "8000"}); - final AtomicBoolean failed = new AtomicBoolean(false); - final CountDownLatch started = new CountDownLatch(1); - final CountDownLatch finished = new CountDownLatch(1); - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - try { - started.countDown(); - emulator.start(); - } catch (BindException ex) { - } catch (InterruptedException ex) { - } catch (Exception ex) { - failed.set(true); - } finally { - finished.countDown(); - } - } - }); - thread.start(); - started.await(); - Thread.sleep(1000); - thread.interrupt(); - finished.await(); - emulator.stop(); - assert(!failed.get()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-protocol/src/main/java/com/twitter/distributedlog/util/ProtocolUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/util/ProtocolUtils.java b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/util/ProtocolUtils.java index a52976c..0b08471 100644 --- a/distributedlog-protocol/src/main/java/com/twitter/distributedlog/util/ProtocolUtils.java +++ b/distributedlog-protocol/src/main/java/com/twitter/distributedlog/util/ProtocolUtils.java @@ -20,8 +20,8 @@ package com.twitter.distributedlog.util; import java.util.zip.CRC32; import com.twitter.distributedlog.DLSN; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static com.google.common.base.Charsets.UTF_8; /** * With CRC embedded in the application, we have to keep track of per api crc. Ideally this @@ -29,8 +29,6 @@ import org.slf4j.LoggerFactory; */ public class ProtocolUtils { - private static final Logger logger = LoggerFactory.getLogger(ProtocolUtils.class); - // For request payload checksum private static final ThreadLocal<CRC32> requestCRC = new ThreadLocal<CRC32>() { @Override @@ -45,7 +43,7 @@ public class ProtocolUtils { public static Long writeOpCRC32(String stream, byte[] payload) { CRC32 crc = requestCRC.get(); try { - crc.update(stream.getBytes()); + crc.update(stream.getBytes(UTF_8)); crc.update(payload); return crc.getValue(); } finally { @@ -59,9 +57,8 @@ public class ProtocolUtils { public static Long truncateOpCRC32(String stream, DLSN dlsn) { CRC32 crc = requestCRC.get(); try { - crc.update(stream.getBytes()); + crc.update(stream.getBytes(UTF_8)); crc.update(dlsn.serializeBytes()); - long result = crc.getValue(); return crc.getValue(); } finally { crc.reset(); @@ -74,8 +71,7 @@ public class ProtocolUtils { public static Long streamOpCRC32(String stream) { CRC32 crc = requestCRC.get(); try { - crc.update(stream.getBytes()); - long result = crc.getValue(); + crc.update(stream.getBytes(UTF_8)); return crc.getValue(); } finally { crc.reset(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-protocol/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/resources/findbugsExclude.xml b/distributedlog-protocol/src/main/resources/findbugsExclude.xml index 29e1a16..6b2197b 100644 --- a/distributedlog-protocol/src/main/resources/findbugsExclude.xml +++ b/distributedlog-protocol/src/main/resources/findbugsExclude.xml @@ -20,4 +20,21 @@ <!-- generated code, we can't be held responsible for findbugs in it //--> <Class name="~com\.twitter\.distributedlog\.thrift.*" /> </Match> + <Match> + <!-- it is safe to store external bytes reference here. //--> + <Class name="com.twitter.distributedlog.LogRecord" /> + <Bug pattern="EI_EXPOSE_REP2" /> + </Match> + <Match> + <!-- it is safe to store external bytes reference here. //--> + <Class name="com.twitter.distributedlog.LogRecord" /> + <Method name="getPayload" /> + <Bug pattern="EI_EXPOSE_REP" /> + </Match> + <Match> + <!-- it is safe to store external bytes reference here. //--> + <Class name="com.twitter.distributedlog.io.Buffer" /> + <Method name="getData" /> + <Bug pattern="EI_EXPOSE_REP" /> + </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java index 72af11b..96a37cd 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java @@ -37,6 +37,7 @@ import com.twitter.distributedlog.thrift.service.BulkWriteResponse; import com.twitter.distributedlog.thrift.service.ResponseHeader; import com.twitter.distributedlog.thrift.service.StatusCode; import com.twitter.distributedlog.thrift.service.WriteResponse; +import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.Sequencer; import com.twitter.util.ConstFuture; import com.twitter.util.Future$; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java index 3f26f43..aa08a24 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java @@ -210,11 +210,12 @@ public class StreamManagerImpl implements StreamManager { @Override public Map<String, String> getStreamOwnershipMap(Optional<String> regex) { Map<String, String> ownershipMap = new HashMap<String, String>(); - for (String name : acquiredStreams.keySet()) { + for (Map.Entry<String, Stream> entry : acquiredStreams.entrySet()) { + String name = entry.getKey(); if (regex.isPresent() && !name.matches(regex.get())) { continue; } - Stream stream = acquiredStreams.get(name); + Stream stream = entry.getValue(); if (null == stream) { continue; } @@ -248,7 +249,7 @@ public class StreamManagerImpl implements StreamManager { // add partition to cached map if (!cachedPartitions.addPartition(partition, maxCachedPartitions)) { - throw new StreamUnavailableException("Stream " + stream + throw new StreamUnavailableException("Stream " + streamName + " is not allowed to cache more than " + maxCachedPartitions + " partitions"); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bb6990de/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java index b9c6a32..b37de10 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java @@ -52,7 +52,6 @@ public class ProxyTool extends Tool { protected abstract static class ClusterCommand extends OptsCommand { protected Options options = new Options(); - protected String dc; protected URI uri; protected final List<String> streams = new ArrayList<String>();
