DL-164: Create stream operation should not be submitted by StreamImpl
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/63d6bde1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/63d6bde1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/63d6bde1
Branch: refs/heads/master
Commit: 63d6bde1929085780171e63cc5b0c95581daa564
Parents: cfc049c
Author: Sijie Guo <sij...@twitter.com>
Authored: Wed Dec 28 19:09:59 2016 -0800
Committer: Sijie Guo <sij...@twitter.com>
Committed: Thu Dec 29 02:14:02 2016 -0800
----------------------------------------------------------------------
.../distributedlog/BKAsyncLogReader.java | 21 ++++++++++++------
.../distributedlog/ReadAheadEntryReader.java | 14 ++++++++++--
.../tools/DistributedLogTool.java | 6 ++---
distributedlog-service/pom.xml | 2 +-
.../service/DistributedLogServiceImpl.java | 1 +
.../placement/LeastLoadPlacementPolicy.java | 9 ++++++--
.../service/placement/ServerLoad.java | 23 +++++++++++---------
.../service/placement/StreamLoad.java | 3 +++
.../src/main/resources/findbugsExclude.xml | 4 ++++
9 files changed, 57 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
index cebbc33..aee4103 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReader.java
@@ -28,6 +28,7 @@ import com.twitter.distributedlog.exceptions.IdleReaderException;
import
com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.ReadCancelledException; import com.twitter.distributedlog.exceptions.UnexpectedException; +import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.Utils; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; @@ -83,10 +84,11 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { } }; + private final String streamName; protected final BKDistributedLogManager bkDistributedLogManager; protected final BKLogReadHandler readHandler; private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(); - private final ScheduledExecutorService executorService; + private final OrderedScheduler scheduler; private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>(); private final Object scheduleLock = new Object(); private final AtomicLong scheduleCount = new AtomicLong(0); @@ -208,13 +210,14 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { } BKAsyncLogReader(BKDistributedLogManager bkdlm, - ScheduledExecutorService executorService, + OrderedScheduler scheduler, DLSN startDLSN, Optional<String> subscriberId, boolean returnEndOfStreamRecord, StatsLogger statsLogger) { + this.streamName = bkdlm.getStreamName(); this.bkDistributedLogManager = bkdlm; - this.executorService = executorService; + this.scheduler = scheduler; this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId, this, true); LOG.debug("Starting async reader at {}", startDLSN); @@ -251,7 +254,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { // Except when idle reader threshold is less than a second (tests?) 
period = Math.min(period, idleErrorThresholdMillis / 5); - return executorService.scheduleAtFixedRate(new Runnable() { + return scheduler.scheduleAtFixedRate(streamName, new Runnable() { @Override public void run() { PendingReadRequest nextRequest = pendingRequests.peek(); @@ -371,7 +374,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { @Override public String getStreamName() { - return bkDistributedLogManager.getStreamName(); + return streamName; } /** @@ -470,7 +473,7 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { long prevCount = scheduleCount.getAndIncrement(); if (0 == prevCount) { scheduleDelayStopwatch.reset().start(); - executorService.submit(this); + scheduler.submit(streamName, this); } } @@ -659,7 +662,11 @@ class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { scheduleDelayStopwatch.reset().start(); scheduleCount.set(0); // the request could still wait for more records - backgroundScheduleTask = executorService.schedule(BACKGROUND_READ_SCHEDULER, remainingWaitTime, nextRequest.deadlineTimeUnit); + backgroundScheduleTask = scheduler.schedule( + streamName, + BACKGROUND_READ_SCHEDULER, + remainingWaitTime, + nextRequest.deadlineTimeUnit); return; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java index 19f4497..40e3930 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ReadAheadEntryReader.java @@ -54,6 +54,7 @@ import java.util.concurrent.LinkedBlockingQueue; import 
java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -228,9 +229,12 @@ public class ReadAheadEntryReader implements if (null != closePromise) { return; } + } + try { safeRun(); + } catch (Throwable cause) { + logger.error("Caught unexpected exception : ", cause); } - } abstract void safeRun(); @@ -275,6 +279,7 @@ public class ReadAheadEntryReader implements // State of the reader // + private final AtomicBoolean started = new AtomicBoolean(false); private boolean isInitialized = false; private boolean readAheadPaused = false; private Promise<Void> closePromise = null; @@ -428,6 +433,7 @@ public class ReadAheadEntryReader implements public void start(final List<LogSegmentMetadata> segmentList) { logger.info("Starting the readahead entry reader for {} : segments = {}", readHandler.getFullyQualifiedName(), segmentList); + started.set(true); processLogSegments(segmentList); } @@ -530,7 +536,7 @@ public class ReadAheadEntryReader implements void setLastException(IOException cause) { if (!lastException.compareAndSet(null, cause)) { - return; + logger.debug("last exception has already been set to ", lastException.get()); } // the exception is set and notify the state change notifyStateChangeOnFailure(cause); @@ -829,6 +835,7 @@ public class ReadAheadEntryReader implements } skipTruncatedLogSegments = false; if (!isAllowedToPosition(segment, dlsnToStart)) { + logger.error("segment {} is not allowed to position at {}", segment, dlsnToStart); return; } @@ -969,6 +976,9 @@ public class ReadAheadEntryReader implements @Override public void onSegmentsUpdated(List<LogSegmentMetadata> segments) { + if (!started.get()) { + return; + } logger.info("segments is updated with {}", segments); processLogSegments(segments); } 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java index 4565921..03d70bd 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/DistributedLogTool.java @@ -188,10 +188,8 @@ public class DistributedLogTool extends Tool { try { return runCmd(); } finally { - synchronized (this) { - if (null != namespace) { - namespace.close(); - } + if (null != namespace) { + namespace.close(); } } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-service/pom.xml ---------------------------------------------------------------------- diff --git a/distributedlog-service/pom.xml b/distributedlog-service/pom.xml index e74d486..052ce15 100644 --- a/distributedlog-service/pom.xml +++ b/distributedlog-service/pom.xml @@ -130,7 +130,7 @@ <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> - <version>2.2.0-incubating</version> + <version>3.2.1</version> <scope>test</scope> <exclusions> <exclusion> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java index e7974c7..5dee7fd 100644 --- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java @@ -59,6 +59,7 @@ import com.twitter.distributedlog.service.stream.StreamOp; import com.twitter.distributedlog.service.stream.StreamOpStats; import com.twitter.distributedlog.service.stream.TruncateOp; import com.twitter.distributedlog.service.stream.WriteOp; +import com.twitter.distributedlog.service.stream.admin.StreamAdminOp; import com.twitter.distributedlog.service.stream.WriteOpWithPayload; import com.twitter.distributedlog.service.stream.admin.StreamAdminOp; import com.twitter.distributedlog.service.stream.limiter.ServiceRequestLimiter; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java index e4c8128..8c8dc23 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java @@ -71,10 +71,15 @@ public class LeastLoadPlacementPolicy extends PlacementPolicy { }); } + private synchronized String getStreamOwner(String stream) { + return streamToServer.get(stream); + } + @Override public Future<String> placeStream(String stream) { - if (streamToServer.containsKey(stream)) { - return Future.value(streamToServer.get(stream)); + String streamOwner = getStreamOwner(stream); + if (null != streamOwner) { + return Future.value(streamOwner); } 
Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream); return streamLoadFuture.map(new Function<StreamLoad, String>() { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java index d7fbcf2..801e499 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java @@ -57,7 +57,7 @@ public class ServerLoad implements Comparable { synchronized public long removeStream(String stream) { for (StreamLoad streamLoad : streamLoads) { if (streamLoad.stream.equals(stream)) { - this.load -= load; + this.load -= streamLoad.getLoad(); streamLoads.remove(streamLoad); return this.load; } @@ -65,19 +65,19 @@ public class ServerLoad implements Comparable { return this.load; //Throwing an exception wouldn't help us as our logic should never reach here } - public long getLoad() { + public synchronized long getLoad() { return load; } - public Set<StreamLoad> getStreamLoads() { + public synchronized Set<StreamLoad> getStreamLoads() { return streamLoads; } - public String getServer() { + public synchronized String getServer() { return server; } - protected com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() { + protected synchronized com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() { com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad = new com.twitter.distributedlog.service.placement.thrift.ServerLoad(); tServerLoad.setServer(server); @@ -125,9 +125,9 @@ 
public class ServerLoad implements Comparable { } @Override - public int compareTo(Object o) { + public synchronized int compareTo(Object o) { ServerLoad other = (ServerLoad) o; - if (load == other.load) { + if (load == other.getLoad()) { return server.compareTo(other.getServer()); } else { return Long.compare(load, other.getLoad()); @@ -135,18 +135,21 @@ public class ServerLoad implements Comparable { } @Override - public boolean equals(Object o) { + public synchronized boolean equals(Object o) { + if (!(o instanceof ServerLoad)) { + return false; + } ServerLoad other = (ServerLoad) o; return server.equals(other.getServer()) && load == other.getLoad() && streamLoads.equals(other.getStreamLoads()); } @Override - public String toString() { + public synchronized String toString() { return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads); } @Override - public int hashCode() { + public synchronized int hashCode() { return new HashCodeBuilder().append(server).append(load).append(streamLoads).build(); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java index 4f3dc71..d7b7efd 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java @@ -93,6 +93,9 @@ public class StreamLoad implements Comparable { @Override public boolean equals(Object o) { + if (!(o instanceof StreamLoad)) { + return false; + } StreamLoad other = (StreamLoad) o; return stream.equals(other.getStream()) && 
load == other.getLoad(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/63d6bde1/distributedlog-service/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/resources/findbugsExclude.xml b/distributedlog-service/src/main/resources/findbugsExclude.xml index 502befa..d28ea93 100644 --- a/distributedlog-service/src/main/resources/findbugsExclude.xml +++ b/distributedlog-service/src/main/resources/findbugsExclude.xml @@ -21,6 +21,10 @@ <Class name="~com\.twitter\.distributedlog\.thrift.*" /> </Match> <Match> + <!-- generated code, we can't be held responsible for findbugs in it //--> + <Class name="~com\.twitter\.distributedlog\.service\.placement\.thrift.*" /> + </Match> + <Match> <!-- it is safe to cast exception here. //--> <Class name="com.twitter.distributedlog.service.DistributedLogServiceImpl$Stream$2" /> <Method name="onFailure" />