http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/test/java/org/apache/distributedlog/TimedOutTestsListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TimedOutTestsListener.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TimedOutTestsListener.java deleted file mode 100644 index db0ee4e..0000000 --- a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TimedOutTestsListener.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.lang.management.LockInfo; -import java.lang.management.ManagementFactory; -import java.lang.management.MonitorInfo; -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Map; - -import org.apache.commons.lang.StringUtils; -import org.junit.runner.notification.Failure; -import org.junit.runner.notification.RunListener; - -/** - * JUnit run listener which prints full thread dump into System.err in case a test is failed due to - * timeout. - */ -public class TimedOutTestsListener extends RunListener { - - static final String TEST_TIMED_OUT_PREFIX = "test timed out after"; - - private static String indent = " "; - - private final PrintWriter output; - - public TimedOutTestsListener() { - this.output = new PrintWriter(System.err); - } - - public TimedOutTestsListener(PrintWriter output) { - this.output = output; - } - - @Override - public void testFailure(Failure failure) throws Exception { - if (failure != null && failure.getMessage() != null && failure.getMessage().startsWith(TEST_TIMED_OUT_PREFIX)) { - output.println("====> TEST TIMED OUT. PRINTING THREAD DUMP. <===="); - output.println(); - output.print(buildThreadDiagnosticString()); - } - } - - public static String buildThreadDiagnosticString() { - StringWriter sw = new StringWriter(); - PrintWriter output = new PrintWriter(sw); - - DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS"); - output.println(String.format("Timestamp: %s", dateFormat.format(new Date()))); - output.println(); - output.println(buildThreadDump()); - - String deadlocksInfo = buildDeadlockInfo(); - if (deadlocksInfo != null) { - output.println("====> DEADLOCKS DETECTED <===="); - output.println(); - output.println(deadlocksInfo); - } - - return sw.toString(); - } - - static String buildThreadDump() { - StringBuilder dump = new StringBuilder(); - Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces(); - for (Map.Entry<Thread, StackTraceElement[]> e : stackTraces.entrySet()) { - Thread thread = e.getKey(); - dump.append(String.format("\"%s\" %s prio=%d tid=%d %s\njava.lang.Thread.State: %s", thread.getName(), - (thread.isDaemon() ? "daemon" : ""), thread.getPriority(), thread.getId(), - Thread.State.WAITING.equals(thread.getState()) ? "in Object.wait()" - : StringUtils.lowerCase(thread.getState().name()), - Thread.State.WAITING.equals(thread.getState()) ? "WAITING (on object monitor)" : thread.getState())); - for (StackTraceElement stackTraceElement : e.getValue()) { - dump.append("\n at "); - dump.append(stackTraceElement); - } - dump.append("\n"); - } - return dump.toString(); - } - - static String buildDeadlockInfo() { - ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - long[] threadIds = threadBean.findMonitorDeadlockedThreads(); - if (threadIds != null && threadIds.length > 0) { - StringWriter stringWriter = new StringWriter(); - PrintWriter out = new PrintWriter(stringWriter); - - ThreadInfo[] infos = threadBean.getThreadInfo(threadIds, true, true); - for (ThreadInfo ti : infos) { - printThreadInfo(ti, out); - printLockInfo(ti.getLockedSynchronizers(), out); - out.println(); - } - - out.close(); - return stringWriter.toString(); - } else { - return null; - } - } - - private static void printThreadInfo(ThreadInfo ti, PrintWriter out) { - // print thread information - printThread(ti, out); - - // print stack trace with locks - StackTraceElement[] stacktrace = ti.getStackTrace(); - MonitorInfo[] monitors = ti.getLockedMonitors(); - for (int i = 0; i < stacktrace.length; i++) { - StackTraceElement ste = stacktrace[i]; - out.println(indent + "at " + ste.toString()); - for (MonitorInfo mi : monitors) { - if (mi.getLockedStackDepth() == i) { - out.println(indent + " - locked " + mi); - } - } - } - out.println(); - } - - private static void printThread(ThreadInfo ti, PrintWriter out) { - out.print("\"" + ti.getThreadName() + "\"" + " Id=" + ti.getThreadId() + " in " + ti.getThreadState()); - if (ti.getLockName() != null) { - out.print(" on lock=" + ti.getLockName()); - } - if (ti.isSuspended()) { - out.print(" (suspended)"); - } - if (ti.isInNative()) { - out.print(" (running in native)"); - } - out.println(); - if (ti.getLockOwnerName() != null) { - out.println(indent + " owned by " + ti.getLockOwnerName() + " Id=" + ti.getLockOwnerId()); - } - } - - private static void printLockInfo(LockInfo[] locks, PrintWriter out) { - out.println(indent + "Locked synchronizers: count = " + locks.length); - for (LockInfo li : locks) { - out.println(indent + " - " + li); - } - out.println(); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/test/resources/log4j.properties b/distributedlog-protocol/src/test/resources/log4j.properties new file mode 100644 index 0000000..3e51059 --- /dev/null +++ b/distributedlog-protocol/src/test/resources/log4j.properties @@ -0,0 +1,51 @@ +#/** +# * Licensed to the Apache Software Foundation (ASF) under one +# * or more contributor license agreements. See the NOTICE file +# * distributed with this work for additional information +# * regarding copyright ownership. The ASF licenses this file +# * to you under the Apache License, Version 2.0 (the +# * "License"); you may not use this file except in compliance +# * with the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ + +# +# DisributedLog Logging Configuration +# + +# Example with rolling log file +log4j.rootLogger=INFO, CONSOLE + +#disable zookeeper logging +log4j.logger.org.apache.zookeeper=OFF +#Set the bookkeeper level to warning +log4j.logger.org.apache.bookkeeper=INFO + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.Threshold=INFO +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n + +# Add ROLLINGFILE to rootLogger to get log file output +# Log DEBUG level and above messages to a log file +#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender +#log4j.appender.ROLLINGFILE.Threshold=INFO +#log4j.appender.ROLLINGFILE.File=distributedlog.log +#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout +#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm +#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n + +log4j.appender.R=org.apache.log4j.RollingFileAppender +log4j.appender.R.Threshold=TRACE +log4j.appender.R.File=target/error.log +log4j.appender.R.MaxFileSize=200MB +log4j.appender.R.MaxBackupIndex=7 +log4j.appender.R.layout=org.apache.log4j.PatternLayout +log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-client/pom.xml ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/pom.xml b/distributedlog-proxy-client/pom.xml index 7392d90..25ad732 100644 --- a/distributedlog-proxy-client/pom.xml +++ b/distributedlog-proxy-client/pom.xml @@ -86,7 +86,7 @@ </dependency> <dependency> <groupId>org.apache.distributedlog</groupId> - <artifactId>distributedlog-protocol</artifactId> + <artifactId>distributedlog-common</artifactId> <version>${project.parent.version}</version> <type>test-jar</type> <scope>test</scope> @@ -129,7 +129,7 @@ <properties> <property> <name>listener</name> - <value>org.apache.distributedlog.TimedOutTestsListener</value> + <value>org.apache.distributedlog.common.util.TimedOutTestsListener</value> </property> </properties> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java index b3f3368..781005c 100644 --- a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java +++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE; import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; +import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newJFuture; import com.google.common.base.Stopwatch; import com.google.common.base.Ticker; @@ -440,7 +441,7 @@ public class DistributedLogMultiStreamWriter implements Runnable { } Promise<DLSN> writePromise = new Promise<DLSN>(); try { - recordSetWriter.writeRecord(buffer, writePromise); + recordSetWriter.writeRecord(buffer, newJFuture(writePromise)); } catch (LogRecordTooLongException e) { return Future.exception(e); } catch (WriteException e) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/TwitterFutureUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/TwitterFutureUtils.java b/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/TwitterFutureUtils.java new file mode 100644 index 0000000..6ce1fa4 --- /dev/null +++ b/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/TwitterFutureUtils.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.protocol.util; + +import com.google.common.collect.Lists; +import com.twitter.util.Future; +import com.twitter.util.Promise; +import com.twitter.util.Return; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import org.apache.distributedlog.common.concurrent.FutureUtils; + +/** + * Utils for Twitter's {@link com.twitter.util.Future}. + */ +public final class TwitterFutureUtils { + + private TwitterFutureUtils() {} + + public static <T> CompletableFuture<T> newJFuture(Promise<T> promise) { + CompletableFuture<T> jFuture = FutureUtils.createFuture(); + jFuture.whenComplete((value, cause) -> { + if (null != cause) { + if (cause instanceof CompletionException) { + promise.setException(cause.getCause()); + } else { + promise.setException(cause); + } + } else { + promise.setValue(value); + } + }); + return jFuture; + } + + public static <T> Future<T> newTFuture(CompletableFuture<T> jFuture) { + Promise<T> promise = new Promise<>(); + jFuture.whenComplete((value, cause) -> { + if (null != cause) { + if (cause instanceof CompletionException) { + promise.setException(cause.getCause()); + } else { + promise.setException(cause); + } + } else { + promise.setValue(value); + } + }); + return promise; + } + + public static <T> Future<List<Future<T>>> newTFutureList( + CompletableFuture<List<CompletableFuture<T>>> jFutureList) { + Promise<List<Future<T>>> promise = new Promise<>(); + jFutureList.whenComplete((value, cause) -> { + if (null != cause) { + if (cause instanceof CompletionException) { + promise.setException(cause.getCause()); + } else { + promise.setException(cause); + } + } else { + promise.setValue(Lists.transform( + value, + future -> newTFuture(future))); + } + }); + return promise; + } + + public static <T> void setValue(Promise<T> promise, T value) { + promise.updateIfEmpty(new Return<T>(value)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/pom.xml ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/pom.xml b/distributedlog-proxy-server/pom.xml index d7cbd56..83b2bef 100644 --- a/distributedlog-proxy-server/pom.xml +++ b/distributedlog-proxy-server/pom.xml @@ -135,7 +135,7 @@ </dependency> <dependency> <groupId>org.apache.distributedlog</groupId> - <artifactId>distributedlog-protocol</artifactId> + <artifactId>distributedlog-common</artifactId> <version>${project.parent.version}</version> <type>test-jar</type> <scope>test</scope> @@ -189,7 +189,7 @@ <properties> <property> <name>listener</name> - <value>org.apache.distributedlog.TimedOutTestsListener</value> + <value>org.apache.distributedlog.common.util.TimedOutTestsListener</value> </property> </properties> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java index 81e476b..c904499 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java @@ -37,7 +37,7 @@ import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConver import org.apache.distributedlog.service.streamset.StreamPartitionConverter; import org.apache.distributedlog.thrift.service.DistributedLogService; import org.apache.distributedlog.util.ConfUtils; -import org.apache.distributedlog.util.SchedulerUtils; +import org.apache.distributedlog.common.util.SchedulerUtils; import com.twitter.finagle.Stack; import com.twitter.finagle.ThriftMuxServer$; import com.twitter.finagle.builder.Server; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java index c37cd53..72f2758 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java @@ -25,6 +25,7 @@ import com.twitter.common.net.InetSocketAddressHelper; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.client.resolver.DefaultRegionResolver; import org.apache.distributedlog.client.resolver.RegionResolver; import org.apache.distributedlog.client.routing.RoutingService; @@ -35,10 +36,9 @@ import org.apache.distributedlog.exceptions.ServiceUnavailableException; import org.apache.distributedlog.exceptions.StreamUnavailableException; import org.apache.distributedlog.exceptions.TooManyStreamsException; import org.apache.distributedlog.feature.AbstractFeatureProvider; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; -import org.apache.distributedlog.rate.MovingAverageRate; -import org.apache.distributedlog.rate.MovingAverageRateFactory; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.distributedlog.common.rate.MovingAverageRate; +import org.apache.distributedlog.common.rate.MovingAverageRateFactory; import org.apache.distributedlog.service.config.ServerConfiguration; import org.apache.distributedlog.service.config.StreamConfigProvider; import org.apache.distributedlog.service.placement.LeastLoadPlacementPolicy; @@ -76,7 +76,7 @@ import org.apache.distributedlog.thrift.service.WriteContext; import org.apache.distributedlog.thrift.service.WriteResponse; import org.apache.distributedlog.util.ConfUtils; import org.apache.distributedlog.util.OrderedScheduler; -import org.apache.distributedlog.util.SchedulerUtils; +import org.apache.distributedlog.common.util.SchedulerUtils; import com.twitter.util.Await; import com.twitter.util.Duration; import com.twitter.util.Function; @@ -116,7 +116,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI private final ServerConfiguration serverConfig; private final DistributedLogConfiguration dlConfig; - private final DistributedLogNamespace dlNamespace; + private final Namespace dlNamespace; private final int serverRegionId; private final PlacementPolicy placementPolicy; private ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT; @@ -199,7 +199,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI } // Build the namespace - this.dlNamespace = DistributedLogNamespaceBuilder.newBuilder() + this.dlNamespace = NamespaceBuilder.newBuilder() .conf(dlConf) .uri(uri) .statsLogger(statsLogger) @@ -218,8 +218,6 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI this.scheduler = OrderedScheduler.newBuilder() .corePoolSize(numThreads) .name("DistributedLogService-Executor") - .traceTaskExecution(true) - .statsLogger(statsLogger.scope("scheduler")) .build(); // Timer, kept separate to ensure reliability of timeouts. @@ -261,7 +259,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI // Resource limiting this.timer = new ScheduledThreadPoolTimer(1, "timer", true); - this.movingAvgFactory = new MovingAverageRateFactory(timer); + this.movingAvgFactory = new MovingAverageRateFactory(scheduler); this.windowedRps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS); this.windowedBps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS); this.limiter = new ServiceRequestLimiter( @@ -783,7 +781,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI } @VisibleForTesting - public DistributedLogNamespace getDistributedLogNamespace() { + public Namespace getDistributedLogNamespace() { return dlNamespace; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java index b1e2879..969c598 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java @@ -27,14 +27,14 @@ import com.google.common.hash.Hashing; import com.twitter.common.zookeeper.ServerSet; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.DistributedLogConstants; -import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.LogSegmentMetadata; import org.apache.distributedlog.callback.LogSegmentListener; import org.apache.distributedlog.callback.NamespaceListener; import org.apache.distributedlog.client.monitor.MonitorServiceClient; import org.apache.distributedlog.client.serverset.DLZkServerSet; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import com.twitter.finagle.builder.ClientBuilder; import com.twitter.finagle.stats.Stat; import com.twitter.finagle.stats.StatsReceiver; @@ -71,7 +71,7 @@ public class MonitorService implements NamespaceListener { private static final Logger logger = LoggerFactory.getLogger(MonitorService.class); - private DistributedLogNamespace dlNamespace = null; + private Namespace dlNamespace = null; private MonitorServiceClient dlClient = null; private DLZkServerSet[] zkServerSets = null; private final ScheduledExecutorService executorService = @@ -411,7 +411,7 @@ public class MonitorService implements NamespaceListener { // stats statsProvider.getStatsLogger("monitor").registerGauge("num_streams", numOfStreamsGauge); logger.info("Construct dl namespace @ {}", dlUri); - dlNamespace = DistributedLogNamespaceBuilder.newBuilder() + dlNamespace = NamespaceBuilder.newBuilder() .conf(conf) .uri(dlUri) .build(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java index 08f4b41..b327867 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java @@ -17,6 +17,7 @@ */ package org.apache.distributedlog.service; +import java.util.concurrent.CompletionException; import org.apache.distributedlog.exceptions.DLException; import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; import org.apache.distributedlog.thrift.service.BulkWriteResponse; @@ -53,6 +54,8 @@ public class ResponseUtils { } response.setCode(StatusCode.findByValue(dle.getCode())); response.setErrMsg(dle.getMessage()); + } else if (t instanceof CompletionException) { + return exceptionToHeader(t.getCause()); } else { response.setCode(StatusCode.INTERNAL_SERVER_ERROR); response.setErrMsg("Internal server error : " + t.getMessage()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java index 7d72093..53e16b4 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java @@ -20,11 +20,11 @@ package org.apache.distributedlog.service.config; import com.google.common.base.Optional; import com.google.common.collect.Lists; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.config.ConcurrentConstConfiguration; -import org.apache.distributedlog.config.ConfigurationSubscription; +import org.apache.distributedlog.common.config.ConcurrentConstConfiguration; +import org.apache.distributedlog.common.config.ConfigurationSubscription; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.config.FileConfigurationBuilder; -import org.apache.distributedlog.config.PropertiesConfigurationBuilder; +import org.apache.distributedlog.common.config.FileConfigurationBuilder; +import org.apache.distributedlog.common.config.PropertiesConfigurationBuilder; import java.io.File; import java.net.MalformedURLException; import java.util.List; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java index 257b4be..1e62302 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.DistributedLogConstants; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter; import org.apache.distributedlog.service.streamset.StreamPartitionConverter; import org.apache.bookkeeper.util.ReflectionUtils; @@ -160,7 +161,7 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Set the region id used to instantiate DistributedLogNamespace. + * Set the region id used to instantiate Namespace. * * @param regionId * region id @@ -172,9 +173,9 @@ public class ServerConfiguration extends CompositeConfiguration { } /** - * Get the region id used to instantiate {@link org.apache.distributedlog.namespace.DistributedLogNamespace}. + * Get the region id used to instantiate {@link Namespace}. * - * @return region id used to instantiate DistributedLogNamespace + * @return region id used to instantiate Namespace */ public int getRegionId() { return getInt(SERVER_REGION_ID, SERVER_REGION_ID_DEFAULT); @@ -216,7 +217,7 @@ public class ServerConfiguration extends CompositeConfiguration { /** * Get the shard id of this server. * - * <p>It would be used to instantiate the client id used for DistributedLogNamespace. + * <p>It would be used to instantiate the client id used for Namespace. * * @return shard id of this server. */ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java index 2e9dd6b..1336ddd 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java @@ -18,7 +18,7 @@ package org.apache.distributedlog.service.placement; import org.apache.distributedlog.client.routing.RoutingService; -import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.api.namespace.Namespace; import com.twitter.util.Duration; import com.twitter.util.Function; import com.twitter.util.Future; @@ -52,7 +52,7 @@ public class LeastLoadPlacementPolicy extends PlacementPolicy { private Map<String, String> streamToServer = new HashMap<String, String>(); public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService, - DistributedLogNamespace namespace, PlacementStateManager placementStateManager, + Namespace namespace, PlacementStateManager placementStateManager, Duration refreshInterval, StatsLogger statsLogger) { super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger); statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java index ac952aa..17edc22 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java @@ -17,8 +17,8 @@ */ package org.apache.distributedlog.service.placement; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.client.routing.RoutingService; -import org.apache.distributedlog.namespace.DistributedLogNamespace; import org.apache.distributedlog.service.DLSocketAddress; import com.twitter.util.Duration; import com.twitter.util.Function0; @@ -53,14 +53,14 @@ public abstract class PlacementPolicy { protected final LoadAppraiser loadAppraiser; protected final RoutingService routingService; - protected final DistributedLogNamespace namespace; + protected final Namespace namespace; protected final PlacementStateManager placementStateManager; private final Duration refreshInterval; protected final OpStatsLogger placementCalcStats; private Timer placementRefreshTimer; public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService, - DistributedLogNamespace namespace, PlacementStateManager placementStateManager, + Namespace namespace, PlacementStateManager placementStateManager, Duration refreshInterval, StatsLogger statsLogger) { this.loadAppraiser = loadAppraiser; this.routingService = routingService; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java index 862f05a..5dcea73 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java @@ -63,8 +63,7 @@ public class ZKPlacementStateManager implements PlacementStateManager { serverLoadPath = uri.getPath() + SERVER_LOAD_DIR; } - private void createServerLoadPathIfNoExists(byte[] data) - throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException { + private void createServerLoadPathIfNoExists(byte[] data) throws KeeperException, IOException { try { Utils.zkCreateFullPathOptimistic( zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT); @@ -152,7 +151,7 @@ public class ZKPlacementStateManager implements PlacementStateManager { watching = false; watch(callback); } - } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) { + } catch (InterruptedException | IOException | KeeperException e) { logger.error("Watch of Ownership failed", e); watching = false; watch(callback); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java index 83ac668..7700184 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java @@ -18,13 +18,13 @@ package org.apache.distributedlog.service.stream; import com.google.common.base.Stopwatch; -import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.exceptions.ChecksumFailedException; import org.apache.distributedlog.exceptions.DLException; import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; import org.apache.distributedlog.service.ResponseUtils; import org.apache.distributedlog.thrift.service.ResponseHeader; -import org.apache.distributedlog.util.Sequencer; +import org.apache.distributedlog.common.util.Sequencer; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java index 6c98468..372703a 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java @@ -17,7 +17,9 @@ */ package org.apache.distributedlog.service.stream; -import org.apache.distributedlog.AsyncLogWriter; +import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newTFutureList; + +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.LogRecord; import org.apache.distributedlog.acl.AccessControlManager; @@ -33,7 +35,7 @@ import org.apache.distributedlog.thrift.service.BulkWriteResponse; import org.apache.distributedlog.thrift.service.ResponseHeader; import org.apache.distributedlog.thrift.service.StatusCode; import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.Sequencer; +import org.apache.distributedlog.common.util.Sequencer; import com.twitter.util.ConstFuture; import com.twitter.util.Future; import com.twitter.util.Future$; @@ -157,7 +159,7 @@ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements Future<List<Future<DLSN>>> futureList; synchronized (txnLock) { records = asRecordList(buffers, sequencer); - futureList = writer.writeBulk(records); + futureList = newTFutureList(writer.writeBulk(records)); } // Collect into a list of tries to make it easier to extract exception or DLSN. http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java index 3ecb46f..24ce0be 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java @@ -17,13 +17,13 @@ */ package org.apache.distributedlog.service.stream; -import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.acl.AccessControlManager; import org.apache.distributedlog.exceptions.DLException; import org.apache.distributedlog.exceptions.RequestDeniedException; import org.apache.distributedlog.service.ResponseUtils; import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.Sequencer; +import org.apache.distributedlog.common.util.Sequencer; import com.twitter.util.Future; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.Counter; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java index 0ffa619..c9dec80 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java @@ -18,8 +18,9 @@ package org.apache.distributedlog.service.stream; import static com.google.common.base.Charsets.UTF_8; +import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newTFuture; -import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.LogRecord; import org.apache.distributedlog.acl.AccessControlManager; @@ -27,7 +28,7 @@ import org.apache.distributedlog.exceptions.DLException; import org.apache.distributedlog.exceptions.RequestDeniedException; import org.apache.distributedlog.service.ResponseUtils; import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.Sequencer; +import org.apache.distributedlog.common.util.Sequencer; import com.twitter.util.Future; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.Counter; @@ -78,7 +79,7 @@ public class HeartbeatOp extends AbstractWriteOp { txnId = sequencer.nextId(); LogRecord hbRecord = new LogRecord(txnId, HEARTBEAT_DATA); hbRecord.setControl(); - writeResult = writer.write(hbRecord); + writeResult = newTFuture(writer.write(hbRecord)); } return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java index 6ec8642..d657660 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java @@ -17,13 +17,13 @@ */ package org.apache.distributedlog.service.stream; -import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.acl.AccessControlManager; import org.apache.distributedlog.exceptions.DLException; import org.apache.distributedlog.exceptions.RequestDeniedException; import org.apache.distributedlog.service.ResponseUtils; import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.Sequencer; +import org.apache.distributedlog.common.util.Sequencer; import com.twitter.util.Future; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.Counter; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java index 2b90d55..98362b5 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java @@ -18,8 +18,8 @@ package org.apache.distributedlog.service.stream; import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.namespace.DistributedLogNamespace; import org.apache.distributedlog.service.FatalErrorHandler; import org.apache.distributedlog.service.config.ServerConfiguration; import org.apache.distributedlog.service.config.StreamConfigProvider; @@ -40,7 +40,7 @@ public class StreamFactoryImpl implements StreamFactory { private final FeatureProvider featureProvider; private final StreamConfigProvider streamConfigProvider; private final StreamPartitionConverter streamPartitionConverter; - private final DistributedLogNamespace dlNamespace; + private final Namespace dlNamespace; private final OrderedScheduler scheduler; private final FatalErrorHandler fatalErrorHandler; private final HashedWheelTimer requestTimer; @@ -53,7 +53,7 @@ public class StreamFactoryImpl implements StreamFactory { FeatureProvider featureProvider, StreamConfigProvider streamConfigProvider, StreamPartitionConverter streamPartitionConverter, - DistributedLogNamespace dlNamespace, + Namespace dlNamespace, OrderedScheduler scheduler, FatalErrorHandler fatalErrorHandler, HashedWheelTimer requestTimer) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java index c0c0972..df3d64f 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java @@ -20,9 +20,10 @@ package org.apache.distributedlog.service.stream; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; -import org.apache.distributedlog.AsyncLogWriter; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; import org.apache.distributedlog.exceptions.AlreadyClosedException; import org.apache.distributedlog.exceptions.DLException; @@ -33,15 +34,16 @@ import org.apache.distributedlog.exceptions.StreamNotReadyException; import org.apache.distributedlog.exceptions.StreamUnavailableException; import org.apache.distributedlog.exceptions.UnexpectedException; import org.apache.distributedlog.io.Abortables; -import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.protocol.util.TwitterFutureUtils; import org.apache.distributedlog.service.FatalErrorHandler; import org.apache.distributedlog.service.ServerFeatureKeys; import org.apache.distributedlog.service.config.ServerConfiguration; import org.apache.distributedlog.service.config.StreamConfigProvider; import org.apache.distributedlog.service.stream.limiter.StreamRequestLimiter; import org.apache.distributedlog.service.streamset.Partition; -import org.apache.distributedlog.stats.BroadCastStatsLogger; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.stats.BroadCastStatsLogger; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.TimeSequencer; import org.apache.distributedlog.util.Utils; @@ -50,7 +52,6 @@ import com.twitter.util.Function0; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; -import com.twitter.util.TimeoutException; import com.twitter.util.Timer; import java.io.IOException; import java.util.ArrayDeque; @@ -126,7 +127,7 @@ public class StreamImpl implements Stream { private final StreamRequestLimiter limiter; private final DynamicDistributedLogConfiguration dynConf; private final DistributedLogConfiguration dlConfig; - private final DistributedLogNamespace dlNamespace; + private final Namespace dlNamespace; private final String clientId; private final OrderedScheduler scheduler; private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock(); @@ -169,7 +170,7 @@ public class StreamImpl implements Stream { DynamicDistributedLogConfiguration streamConf, FeatureProvider featureProvider, StreamConfigProvider streamConfigProvider, - DistributedLogNamespace dlNamespace, + Namespace dlNamespace, OrderedScheduler scheduler, FatalErrorHandler fatalErrorHandler, HashedWheelTimer requestTimer, @@ -555,8 +556,8 @@ public class StreamImpl implements Stream { Future<Boolean> acquireStream() { final Stopwatch stopwatch = Stopwatch.createStarted(); final Promise<Boolean> acquirePromise = new Promise<Boolean>(); - manager.openAsyncLogWriter().addEventListener( - FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() { + manager.openAsyncLogWriter().whenCompleteAsync( + new org.apache.distributedlog.common.concurrent.FutureEventListener<AsyncLogWriter>() { @Override public void onSuccess(AsyncLogWriter w) { @@ -568,7 +569,7 @@ public class StreamImpl implements Stream { onAcquireStreamFailure(cause, stopwatch, acquirePromise); } - }, scheduler, getStreamName())); + }, scheduler.chooseExecutor(getStreamName())); return acquirePromise; } @@ -662,7 +663,7 @@ public class StreamImpl implements Stream { pendingOpsCounter.dec(); } Abortables.asyncAbort(oldWriter, true); - FutureUtils.setValue(acquirePromise, success); + TwitterFutureUtils.setValue(acquirePromise, success); } // @@ -802,7 +803,7 @@ public class StreamImpl implements Stream { logger.info("Removed cached stream {}.", getStreamName()); } } - FutureUtils.setValue(closePromise, null); + TwitterFutureUtils.setValue(closePromise, null); } /** @@ -825,7 +826,7 @@ public class StreamImpl implements Stream { } logger.info("Closing stream {} ...", name); // Close the writers to release the locks before failing the requests - Future<Void> closeWriterFuture; + CompletableFuture<Void> closeWriterFuture; if (abort) { closeWriterFuture = Abortables.asyncAbort(writer, true); } else { @@ -839,25 +840,38 @@ public class StreamImpl implements Stream { closeWaitDuration = Duration.fromMilliseconds(writerCloseTimeoutMs); } - FutureUtils.stats( + CompletableFuture<Void> maskedFuture = FutureUtils.createFuture(); + FutureUtils.proxyTo( + FutureUtils.stats( closeWriterFuture, writerCloseStatLogger, Stopwatch.createStarted() - ).masked().within(futureTimer, closeWaitDuration) - .addEventListener(FutureUtils.OrderedFutureEventListener.of( - new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - postClose(uncache); - } - @Override - public void onFailure(Throwable cause) { - if (cause instanceof TimeoutException) { - writerCloseTimeoutCounter.inc(); - } - postClose(uncache); + ), + maskedFuture); + + FutureUtils.within( + maskedFuture, + closeWaitDuration.inMillis(), + TimeUnit.MILLISECONDS, + new java.util.concurrent.TimeoutException("Timeout on closing"), + scheduler, + name + ).whenCompleteAsync( + new org.apache.distributedlog.common.concurrent.FutureEventListener<Void>() { + @Override + public void onSuccess(Void value) { + postClose(uncache); + } + + @Override + public void onFailure(Throwable cause) { + if (cause instanceof java.util.concurrent.TimeoutException) { + writerCloseTimeoutCounter.inc(); } - }, scheduler, name)); + } + }, + scheduler.chooseExecutor(name) + ); return closePromise; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java index 5d54738..fd57c17 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java @@ -21,11 +21,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.util.concurrent.RateLimiter; import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; import org.apache.distributedlog.exceptions.ServiceUnavailableException; import org.apache.distributedlog.exceptions.StreamUnavailableException; import org.apache.distributedlog.exceptions.UnexpectedException; -import org.apache.distributedlog.namespace.DistributedLogNamespace; import org.apache.distributedlog.service.config.StreamConfigProvider; import org.apache.distributedlog.service.streamset.Partition; import org.apache.distributedlog.service.streamset.PartitionMap; @@ -86,7 +86,7 @@ public class StreamManagerImpl implements StreamManager { private final String clientId; private boolean closed = false; private final StreamFactory streamFactory; - private final DistributedLogNamespace dlNamespace; + private final Namespace dlNamespace; public StreamManagerImpl(String clientId, DistributedLogConfiguration dlConfig, @@ -94,7 +94,7 @@ public class StreamManagerImpl implements StreamManager { StreamFactory streamFactory, StreamPartitionConverter partitionConverter, StreamConfigProvider streamConfigProvider, - DistributedLogNamespace dlNamespace) { + Namespace dlNamespace) { this.clientId = clientId; this.executorService = executorService; this.streamFactory = streamFactory; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java index d0b8de4..b608e11 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java @@ -18,10 +18,10 @@ package org.apache.distributedlog.service.stream; import com.google.common.base.Stopwatch; -import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.exceptions.DLException; import org.apache.distributedlog.thrift.service.ResponseHeader; -import org.apache.distributedlog.util.Sequencer; +import org.apache.distributedlog.common.util.Sequencer; import com.twitter.util.Future; /** http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java index f3fc610..feb2c6a 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java @@ -18,7 +18,7 @@ package org.apache.distributedlog.service.stream; import org.apache.distributedlog.service.streamset.Partition; -import org.apache.distributedlog.stats.BroadCastStatsLogger; +import org.apache.distributedlog.common.stats.BroadCastStatsLogger; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java index 0036a5c..5d6dd1c 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java @@ -17,7 +17,9 @@ */ package org.apache.distributedlog.service.stream; -import org.apache.distributedlog.AsyncLogWriter; +import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newTFuture; + +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.acl.AccessControlManager; import org.apache.distributedlog.exceptions.DLException; @@ -25,14 +27,13 @@ import org.apache.distributedlog.exceptions.RequestDeniedException; import org.apache.distributedlog.protocol.util.ProtocolUtils; import org.apache.distributedlog.service.ResponseUtils; import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.Sequencer; +import org.apache.distributedlog.common.util.Sequencer; import com.twitter.util.Future; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.StatsLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; /** * Operation to truncate a log stream. @@ -72,12 +73,7 @@ public class TruncateOp extends AbstractWriteOp { logger.error("Truncate: Stream Name Mismatch in the Stream Map {}, {}", stream, writer.getStreamName()); return Future.exception(new IllegalStateException("The stream mapping is incorrect, fail the request")); } - return writer.truncate(dlsn).map(new AbstractFunction1<Boolean, WriteResponse>() { - @Override - public WriteResponse apply(Boolean v1) { - return ResponseUtils.writeSuccess(); - } - }); + return newTFuture(writer.truncate(dlsn).thenApply((value) -> ResponseUtils.writeSuccess())); } @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java index 2e7ffb8..0a8a2da 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java @@ -17,7 +17,9 @@ */ package org.apache.distributedlog.service.stream; -import org.apache.distributedlog.AsyncLogWriter; +import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newTFuture; + +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.LogRecord; import org.apache.distributedlog.acl.AccessControlManager; @@ -31,7 +33,7 @@ import org.apache.distributedlog.service.streamset.StreamPartitionConverter; import org.apache.distributedlog.thrift.service.ResponseHeader; import org.apache.distributedlog.thrift.service.StatusCode; import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.Sequencer; +import org.apache.distributedlog.common.util.Sequencer; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import java.nio.ByteBuffer; @@ -150,7 +152,7 @@ public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload { if (isRecordSet) { record.setRecordSet(); } - writeResult = writer.write(record); + writeResult = newTFuture(writer.write(record)); } return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java index de805aa..549262d 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java @@ -22,7 +22,7 @@ import org.apache.distributedlog.exceptions.OverCapacityException; import org.apache.distributedlog.limiter.ChainedRequestLimiter; import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction; import org.apache.distributedlog.limiter.RequestLimiter; -import org.apache.distributedlog.rate.MovingAverageRate; +import org.apache.distributedlog.common.rate.MovingAverageRate; import org.apache.distributedlog.service.stream.StreamManager; import org.apache.distributedlog.service.stream.StreamOp; import org.apache.bookkeeper.feature.Feature; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java index 7675d6f..2551a5e 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java @@ -20,7 +20,7 @@ package org.apache.distributedlog.service.stream.limiter; import org.apache.distributedlog.exceptions.OverCapacityException; import org.apache.distributedlog.exceptions.TooManyStreamsException; import org.apache.distributedlog.limiter.RequestLimiter; -import org.apache.distributedlog.rate.MovingAverageRate; +import org.apache.distributedlog.common.rate.MovingAverageRate; import org.apache.distributedlog.service.stream.StreamManager; import org.apache.distributedlog.service.stream.StreamOp; import org.apache.bookkeeper.stats.Counter; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java index 4a5dd01..16e36c9 100644 --- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java @@ -26,23 +26,23 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.common.base.Optional; -import org.apache.distributedlog.AsyncLogReader; +import org.apache.distributedlog.api.AsyncLogReader; import org.apache.distributedlog.DLMTestUtil; import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.DistributedLogManager; -import org.apache.distributedlog.LogReader; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.LogReader; import org.apache.distributedlog.LogRecord; import org.apache.distributedlog.LogRecordWithDLSN; import org.apache.distributedlog.TestZooKeeperClientBuilder; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.acl.AccessControlManager; -import org.apache.distributedlog.annotations.DistributedLogAnnotations; +import org.apache.distributedlog.common.annotations.DistributedLogAnnotations; import org.apache.distributedlog.client.routing.LocalRoutingService; import org.apache.distributedlog.exceptions.DLException; import org.apache.distributedlog.exceptions.LogNotFoundException; import org.apache.distributedlog.impl.acl.ZKAccessControl; import org.apache.distributedlog.impl.metadata.BKDLConfig; -import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.service.stream.StreamManagerImpl; import org.apache.distributedlog.thrift.AccessControlEntry; import org.apache.distributedlog.thrift.service.BulkWriteResponse; @@ -50,7 +50,7 @@ import org.apache.distributedlog.thrift.service.HeartbeatOptions; import org.apache.distributedlog.thrift.service.StatusCode; import org.apache.distributedlog.thrift.service.WriteContext; import org.apache.distributedlog.util.FailpointUtils; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import com.twitter.finagle.builder.ClientBuilder; import com.twitter.finagle.thrift.ClientId$; import com.twitter.util.Await; @@ -105,7 +105,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT HeartbeatOptions hbOptions = new HeartbeatOptions(); hbOptions.setSendHeartBeatToReader(true); // make sure the first log segment of each stream created - FutureUtils.result(dlClient.dlClient.heartbeat(name)); + Await.result(dlClient.dlClient.heartbeat(name)); DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); LogReader reader = dlm.getInputStream(1); @@ -305,7 +305,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT } validateFailedAsLogRecordTooLong(futures.get(writeCount)); - FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1))); + Await.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1))); assertEquals(writeCount, succeeded); } @@ -325,7 +325,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); validateFailedAsLogRecordTooLong(futures.get(0)); - FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1))); + Await.result(Futures.collect(futures.subList(1, writeCount + 1))); } @Test(timeout = 60000) @@ -601,7 +601,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT .connectionTimeoutMs(60000) .sessionTimeoutMs(60000) .build(); - DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace(); + Namespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace(); BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri()); String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name; ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java index 4a2d65f..60f814e 100644 --- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java @@ -50,7 +50,6 @@ import org.apache.distributedlog.thrift.service.StatusCode; import org.apache.distributedlog.thrift.service.WriteContext; import org.apache.distributedlog.thrift.service.WriteResponse; import org.apache.distributedlog.util.ConfUtils; -import org.apache.distributedlog.util.FutureUtils; import com.twitter.util.Await; import com.twitter.util.Future; import java.net.URI; @@ -446,7 +445,8 @@ public class TestDistributedLogService extends TestDistributedLogBase { assertTrue("Write should not fail before closing", futureList.get(i).isDefined()); WriteResponse response = Await.result(futureList.get(i)); - assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION, + assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION + + " but " + response.getHeader().getCode() + " is received.", StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode()); @@ -500,7 +500,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { } assertTrue("Stream " + streamName + " should be cached", streamManager.getCachedStreams().containsKey(streamName)); - List<WriteResponse> resultList = FutureUtils.result(Future.collect(futureList)); + List<WriteResponse> resultList = Await.result(Future.collect(futureList)); for (WriteResponse wr : resultList) { assertEquals(DLSN.InvalidDLSN, DLSN.deserialize(wr.getDlsn())); } @@ -689,7 +689,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { HeartbeatOptions hbOptions = new HeartbeatOptions(); hbOptions.setSendHeartBeatToReader(true); // make sure the first log segment of each stream created - FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions)); + Await.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions)); for (int j = 0; j < numWrites; j++) { futureList.add(localService.write(streamName, createRecord(i * numWrites + j))); } @@ -741,7 +741,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { HeartbeatOptions hbOptions = new HeartbeatOptions(); hbOptions.setSendHeartBeatToReader(true); // make sure the first log segment of each stream created - FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions)); + Await.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions)); for (int j = 0; j < numWrites; j++) { futureList.add(localService.write(streamName, createRecord(i * numWrites + j))); } @@ -803,7 +803,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { service.startPlacementPolicy(); - WriteResponse response = FutureUtils.result(service.getOwner("stream-1", new WriteContext())); + WriteResponse response = Await.result(service.getOwner("stream-1", new WriteContext())); assertEquals(StatusCode.FOUND, response.getHeader().getCode()); assertEquals(service.getServiceAddress().toString(), response.getHeader().getLocation()); @@ -824,7 +824,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { assertNull(stream.getLastException()); // the stream is acquired - response = FutureUtils.result(service.getOwner("stream-2", new WriteContext())); + response = Await.result(service.getOwner("stream-2", new WriteContext())); assertEquals(StatusCode.FOUND, response.getHeader().getCode()); assertEquals(service.getServiceAddress().toString(), response.getHeader().getLocation()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java index 5f5ecd4..5b8e3a6 100644 --- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java @@ -26,8 +26,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.client.routing.RoutingService; -import org.apache.distributedlog.namespace.DistributedLogNamespace; import com.twitter.util.Await; import com.twitter.util.Duration; import com.twitter.util.Future; @@ -55,7 +55,7 @@ public class TestLeastLoadPlacementPolicy { int numSevers = new Random().nextInt(20) + 1; int numStreams = new Random().nextInt(200) + 1; RoutingService mockRoutingService = mock(RoutingService.class); - DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class); + Namespace mockNamespace = mock(Namespace.class); LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy( new EqualLoadAppraiser(), mockRoutingService, @@ -81,7 +81,7 @@ public class TestLeastLoadPlacementPolicy { int numStreams = new Random().nextInt(200) + 1; RoutingService mockRoutingService = mock(RoutingService.class); when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers)); - DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class); + Namespace mockNamespace = mock(Namespace.class); try { when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator()); } catch (IOException e) { @@ -112,7 +112,7 @@ public class TestLeastLoadPlacementPolicy { /* use AtomicInteger to have a final object in answer method */ final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE); RoutingService mockRoutingService = mock(RoutingService.class); - DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class); + Namespace mockNamespace = mock(Namespace.class); LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class); when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java index 56e9483..2e87b71 100644 --- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java @@ -26,8 +26,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.namespace.DistributedLogNamespace; import org.apache.distributedlog.service.config.StreamConfigProvider; import org.apache.distributedlog.service.streamset.Partition; import org.apache.distributedlog.service.streamset.StreamPartitionConverter; @@ -67,7 +67,7 @@ public class TestStreamManager { mockStreamFactory, mockPartitionConverter, mockStreamConfigProvider, - mock(DistributedLogNamespace.class)); + mock(Namespace.class)); assertFalse(streamManager.isAcquired("stream1")); assertEquals(0, streamManager.numAcquired()); @@ -117,7 +117,7 @@ public class TestStreamManager { (DynamicDistributedLogConfiguration) any(), (StreamManager) any()) ).thenReturn(mockStream); - DistributedLogNamespace dlNamespace = mock(DistributedLogNamespace.class); + Namespace dlNamespace = mock(Namespace.class); ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1); StreamManager streamManager = new StreamManagerImpl( http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java index a18fda1..dc861a4 100644 --- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java @@ -22,7 +22,7 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.LogRecord; import org.apache.distributedlog.acl.DefaultAccessControlManager; @@ -32,9 +32,9 @@ import org.apache.distributedlog.service.config.ServerConfiguration; import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter; import org.apache.distributedlog.thrift.service.StatusCode; import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.Sequencer; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.distributedlog.common.util.Sequencer; import com.twitter.util.Await; -import com.twitter.util.Future; import java.nio.ByteBuffer; import org.apache.bookkeeper.feature.SettableFeature; import org.apache.bookkeeper.stats.NullStatsLogger; @@ -79,7 +79,7 @@ public class TestStreamOp { @Test(timeout = 60000) public void testResponseSucceededThenFailed() throws Exception { AsyncLogWriter writer = mock(AsyncLogWriter.class); - when(writer.write((LogRecord) any())).thenReturn(Future.value(new DLSN(1, 2, 3))); + when(writer.write((LogRecord) any())).thenReturn(FutureUtils.value(new DLSN(1, 2, 3))); when(writer.getStreamName()).thenReturn("test"); WriteOp writeOp = getWriteOp(); writeOp.execute(writer, new Sequencer() { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java index 431bfa4..ccf3188 100644 --- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java @@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.config.ConcurrentConstConfiguration; +import org.apache.distributedlog.common.config.ConcurrentConstConfiguration; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; import org.apache.distributedlog.exceptions.OverCapacityException; import org.apache.distributedlog.limiter.ChainedRequestLimiter; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties b/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties index 56a6417..7aa93f6 100644 --- a/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties +++ b/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties @@ -30,11 +30,7 @@ log4j.logger.org.apache.zookeeper=INFO log4j.logger.org.apache.bookkeeper=INFO # redirect executor output to executors.log since slow op warnings can be quite verbose -log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors -log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors -log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false -log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false log4j.appender.Executors=org.apache.log4j.RollingFileAppender