http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/VoidFunctions.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/VoidFunctions.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/VoidFunctions.java deleted file mode 100644 index 79f9c32..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/VoidFunctions.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.function; - -import scala.runtime.AbstractFunction1; - -import java.util.List; - -public class VoidFunctions { - - public static final AbstractFunction1<List<Void>, Void> LIST_TO_VOID_FUNC = - new AbstractFunction1<List<Void>, Void>() { - @Override - public Void apply(List<Void> list) { - return null; - } - }; - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java index dbe5400..21fe227 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java @@ -24,7 +24,7 @@ import org.apache.distributedlog.BookKeeperClient; import org.apache.distributedlog.BookKeeperClientBuilder; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.DistributedLogConstants; -import org.apache.distributedlog.MetadataAccessor; +import org.apache.distributedlog.api.MetadataAccessor; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.ZooKeeperClientBuilder; import org.apache.distributedlog.acl.AccessControlManager; @@ -47,7 +47,7 @@ import org.apache.distributedlog.metadata.LogMetadataStore; import org.apache.distributedlog.metadata.LogStreamMetadataStore; import org.apache.distributedlog.namespace.NamespaceDriver; import org.apache.distributedlog.namespace.NamespaceDriverManager; -import org.apache.distributedlog.subscription.SubscriptionsStore; +import org.apache.distributedlog.api.subscription.SubscriptionsStore; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.Utils; import org.apache.bookkeeper.feature.FeatureProvider; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java index 0761cfc..7069cbb 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java @@ -19,23 +19,22 @@ package org.apache.distributedlog.impl; import com.google.common.base.Optional; import com.google.common.collect.Lists; +import java.net.URI; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.callback.NamespaceListener; import org.apache.distributedlog.exceptions.ZKException; import org.apache.distributedlog.metadata.LogMetadataStore; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; -import com.twitter.util.Future; -import com.twitter.util.Promise; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; -import java.net.URI; -import java.util.Iterator; -import java.util.List; - import static org.apache.distributedlog.util.DLUtils.*; /** @@ -60,18 +59,18 @@ public class ZKLogMetadataStore implements LogMetadataStore { } @Override - public Future<URI> createLog(String logName) { - return Future.value(namespace); + public CompletableFuture<URI> createLog(String logName) { + return FutureUtils.value(namespace); } @Override - public Future<Optional<URI>> getLogLocation(String logName) { - return Future.value(nsOptional); + public CompletableFuture<Optional<URI>> getLogLocation(String logName) { + return FutureUtils.value(nsOptional); } @Override - public Future<Iterator<String>> getLogs() { - final Promise<Iterator<String>> promise = new Promise<Iterator<String>>(); + public CompletableFuture<Iterator<String>> getLogs() { + final CompletableFuture<Iterator<String>> promise = new CompletableFuture<Iterator<String>>(); final String nsRootPath = namespace.getPath(); try { final ZooKeeper zk = zkc.get(); @@ -89,30 +88,30 @@ public class ZKLogMetadataStore implements LogMetadataStore { results.add(child); } } - promise.setValue(results.iterator()); + promise.complete(results.iterator()); } else if (KeeperException.Code.NONODE.intValue() == rc) { List<String> streams = Lists.newLinkedList(); - promise.setValue(streams.iterator()); + promise.complete(streams.iterator()); } else { - promise.setException(new ZKException("Error reading namespace " + nsRootPath, + promise.completeExceptionally(new ZKException("Error reading namespace " + nsRootPath, KeeperException.Code.get(rc))); } } }, null); } else if (KeeperException.Code.NONODE.intValue() == syncRc) { List<String> streams = Lists.newLinkedList(); - promise.setValue(streams.iterator()); + promise.complete(streams.iterator()); } else { - promise.setException(new ZKException("Error reading namespace " + nsRootPath, + promise.completeExceptionally(new ZKException("Error reading namespace " + nsRootPath, KeeperException.Code.get(syncRc))); } } }, null); zkc.get(); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); + promise.completeExceptionally(e); } catch (InterruptedException e) { - promise.setException(e); + promise.completeExceptionally(e); } return promise; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java index b9cb374..f747045 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java @@ -18,6 +18,7 @@ package org.apache.distributedlog.impl; import com.google.common.collect.ImmutableList; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.LogSegmentMetadata; import org.apache.distributedlog.ZooKeeperClient; @@ -29,17 +30,15 @@ import org.apache.distributedlog.metadata.LogMetadata; import org.apache.distributedlog.metadata.LogMetadataForWriter; import org.apache.distributedlog.logsegment.LogSegmentMetadataStore; import org.apache.distributedlog.util.DLUtils; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureEventListener; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.Transaction; import org.apache.distributedlog.util.Transaction.OpListener; +import org.apache.distributedlog.util.Utils; import org.apache.distributedlog.zk.DefaultZKOp; import org.apache.distributedlog.zk.ZKOp; import org.apache.distributedlog.zk.ZKTransaction; import org.apache.distributedlog.zk.ZKVersionedSetOp; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; import org.apache.bookkeeper.meta.ZkVersion; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; @@ -116,7 +115,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch @Override public void run() { if (null != store.listeners.get(logSegmentsPath)) { - store.zkGetLogSegmentNames(logSegmentsPath, store).addEventListener(this); + store.zkGetLogSegmentNames(logSegmentsPath, store).whenComplete(this); } else { logger.debug("Log segments listener for {} has been removed.", logSegmentsPath); } @@ -350,18 +349,18 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch } @Override - public Future<LogSegmentMetadata> getLogSegment(String logSegmentPath) { + public CompletableFuture<LogSegmentMetadata> getLogSegment(String logSegmentPath) { return LogSegmentMetadata.read(zkc, logSegmentPath, skipMinVersionCheck); } - Future<Versioned<List<String>>> zkGetLogSegmentNames(String logSegmentsPath, Watcher watcher) { - Promise<Versioned<List<String>>> result = new Promise<Versioned<List<String>>>(); + CompletableFuture<Versioned<List<String>>> zkGetLogSegmentNames(String logSegmentsPath, Watcher watcher) { + CompletableFuture<Versioned<List<String>>> result = new CompletableFuture<Versioned<List<String>>>(); try { zkc.get().getChildren(logSegmentsPath, watcher, this, result); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - result.setException(FutureUtils.zkException(e, logSegmentsPath)); + result.completeExceptionally(Utils.zkException(e, logSegmentsPath)); } catch (InterruptedException e) { - result.setException(FutureUtils.zkException(e, logSegmentsPath)); + result.completeExceptionally(Utils.zkException(e, logSegmentsPath)); } return result; } @@ -369,21 +368,21 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch @Override @SuppressWarnings("unchecked") public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - Promise<Versioned<List<String>>> result = ((Promise<Versioned<List<String>>>) ctx); + CompletableFuture<Versioned<List<String>>> result = ((CompletableFuture<Versioned<List<String>>>) ctx); if (KeeperException.Code.OK.intValue() == rc) { /** cversion: the number of changes to the children of this znode **/ ZkVersion zkVersion = new ZkVersion(stat.getCversion()); - result.setValue(new Versioned(children, zkVersion)); + result.complete(new Versioned(children, zkVersion)); } else if (KeeperException.Code.NONODE.intValue() == rc) { - result.setException(new LogNotFoundException("Log " + path + " not found")); + result.completeExceptionally(new LogNotFoundException("Log " + path + " not found")); } else { - result.setException(new ZKException("Failed to get log segments from " + path, + result.completeExceptionally(new ZKException("Failed to get log segments from " + path, KeeperException.Code.get(rc))); } } @Override - public Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, + public CompletableFuture<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, LogSegmentNamesListener listener) { Watcher zkWatcher; if (null == listener) { @@ -422,9 +421,9 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch closeLock.readLock().unlock(); } } - Future<Versioned<List<String>>> getLogSegmentNamesResult = zkGetLogSegmentNames(logSegmentsPath, zkWatcher); + CompletableFuture<Versioned<List<String>>> getLogSegmentNamesResult = zkGetLogSegmentNames(logSegmentsPath, zkWatcher); if (null != listener) { - getLogSegmentNamesResult.addEventListener(new ReadLogSegmentsTask(logSegmentsPath, this)); + getLogSegmentNamesResult.whenComplete(new ReadLogSegmentsTask(logSegmentsPath, this)); } return zkGetLogSegmentNames(logSegmentsPath, zkWatcher); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java index 551cc44..b3fe456 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java @@ -21,17 +21,16 @@ import java.io.IOException; import java.net.URI; import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.MetadataAccessor; +import org.apache.distributedlog.api.MetadataAccessor; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.ZooKeeperClientBuilder; import org.apache.distributedlog.exceptions.AlreadyClosedException; import org.apache.distributedlog.exceptions.DLInterruptedException; import org.apache.distributedlog.impl.metadata.BKDLConfig; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.Promise; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.RetryPolicy; @@ -45,7 +44,7 @@ import static org.apache.distributedlog.impl.BKNamespaceDriver.getZKServersFromD public class ZKMetadataAccessor implements MetadataAccessor { static final Logger LOG = LoggerFactory.getLogger(ZKMetadataAccessor.class); protected final String name; - protected Promise<Void> closePromise; + protected CompletableFuture<Void> closePromise; protected final URI uri; // zookeeper clients // NOTE: The actual zookeeper client is initialized lazily when it is referenced by @@ -213,13 +212,13 @@ public class ZKMetadataAccessor implements MetadataAccessor { * @return future represents the close result. */ @Override - public Future<Void> asyncClose() { - Promise<Void> closeFuture; + public CompletableFuture<Void> asyncClose() { + CompletableFuture<Void> closeFuture; synchronized (this) { if (null != closePromise) { return closePromise; } - closeFuture = closePromise = new Promise<Void>(); + closeFuture = closePromise = new CompletableFuture<Void>(); } // NOTE: ownWriterZKC and ownReaderZKC are mostly used by tests // the managers created by the namespace - whose zkc will be closed by namespace @@ -233,13 +232,13 @@ public class ZKMetadataAccessor implements MetadataAccessor { } catch (Exception e) { LOG.warn("Exception while closing distributed log manager", e); } - FutureUtils.setValue(closeFuture, null); + FutureUtils.complete(closeFuture, null); return closeFuture; } @Override public void close() throws IOException { - FutureUtils.result(asyncClose()); + Utils.ioResult(asyncClose()); } public synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java index 63a81bd..e4a175c 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java @@ -19,10 +19,9 @@ package org.apache.distributedlog.impl.acl; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.thrift.AccessControlEntry; -import com.twitter.util.Future; -import com.twitter.util.Promise; import org.apache.thrift.TException; import org.apache.thrift.protocol.TJSONProtocol; import org.apache.thrift.transport.TMemoryBuffer; @@ -100,8 +99,8 @@ public class ZKAccessControl { return accessControlEntry; } - public Future<ZKAccessControl> create(ZooKeeperClient zkc) { - final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); + public CompletableFuture<ZKAccessControl> create(ZooKeeperClient zkc) { + final CompletableFuture<ZKAccessControl> promise = new CompletableFuture<ZKAccessControl>(); try { zkc.get().create(zkPath, serialize(accessControlEntry), zkc.getDefaultACL(), CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { @@ -109,48 +108,48 @@ public class ZKAccessControl { public void processResult(int rc, String path, Object ctx, String name) { if (KeeperException.Code.OK.intValue() == rc) { ZKAccessControl.this.zkVersion = 0; - promise.setValue(ZKAccessControl.this); + promise.complete(ZKAccessControl.this); } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); } } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); + promise.completeExceptionally(e); } catch (InterruptedException e) { - promise.setException(e); + promise.completeExceptionally(e); } catch (IOException e) { - promise.setException(e); + promise.completeExceptionally(e); } return promise; } - public Future<ZKAccessControl> update(ZooKeeperClient zkc) { - final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); + public CompletableFuture<ZKAccessControl> update(ZooKeeperClient zkc) { + final CompletableFuture<ZKAccessControl> promise = new CompletableFuture<ZKAccessControl>(); try { zkc.get().setData(zkPath, serialize(accessControlEntry), zkVersion, new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if (KeeperException.Code.OK.intValue() == rc) { ZKAccessControl.this.zkVersion = stat.getVersion(); - promise.setValue(ZKAccessControl.this); + promise.complete(ZKAccessControl.this); } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); } } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); + promise.completeExceptionally(e); } catch (InterruptedException e) { - promise.setException(e); + promise.completeExceptionally(e); } catch (IOException e) { - promise.setException(e); + promise.completeExceptionally(e); } return promise; } - public static Future<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) { - final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); + public static CompletableFuture<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) { + final CompletableFuture<ZKAccessControl> promise = new CompletableFuture<ZKAccessControl>(); try { zkc.get().getData(zkPath, watcher, new AsyncCallback.DataCallback() { @@ -159,25 +158,25 @@ public class ZKAccessControl { if (KeeperException.Code.OK.intValue() == rc) { try { AccessControlEntry ace = deserialize(zkPath, data); - promise.setValue(new ZKAccessControl(ace, zkPath, stat.getVersion())); + promise.complete(new ZKAccessControl(ace, zkPath, stat.getVersion())); } catch (IOException ioe) { - promise.setException(ioe); + promise.completeExceptionally(ioe); } } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); } } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); + promise.completeExceptionally(e); } catch (InterruptedException e) { - promise.setException(e); + promise.completeExceptionally(e); } return promise; } - public static Future<Void> delete(final ZooKeeperClient zkc, final String zkPath) { - final Promise<Void> promise = new Promise<Void>(); + public static CompletableFuture<Void> delete(final ZooKeeperClient zkc, final String zkPath) { + final CompletableFuture<Void> promise = new CompletableFuture<Void>(); try { zkc.get().delete(zkPath, -1, new AsyncCallback.VoidCallback() { @@ -185,16 +184,16 @@ public class ZKAccessControl { public void processResult(int rc, String path, Object ctx) { if (KeeperException.Code.OK.intValue() == rc || KeeperException.Code.NONODE.intValue() == rc) { - promise.setValue(null); + promise.complete(null); } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); } } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); + promise.completeExceptionally(e); } catch (InterruptedException e) { - promise.setException(e); + promise.completeExceptionally(e); } return promise; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java index be8db64..3dbde6a 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java @@ -18,16 +18,15 @@ package org.apache.distributedlog.impl.acl; import com.google.common.collect.Sets; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.acl.AccessControlManager; import org.apache.distributedlog.exceptions.DLInterruptedException; import org.apache.distributedlog.thrift.AccessControlEntry; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -76,7 +75,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher { this.scheduledExecutorService = scheduledExecutorService; this.streamEntries = new ConcurrentHashMap<String, ZKAccessControl>(); try { - Await.result(fetchDefaultAccessControlEntry()); + FutureUtils.result(fetchDefaultAccessControlEntry()); } catch (Throwable t) { if (t instanceof InterruptedException) { throw new DLInterruptedException("Interrupted on getting default access control entry for " + zkRootPath, t); @@ -90,7 +89,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher { } try { - Await.result(fetchAccessControlEntries()); + FutureUtils.result(fetchAccessControlEntries()); } catch (Throwable t) { if (t instanceof InterruptedException) { throw new DLInterruptedException("Interrupted on getting access control entries for " + zkRootPath, t); @@ -140,19 +139,19 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher { closed = true; } - private Future<Void> fetchAccessControlEntries() { - final Promise<Void> promise = new Promise<Void>(); + private CompletableFuture<Void> fetchAccessControlEntries() { + final CompletableFuture<Void> promise = new CompletableFuture<Void>(); fetchAccessControlEntries(promise); return promise; } - private void fetchAccessControlEntries(final Promise<Void> promise) { + private void fetchAccessControlEntries(final CompletableFuture<Void> promise) { try { zkc.get().getChildren(zkRootPath, this, new AsyncCallback.Children2Callback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { if (KeeperException.Code.OK.intValue() != rc) { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); return; } Set<String> streamsReceived = new HashSet<String>(); @@ -166,7 +165,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher { } } if (streamsReceived.isEmpty()) { - promise.setValue(null); + promise.complete(null); return; } final AtomicInteger numPendings = new AtomicInteger(streamsReceived.size()); @@ -174,7 +173,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher { for (String s : streamsReceived) { final String streamName = s; ZKAccessControl.read(zkc, zkRootPath + "/" + streamName, null) - .addEventListener(new FutureEventListener<ZKAccessControl>() { + .whenComplete(new FutureEventListener<ZKAccessControl>() { @Override public void onSuccess(ZKAccessControl accessControl) { @@ -193,7 +192,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher { streamEntries.remove(streamName); } else { if (1 == numFailures.incrementAndGet()) { - promise.setException(cause); + promise.completeExceptionally(cause); } } complete(); @@ -201,7 +200,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher { private void complete() { if (0 == numPendings.decrementAndGet() && numFailures.get() == 0) { - promise.setValue(null); + promise.complete(null); } } }); @@ -209,28 +208,28 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher { } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); + promise.completeExceptionally(e); } catch (InterruptedException e) { - promise.setException(e); + promise.completeExceptionally(e); } } - private Future<ZKAccessControl> fetchDefaultAccessControlEntry() { - final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); + private CompletableFuture<ZKAccessControl> fetchDefaultAccessControlEntry() { + final CompletableFuture<ZKAccessControl> promise = new CompletableFuture<ZKAccessControl>(); fetchDefaultAccessControlEntry(promise); return promise; } - private void fetchDefaultAccessControlEntry(final Promise<ZKAccessControl> promise) { + private void fetchDefaultAccessControlEntry(final CompletableFuture<ZKAccessControl> promise) { ZKAccessControl.read(zkc, zkRootPath, this) - .addEventListener(new FutureEventListener<ZKAccessControl>() { + .whenComplete(new FutureEventListener<ZKAccessControl>() { @Override public void onSuccess(ZKAccessControl accessControl) { logger.info("Default Access Control will be changed from {} to {}", ZKAccessControlManager.this.defaultAccessControl, accessControl); ZKAccessControlManager.this.defaultAccessControl = accessControl; - promise.setValue(accessControl); + promise.complete(accessControl); } @Override @@ -239,21 +238,21 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher { logger.info("Default Access Control is missing, creating one for {} ...", zkRootPath); createDefaultAccessControlEntryIfNeeded(promise); } else { - promise.setException(cause); + promise.completeExceptionally(cause); } } }); } - private void createDefaultAccessControlEntryIfNeeded(final Promise<ZKAccessControl> promise) { + private void createDefaultAccessControlEntryIfNeeded(final CompletableFuture<ZKAccessControl> promise) { ZooKeeper zk; try { zk = zkc.get(); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); + promise.completeExceptionally(e); return; } catch (InterruptedException e) { - promise.setException(e); + promise.completeExceptionally(e); return; } ZkUtils.asyncCreateFullPathOptimistic(zk, zkRootPath, new byte[0], zkc.getDefaultACL(), @@ -264,7 +263,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher { logger.info("Created zk path {} for default ACL.", zkRootPath); fetchDefaultAccessControlEntry(promise); } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); } } }, null); @@ -277,7 +276,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher { scheduledExecutorService.schedule(new Runnable() { @Override public void run() { - fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() { + fetchDefaultAccessControlEntry().whenComplete(new FutureEventListener<ZKAccessControl>() { @Override public void onSuccess(ZKAccessControl value) { // no-op @@ -305,7 +304,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher { scheduledExecutorService.schedule(new Runnable() { @Override public void run() { - fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() { + fetchAccessControlEntries().whenComplete(new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { // no-op @@ -328,10 +327,10 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher { scheduledExecutorService.schedule(new Runnable() { @Override public void run() { - fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() { + fetchDefaultAccessControlEntry().whenComplete(new FutureEventListener<ZKAccessControl>() { @Override public void onSuccess(ZKAccessControl value) { - fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() { + fetchAccessControlEntries().whenComplete(new FutureEventListener<Void>() { @Override public void onSuccess(Void value) { // no-op http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java index 5d7af9d..17515c3 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java @@ -23,6 +23,7 @@ import com.google.common.base.Optional; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.callback.NamespaceListener; @@ -32,12 +33,10 @@ import org.apache.distributedlog.exceptions.ZKException; import org.apache.distributedlog.impl.ZKNamespaceWatcher; import org.apache.distributedlog.metadata.LogMetadataStore; import org.apache.distributedlog.namespace.NamespaceWatcher; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -49,8 +48,6 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; import java.io.IOException; import java.net.URI; @@ -80,8 +77,9 @@ import static com.google.common.base.Charsets.UTF_8; * NOTE: current federated namespace isn't optimized for deletion/creation. so don't use it in the workloads * that have lots of creations or deletions. */ -public class FederatedZKLogMetadataStore extends NamespaceWatcher implements LogMetadataStore, Watcher, Runnable, - FutureEventListener<Set<URI>> { +public class FederatedZKLogMetadataStore + extends NamespaceWatcher + implements LogMetadataStore, Watcher, Runnable, FutureEventListener<Set<URI>> { static final Logger logger = LoggerFactory.getLogger(FederatedZKLogMetadataStore.class); @@ -100,7 +98,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log * @throws KeeperException */ public static void createFederatedNamespace(URI namespace, ZooKeeperClient zkc) - throws InterruptedException, ZooKeeperClient.ZooKeeperConnectionException, KeeperException { + throws IOException, KeeperException { String zkSubNamespacesPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES; Utils.zkCreateFullPathOptimistic(zkc, zkSubNamespacesPath, new byte[0], zkc.getDefaultACL(), CreateMode.PERSISTENT); @@ -112,7 +110,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log class SubNamespace implements NamespaceListener { final URI uri; final ZKNamespaceWatcher watcher; - Promise<Set<String>> logsFuture = new Promise<Set<String>>(); + CompletableFuture<Set<String>> logsFuture = new CompletableFuture<Set<String>>(); SubNamespace(URI uri) { this.uri = uri; @@ -124,7 +122,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log this.watcher.watchNamespaceChanges(); } - synchronized Future<Set<String>> getLogs() { + synchronized CompletableFuture<Set<String>> getLogs() { return logsFuture; } @@ -134,16 +132,16 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log Set<String> oldLogs = Sets.newHashSet(); // update the sub namespace cache - Promise<Set<String>> newLogsPromise; + CompletableFuture<Set<String>> newLogsPromise; synchronized (this) { - if (logsFuture.isDefined()) { // the promise is already satisfied + if (logsFuture.isDone()) { // the promise is already satisfied try { oldLogs = FutureUtils.result(logsFuture); - } catch (IOException e) { + } catch (Exception e) { logger.error("Unexpected exception when getting logs from a satisified future of {} : ", uri, e); } - logsFuture = new Promise<Set<String>>(); + logsFuture = new CompletableFuture<Set<String>>(); } // update the reverse cache @@ -163,7 +161,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log } newLogsPromise = logsFuture; } - newLogsPromise.setValue(newLogs); + newLogsPromise.complete(newLogs); // notify namespace changes notifyOnNamespaceChanges(); @@ -203,7 +201,16 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log this.maxLogsPerSubnamespace = conf.getFederatedMaxLogsPerSubnamespace(); // fetch the sub namespace - Set<URI> uris = FutureUtils.result(fetchSubNamespaces(this)); + Set<URI> uris; + try { + uris = FutureUtils.result(fetchSubNamespaces(this)); + } catch (Exception e) { + if (e instanceof IOException) { + throw (IOException) e; + } else { + throw new IOException(e); + } + } for (URI uri : uris) { SubNamespace subNs = new SubNamespace(uri); if (null == subNamespaces.putIfAbsent(uri, subNs)) { @@ -228,21 +235,21 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log } } - private <T> Future<T> postStateCheck(Future<T> future) { - final Promise<T> postCheckedPromise = new Promise<T>(); - future.addEventListener(new FutureEventListener<T>() { + private <T> CompletableFuture<T> postStateCheck(CompletableFuture<T> future) { + final CompletableFuture<T> postCheckedPromise = new CompletableFuture<T>(); + future.whenComplete(new FutureEventListener<T>() { @Override public void onSuccess(T value) { if (duplicatedLogFound.get()) { - postCheckedPromise.setException(new UnexpectedException("Duplicate log found under " + namespace)); + postCheckedPromise.completeExceptionally(new UnexpectedException("Duplicate log found under " + namespace)); } else { - postCheckedPromise.setValue(value); + postCheckedPromise.complete(value); } } @Override public void onFailure(Throwable cause) { - postCheckedPromise.setException(cause); + postCheckedPromise.completeExceptionally(cause); } }); return postCheckedPromise; @@ -273,13 +280,13 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log namespace.getFragment()); } - Future<Set<URI>> getCachedSubNamespaces() { + CompletableFuture<Set<URI>> getCachedSubNamespaces() { Set<URI> nsSet = subNamespaces.keySet(); - return Future.value(nsSet); + return FutureUtils.value(nsSet); } - Future<Set<URI>> fetchSubNamespaces(final Watcher watcher) { - final Promise<Set<URI>> promise = new Promise<Set<URI>>(); + CompletableFuture<Set<URI>> fetchSubNamespaces(final Watcher watcher) { + final CompletableFuture<Set<URI>> promise = new CompletableFuture<Set<URI>>(); try { zkc.get().sync(this.zkSubnamespacesPath, new AsyncCallback.VoidCallback() { @Override @@ -287,27 +294,27 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log if (Code.OK.intValue() == rc) { fetchSubNamespaces(watcher, promise); } else { - promise.setException(KeeperException.create(Code.get(rc))); + promise.completeExceptionally(KeeperException.create(Code.get(rc))); } } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); + promise.completeExceptionally(e); } catch (InterruptedException e) { - promise.setException(e); + promise.completeExceptionally(e); } return promise; } private void fetchSubNamespaces(Watcher watcher, - final Promise<Set<URI>> promise) { + final CompletableFuture<Set<URI>> promise) { try { zkc.get().getChildren(this.zkSubnamespacesPath, watcher, new AsyncCallback.Children2Callback() { @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { if (Code.NONODE.intValue() == rc) { - promise.setException(new UnexpectedException( + promise.completeExceptionally(new UnexpectedException( "The subnamespaces don't exist for the federated namespace " + namespace)); } else if (Code.OK.intValue() == rc) { Set<URI> subnamespaces = Sets.newHashSet(); @@ -318,26 +325,26 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log } } catch (URISyntaxException use) { logger.error("Invalid sub namespace uri found : ", use); - promise.setException(new UnexpectedException( + promise.completeExceptionally(new UnexpectedException( "Invalid sub namespace uri found in " + namespace, use)); return; } // update the sub namespaces set before update version setZkSubnamespacesVersion(stat.getVersion()); - promise.setValue(subnamespaces); + promise.complete(subnamespaces); } } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); + promise.completeExceptionally(e); } catch (InterruptedException e) { - promise.setException(e); + promise.completeExceptionally(e); } } @Override public void run() { - fetchSubNamespaces(this).addEventListener(this); + fetchSubNamespaces(this).whenComplete(this); } @Override @@ -370,7 +377,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log } if (Event.EventType.NodeChildrenChanged == watchedEvent.getType()) { // fetch the namespace - fetchSubNamespaces(this).addEventListener(this); + fetchSubNamespaces(this).whenComplete(this); } } @@ -378,27 +385,27 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log // Log Related Methods // - private <A> Future<A> duplicatedLogException(String logName) { - return Future.exception(new UnexpectedException("Duplicated log " + logName + private <A> CompletableFuture<A> duplicatedLogException(String logName) { + return FutureUtils.exception(new UnexpectedException("Duplicated log " + logName + " found in namespace " + namespace)); } @Override - public Future<URI> createLog(final String logName) { + public CompletableFuture<URI> createLog(final String logName) { if (duplicatedLogFound.get()) { return duplicatedLogException(duplicatedLogName.get()); } - Promise<URI> createPromise = new Promise<URI>(); + CompletableFuture<URI> createPromise = new CompletableFuture<URI>(); doCreateLog(logName, createPromise); return postStateCheck(createPromise); } - void doCreateLog(final String logName, final Promise<URI> createPromise) { - getLogLocation(logName).addEventListener(new FutureEventListener<Optional<URI>>() { + void doCreateLog(final String logName, final CompletableFuture<URI> createPromise) { + getLogLocation(logName).whenComplete(new FutureEventListener<Optional<URI>>() { @Override public void onSuccess(Optional<URI> uriOptional) { if (uriOptional.isPresent()) { - createPromise.setException(new LogExistsException("Log " + logName + " already exists in " + uriOptional.get())); + createPromise.completeExceptionally(new LogExistsException("Log " + logName + " already exists in " + uriOptional.get())); } else { getCachedSubNamespacesAndCreateLog(logName, createPromise); } @@ -406,14 +413,14 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log @Override public void onFailure(Throwable cause) { - createPromise.setException(cause); + createPromise.completeExceptionally(cause); } }); } private void getCachedSubNamespacesAndCreateLog(final String logName, - final Promise<URI> createPromise) { - getCachedSubNamespaces().addEventListener(new FutureEventListener<Set<URI>>() { + final CompletableFuture<URI> createPromise) { + getCachedSubNamespaces().whenComplete(new FutureEventListener<Set<URI>>() { @Override public void onSuccess(Set<URI> uris) { findSubNamespaceToCreateLog(logName, uris, createPromise); @@ -421,14 +428,14 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log @Override public void onFailure(Throwable cause) { - createPromise.setException(cause); + createPromise.completeExceptionally(cause); } }); } private void fetchSubNamespacesAndCreateLog(final String logName, - final Promise<URI> createPromise) { - fetchSubNamespaces(null).addEventListener(new FutureEventListener<Set<URI>>() { + final CompletableFuture<URI> createPromise) { + fetchSubNamespaces(null).whenComplete(new FutureEventListener<Set<URI>>() { @Override public void onSuccess(Set<URI> uris) { findSubNamespaceToCreateLog(logName, uris, createPromise); @@ -436,26 +443,26 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log @Override public void onFailure(Throwable cause) { - createPromise.setException(cause); + createPromise.completeExceptionally(cause); } }); } private void findSubNamespaceToCreateLog(final String logName, final Set<URI> uris, - final Promise<URI> createPromise) { + final CompletableFuture<URI> createPromise) { final List<URI> uriList = Lists.newArrayListWithExpectedSize(uris.size()); - List<Future<Set<String>>> futureList = Lists.newArrayListWithExpectedSize(uris.size()); + List<CompletableFuture<Set<String>>> futureList = Lists.newArrayListWithExpectedSize(uris.size()); for (URI uri : uris) { SubNamespace subNs = subNamespaces.get(uri); if (null == subNs) { - createPromise.setException(new UnexpectedException("No sub namespace " + uri + " found")); + createPromise.completeExceptionally(new UnexpectedException("No sub namespace " + uri + " found")); return; } futureList.add(subNs.getLogs()); uriList.add(uri); } - Future.collect(futureList).addEventListener(new FutureEventListener<List<Set<String>>>() { + FutureUtils.collect(futureList).whenComplete(new FutureEventListener<List<Set<String>>>() { @Override public void onSuccess(List<Set<String>> resultList) { for (int i = resultList.size() - 1; i >= 0; i--) { @@ -467,7 +474,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log } } // All sub namespaces are full - createSubNamespace().addEventListener(new FutureEventListener<URI>() { + createSubNamespace().whenComplete(new FutureEventListener<URI>() { @Override public void onSuccess(URI uri) { // the new namespace will be propagated to the namespace cache by the namespace listener @@ -479,14 +486,14 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log @Override public void onFailure(Throwable cause) { - createPromise.setException(cause); + createPromise.completeExceptionally(cause); } }); } @Override public void onFailure(Throwable cause) { - createPromise.setException(cause); + createPromise.completeExceptionally(cause); } }); } @@ -499,8 +506,8 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log return SUB_NAMESPACE_PREFIX + parts[parts.length - 1]; } - Future<URI> createSubNamespace() { - final Promise<URI> promise = new Promise<URI>(); + CompletableFuture<URI> createSubNamespace() { + final CompletableFuture<URI> promise = new CompletableFuture<URI>(); final String nsPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES + "/" + SUB_NAMESPACE_PREFIX; try { @@ -512,21 +519,21 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log try { URI newUri = getSubNamespaceURI(getNamespaceFromZkPath(name)); logger.info("Created sub namespace {}", newUri); - promise.setValue(newUri); + promise.complete(newUri); } catch (UnexpectedException ue) { - promise.setException(ue); + promise.completeExceptionally(ue); } catch (URISyntaxException e) { - promise.setException(new UnexpectedException("Invalid namespace " + name + " is created.")); + promise.completeExceptionally(new UnexpectedException("Invalid namespace " + name + " is created.")); } } else { - promise.setException(KeeperException.create(Code.get(rc))); + promise.completeExceptionally(KeeperException.create(Code.get(rc))); } } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); + promise.completeExceptionally(e); } catch (InterruptedException e) { - promise.setException(e); + promise.completeExceptionally(e); } return promise; @@ -545,22 +552,22 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log */ private void createLogInNamespace(final URI uri, final String logName, - final Promise<URI> createPromise) { + final CompletableFuture<URI> createPromise) { // TODO: rewrite this after we bump to zk 3.5, where we will have asynchronous version of multi scheduler.submit(new Runnable() { @Override public void run() { try { createLogInNamespaceSync(uri, logName); - createPromise.setValue(uri); + createPromise.complete(uri); } catch (InterruptedException e) { - createPromise.setException(e); + createPromise.completeExceptionally(e); } catch (IOException e) { - createPromise.setException(e); + createPromise.completeExceptionally(e); } catch (KeeperException.BadVersionException bve) { fetchSubNamespacesAndCreateLog(logName, createPromise); } catch (KeeperException e) { - createPromise.setException(e); + createPromise.completeExceptionally(e); } } }); @@ -617,39 +624,35 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log } @Override - public Future<Optional<URI>> getLogLocation(final String logName) { + public CompletableFuture<Optional<URI>> getLogLocation(final String logName) { if (duplicatedLogFound.get()) { return duplicatedLogException(duplicatedLogName.get()); } URI location = log2Locations.get(logName); if (null != location) { - return postStateCheck(Future.value(Optional.of(location))); + return postStateCheck(FutureUtils.value(Optional.of(location))); } if (!forceCheckLogExistence) { Optional<URI> result = Optional.absent(); - return Future.value(result); + return FutureUtils.value(result); } - return postStateCheck(fetchLogLocation(logName).onSuccess( - new AbstractFunction1<Optional<URI>, BoxedUnit>() { - @Override - public BoxedUnit apply(Optional<URI> uriOptional) { - if (uriOptional.isPresent()) { - log2Locations.putIfAbsent(logName, uriOptional.get()); - } - return BoxedUnit.UNIT; - } - })); + return postStateCheck(fetchLogLocation(logName).thenApply((uriOptional) -> { + if (uriOptional.isPresent()) { + log2Locations.putIfAbsent(logName, uriOptional.get()); + } + return uriOptional; + })); } - private Future<Optional<URI>> fetchLogLocation(final String logName) { - final Promise<Optional<URI>> fetchPromise = new Promise<Optional<URI>>(); + private CompletableFuture<Optional<URI>> fetchLogLocation(final String logName) { + final CompletableFuture<Optional<URI>> fetchPromise = new CompletableFuture<Optional<URI>>(); Set<URI> uris = subNamespaces.keySet(); - List<Future<Optional<URI>>> fetchFutures = Lists.newArrayListWithExpectedSize(uris.size()); + List<CompletableFuture<Optional<URI>>> fetchFutures = Lists.newArrayListWithExpectedSize(uris.size()); for (URI uri : uris) { fetchFutures.add(fetchLogLocation(uri, logName)); } - Future.collect(fetchFutures).addEventListener(new FutureEventListener<List<Optional<URI>>>() { + FutureUtils.collect(fetchFutures).whenComplete(new FutureEventListener<List<Optional<URI>>>() { @Override public void onSuccess(List<Optional<URI>> fetchResults) { Optional<URI> result = Optional.absent(); @@ -660,7 +663,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log new Object[] { logName, result.get(), fetchResult.get() }); duplicatedLogName.compareAndSet(null, logName); duplicatedLogFound.set(true); - fetchPromise.setException(new UnexpectedException("Log " + logName + fetchPromise.completeExceptionally(new UnexpectedException("Log " + logName + " is found in multiple sub namespaces : " + result.get() + " & " + fetchResult.get())); return; @@ -669,62 +672,57 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log result = fetchResult; } } - fetchPromise.setValue(result); + fetchPromise.complete(result); } @Override public void onFailure(Throwable cause) { - fetchPromise.setException(cause); + fetchPromise.completeExceptionally(cause); } }); return fetchPromise; } - private Future<Optional<URI>> fetchLogLocation(final URI uri, String logName) { - final Promise<Optional<URI>> fetchPromise = new Promise<Optional<URI>>(); + private CompletableFuture<Optional<URI>> fetchLogLocation(final URI uri, String logName) { + final CompletableFuture<Optional<URI>> fetchPromise = new CompletableFuture<Optional<URI>>(); final String logRootPath = uri.getPath() + "/" + logName; try { zkc.get().exists(logRootPath, false, new AsyncCallback.StatCallback() { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if (Code.OK.intValue() == rc) { - fetchPromise.setValue(Optional.of(uri)); + fetchPromise.complete(Optional.of(uri)); } else if (Code.NONODE.intValue() == rc) { - fetchPromise.setValue(Optional.<URI>absent()); + fetchPromise.complete(Optional.<URI>absent()); } else { - fetchPromise.setException(KeeperException.create(Code.get(rc))); + fetchPromise.completeExceptionally(KeeperException.create(Code.get(rc))); } } }, null); } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - fetchPromise.setException(e); + fetchPromise.completeExceptionally(e); } catch (InterruptedException e) { - fetchPromise.setException(e); + fetchPromise.completeExceptionally(e); } return fetchPromise; } @Override - public Future<Iterator<String>> getLogs() { + public CompletableFuture<Iterator<String>> getLogs() { if (duplicatedLogFound.get()) { return duplicatedLogException(duplicatedLogName.get()); } - return postStateCheck(retrieveLogs().map( - new AbstractFunction1<List<Set<String>>, Iterator<String>>() { - @Override - public Iterator<String> apply(List<Set<String>> resultList) { - return getIterator(resultList); - } - })); + return postStateCheck(retrieveLogs().thenApply( + resultList -> getIterator(resultList))); } - private Future<List<Set<String>>> retrieveLogs() { + private CompletableFuture<List<Set<String>>> retrieveLogs() { Collection<SubNamespace> subNss = subNamespaces.values(); - List<Future<Set<String>>> logsList = Lists.newArrayListWithExpectedSize(subNss.size()); + List<CompletableFuture<Set<String>>> logsList = Lists.newArrayListWithExpectedSize(subNss.size()); for (SubNamespace subNs : subNss) { logsList.add(subNs.getLogs()); } - return Future.collect(logsList); + return FutureUtils.collect(logsList); } private Iterator<String> getIterator(List<Set<String>> resultList) { @@ -747,13 +745,9 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log } private void notifyOnNamespaceChanges() { - retrieveLogs().onSuccess(new AbstractFunction1<List<Set<String>>, BoxedUnit>() { - @Override - public BoxedUnit apply(List<Set<String>> resultList) { - for (NamespaceListener listener : listeners) { - listener.onStreamsChanged(getIterator(resultList)); - } - return BoxedUnit.UNIT; + retrieveLogs().thenAccept(resultList -> { + for (NamespaceListener listener : listeners) { + listener.onStreamsChanged(getIterator(resultList)); } }); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java index 8f9913e..e45c755 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java @@ -17,25 +17,23 @@ */ package org.apache.distributedlog.impl.logsegment; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import org.apache.distributedlog.bk.LedgerAllocator; import org.apache.distributedlog.logsegment.LogSegmentEntryWriter; import org.apache.distributedlog.util.Allocator; import org.apache.distributedlog.util.Transaction; -import com.twitter.util.Future; import org.apache.bookkeeper.client.LedgerHandle; -import scala.Function1; -import scala.runtime.AbstractFunction1; - -import java.io.IOException; /** * Allocate log segments */ class BKLogSegmentAllocator implements Allocator<LogSegmentEntryWriter, Object> { - private static class NewLogSegmentEntryWriterFn extends AbstractFunction1<LedgerHandle, LogSegmentEntryWriter> { + private static class NewLogSegmentEntryWriterFn implements Function<LedgerHandle, LogSegmentEntryWriter> { - static final Function1<LedgerHandle, LogSegmentEntryWriter> INSTANCE = + static final Function<LedgerHandle, LogSegmentEntryWriter> INSTANCE = new NewLogSegmentEntryWriterFn(); private NewLogSegmentEntryWriterFn() {} @@ -58,8 +56,8 @@ class BKLogSegmentAllocator implements Allocator<LogSegmentEntryWriter, Object> } @Override - public Future<LogSegmentEntryWriter> tryObtain(Transaction<Object> txn, - final Transaction.OpListener<LogSegmentEntryWriter> listener) { + public CompletableFuture<LogSegmentEntryWriter> tryObtain(Transaction<Object> txn, + final Transaction.OpListener<LogSegmentEntryWriter> listener) { return allocator.tryObtain(txn, new Transaction.OpListener<LedgerHandle>() { @Override public void onCommit(LedgerHandle lh) { @@ -70,16 +68,16 @@ class BKLogSegmentAllocator implements Allocator<LogSegmentEntryWriter, Object> public void onAbort(Throwable t) { listener.onAbort(t); } - }).map(NewLogSegmentEntryWriterFn.INSTANCE); + }).thenApply(NewLogSegmentEntryWriterFn.INSTANCE); } @Override - public Future<Void> asyncClose() { + public CompletableFuture<Void> asyncClose() { return allocator.asyncClose(); } @Override - public Future<Void> delete() { + public CompletableFuture<Void> delete() { return allocator.delete(); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java index 034b23e..0bb91ae 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java @@ -19,6 +19,7 @@ package org.apache.distributedlog.impl.logsegment; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.Entry; import org.apache.distributedlog.LogSegmentMetadata; @@ -29,10 +30,8 @@ import org.apache.distributedlog.exceptions.EndOfLogSegmentException; import org.apache.distributedlog.exceptions.ReadCancelledException; import org.apache.distributedlog.injector.AsyncFailureInjector; import org.apache.distributedlog.logsegment.LogSegmentEntryReader; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; -import com.twitter.util.Future; -import com.twitter.util.Promise; import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -87,7 +86,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, return done; } - void setValue(LedgerEntry entry) { + void complete(LedgerEntry entry) { synchronized (this) { if (done) { return; @@ -98,7 +97,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, setDone(true); } - void setException(int rc) { + void completeExceptionally(int rc) { synchronized (this) { if (done) { return; @@ -152,16 +151,16 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, while (entries.hasMoreElements()) { // more entries are returned if (null != entry) { - setException(BKException.Code.UnexpectedConditionException); + completeExceptionally(BKException.Code.UnexpectedConditionException); return; } entry = entries.nextElement(); } if (null == entry || entry.getEntryId() != entryId) { - setException(BKException.Code.UnexpectedConditionException); + completeExceptionally(BKException.Code.UnexpectedConditionException); return; } - setValue(entry); + complete(entry); } @Override @@ -186,7 +185,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, return; } if (null != entry && this.entryId == entryId) { - setValue(entry); + complete(entry); return; } // the long poll is timeout or interrupted; we will retry it again. @@ -215,7 +214,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, nextReadBackoffTime, TimeUnit.MILLISECONDS); } else { - setException(rc); + completeExceptionally(rc); } return false; } @@ -229,7 +228,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, private class PendingReadRequest { private final int numEntries; private final List<Entry.Reader> entries; - private final Promise<List<Entry.Reader>> promise; + private final CompletableFuture<List<Entry.Reader>> promise; PendingReadRequest(int numEntries) { this.numEntries = numEntries; @@ -238,15 +237,15 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, } else { this.entries = new ArrayList<Entry.Reader>(); } - this.promise = new Promise<List<Entry.Reader>>(); + this.promise = new CompletableFuture<List<Entry.Reader>>(); } - Promise<List<Entry.Reader>> getPromise() { + CompletableFuture<List<Entry.Reader>> getPromise() { return promise; } - void setException(Throwable throwable) { - FutureUtils.setException(promise, throwable); + void completeExceptionally(Throwable throwable) { + FutureUtils.completeExceptionally(promise, throwable); } void addEntry(Entry.Reader entry) { @@ -254,7 +253,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, } void complete() { - FutureUtils.setValue(promise, entries); + FutureUtils.complete(promise, entries); onEntriesConsumed(entries.size()); } @@ -277,7 +276,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, private final int numPrefetchEntries; private final int maxPrefetchEntries; // state - private Promise<Void> closePromise = null; + private CompletableFuture<Void> closePromise = null; private LogSegmentMetadata metadata; private LedgerHandle lh; private final List<LedgerHandle> openLedgerHandles; @@ -457,7 +456,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, if (isBeyondLastAddConfirmed()) { // if the reader is already caught up, let's fail the reader immediately // as we need to pull the latest metadata of this log segment. - setException(new BKTransmitException("Failed to open ledger for reading log segment " + getSegment(), rc), + completeExceptionally(new BKTransmitException("Failed to open ledger for reading log segment " + getSegment(), rc), true); return; } @@ -488,7 +487,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, * @param throwable exception indicating the error * @param isBackground is the reader set exception by background reads or foreground reads */ - private void setException(Throwable throwable, boolean isBackground) { + private void completeExceptionally(Throwable throwable, boolean isBackground) { lastException.compareAndSet(null, throwable); if (isBackground) { notifyReaders(); @@ -510,7 +509,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, readQueue.clear(); } for (PendingReadRequest request : requestsToCancel) { - request.setException(throwExc); + request.completeExceptionally(throwExc); } } @@ -630,11 +629,11 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, } @Override - public Future<List<Entry.Reader>> readNext(int numEntries) { + public CompletableFuture<List<Entry.Reader>> readNext(int numEntries) { final PendingReadRequest readRequest = new PendingReadRequest(numEntries); if (checkClosedOrInError()) { - readRequest.setException(lastException.get()); + readRequest.completeExceptionally(lastException.get()); } else { boolean wasQueueEmpty; synchronized (readQueue) { @@ -682,9 +681,9 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, // mark the reader in error and abort all pending reads since // we don't know the last consumed read if (null == lastException.get()) { - if (nextRequest.getPromise().isInterrupted().isDefined()) { - setException(new DLInterruptedException("Interrupted on reading log segment " - + getSegment() + " : " + nextRequest.getPromise().isInterrupted().get()), false); + if (nextRequest.getPromise().isCancelled()) { + completeExceptionally(new DLInterruptedException("Interrupted on reading log segment " + + getSegment() + " : " + nextRequest.getPromise().isCancelled()), false); } } @@ -707,11 +706,11 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, } else { DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from " + getSegment()); - nextRequest.setException(ise); + nextRequest.completeExceptionally(ise); if (null != request) { - request.setException(ise); + request.completeExceptionally(ise); } - setException(ise, false); + completeExceptionally(ise, false); } } else { if (0 == scheduleCountLocal) { @@ -732,7 +731,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, } // reach end of log segment if (hitEndOfLogSegment) { - setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false); + completeExceptionally(new EndOfLogSegmentException(getSegment().getZNodeName()), false); return; } if (null == entry) { @@ -742,7 +741,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, if (!entry.isDone()) { // we already reached end of the log segment if (isEndOfLogSegment(entry.getEntryId())) { - setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false); + completeExceptionally(new EndOfLogSegmentException(getSegment().getZNodeName()), false); } return; } @@ -751,13 +750,13 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, if (entry != removedEntry) { DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from " + getSegment()); - setException(ise, false); + completeExceptionally(ise, false); return; } try { nextRequest.addEntry(processReadEntry(entry.getEntry())); } catch (IOException e) { - setException(e, false); + completeExceptionally(e, false); return; } } else if (skipBrokenEntries && BKException.Code.DigestMatchException == entry.getRc()) { @@ -766,7 +765,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, readAheadEntries.poll(); continue; } else { - setException(new BKTransmitException("Encountered issue on reading entry " + entry.getEntryId() + completeExceptionally(new BKTransmitException("Encountered issue on reading entry " + entry.getEntryId() + " @ log segment " + getSegment(), entry.getRc()), false); return; } @@ -812,26 +811,29 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, } @Override - public Future<Void> asyncClose() { - final Promise<Void> closeFuture; + public CompletableFuture<Void> asyncClose() { + final CompletableFuture<Void> closeFuture; ReadCancelledException exception; LedgerHandle[] lhsToClose; synchronized (this) { if (null != closePromise) { return closePromise; } - closeFuture = closePromise = new Promise<Void>(); + closeFuture = closePromise = new CompletableFuture<Void>(); lhsToClose = openLedgerHandles.toArray(new LedgerHandle[openLedgerHandles.size()]); // set the exception to cancel pending and subsequent reads exception = new ReadCancelledException(getSegment().getZNodeName(), "Reader was closed"); - setException(exception, false); + completeExceptionally(exception, false); } // cancel all pending reads cancelAllPendingReads(exception); // close all the open ledger - BKUtils.closeLedgers(lhsToClose).proxyTo(closeFuture); + FutureUtils.proxyTo( + BKUtils.closeLedgers(lhsToClose), + closeFuture + ); return closeFuture; } }