http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java deleted file mode 100644 index 0c90a50..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java +++ /dev/null @@ -1,374 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.acl; - -import com.google.common.collect.Sets; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.acl.AccessControlManager; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.thrift.AccessControlEntry; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.util.ZkUtils; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * ZooKeeper Based {@link com.twitter.distributedlog.acl.AccessControlManager} - */ -public class ZKAccessControlManager implements AccessControlManager, Watcher { - - private static final Logger logger = LoggerFactory.getLogger(ZKAccessControlManager.class); - - private static final int ZK_RETRY_BACKOFF_MS = 500; - - protected final DistributedLogConfiguration conf; - protected final ZooKeeperClient zkc; - protected final String zkRootPath; - protected final ScheduledExecutorService scheduledExecutorService; - - protected final ConcurrentMap<String, ZKAccessControl> streamEntries; - protected ZKAccessControl defaultAccessControl; - protected volatile boolean closed = false; - - public ZKAccessControlManager(DistributedLogConfiguration conf, - ZooKeeperClient zkc, - String zkRootPath, - ScheduledExecutorService scheduledExecutorService) throws IOException { - this.conf = conf; - this.zkc = zkc; - this.zkRootPath = zkRootPath; - this.scheduledExecutorService = scheduledExecutorService; - this.streamEntries = new ConcurrentHashMap<String, ZKAccessControl>(); - try { - Await.result(fetchDefaultAccessControlEntry()); - } catch (Throwable t) { - if (t instanceof InterruptedException) { - throw new DLInterruptedException("Interrupted on getting default access control entry for " + zkRootPath, t); - } else if (t instanceof KeeperException) { - throw new IOException("Encountered zookeeper exception on getting default access control entry for " + zkRootPath, t); - } else if (t instanceof IOException) { - throw (IOException) t; - } else { - throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t); - } - } - - try { - Await.result(fetchAccessControlEntries()); - } catch (Throwable t) { - if (t instanceof InterruptedException) { - throw new DLInterruptedException("Interrupted on getting access control entries for " + zkRootPath, t); - } else if (t instanceof KeeperException) { - throw new IOException("Encountered zookeeper exception on getting access control entries for " + zkRootPath, t); - } else if (t instanceof IOException) { - throw (IOException) t; - } else { - throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t); - } - } - } - - protected AccessControlEntry getAccessControlEntry(String stream) { - ZKAccessControl entry = streamEntries.get(stream); - entry = null == entry ? defaultAccessControl : entry; - return entry.getAccessControlEntry(); - } - - @Override - public boolean allowWrite(String stream) { - return !getAccessControlEntry(stream).isDenyWrite(); - } - - @Override - public boolean allowTruncate(String stream) { - return !getAccessControlEntry(stream).isDenyTruncate(); - } - - @Override - public boolean allowDelete(String stream) { - return !getAccessControlEntry(stream).isDenyDelete(); - } - - @Override - public boolean allowAcquire(String stream) { - return !getAccessControlEntry(stream).isDenyAcquire(); - } - - @Override - public boolean allowRelease(String stream) { - return !getAccessControlEntry(stream).isDenyRelease(); - } - - @Override - public void close() { - closed = true; - } - - private Future<Void> fetchAccessControlEntries() { - final Promise<Void> promise = new Promise<Void>(); - fetchAccessControlEntries(promise); - return promise; - } - - private void fetchAccessControlEntries(final Promise<Void> promise) { - try { - zkc.get().getChildren(zkRootPath, this, new AsyncCallback.Children2Callback() { - @Override - public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - if (KeeperException.Code.OK.intValue() != rc) { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - return; - } - Set<String> streamsReceived = new HashSet<String>(); - streamsReceived.addAll(children); - Set<String> streamsCached = streamEntries.keySet(); - Set<String> streamsRemoved = Sets.difference(streamsCached, streamsReceived).immutableCopy(); - for (String s : streamsRemoved) { - ZKAccessControl accessControl = streamEntries.remove(s); - if (null != accessControl) { - logger.info("Removed Access Control Entry for stream {} : {}", s, accessControl.getAccessControlEntry()); - } - } - if (streamsReceived.isEmpty()) { - promise.setValue(null); - return; - } - final AtomicInteger numPendings = new AtomicInteger(streamsReceived.size()); - final AtomicInteger numFailures = new AtomicInteger(0); - for (String s : streamsReceived) { - final String streamName = s; - ZKAccessControl.read(zkc, zkRootPath + "/" + streamName, null) - .addEventListener(new FutureEventListener<ZKAccessControl>() { - - @Override - public void onSuccess(ZKAccessControl accessControl) { - streamEntries.put(streamName, accessControl); - logger.info("Added overrided access control for stream {} : {}", streamName, accessControl.getAccessControlEntry()); - complete(); - } - - @Override - public void onFailure(Throwable cause) { - if (cause instanceof KeeperException.NoNodeException) { - streamEntries.remove(streamName); - } else if (cause instanceof ZKAccessControl.CorruptedAccessControlException) { - logger.warn("Access control is corrupted for stream {} @ {}, skipped it ...", - new Object[] { streamName, zkRootPath, cause }); - streamEntries.remove(streamName); - } else { - if (1 == numFailures.incrementAndGet()) { - promise.setException(cause); - } - } - complete(); - } - - private void complete() { - if (0 == numPendings.decrementAndGet() && numFailures.get() == 0) { - promise.setValue(null); - } - } - }); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } - } - - private Future<ZKAccessControl> fetchDefaultAccessControlEntry() { - final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); - fetchDefaultAccessControlEntry(promise); - return promise; - } - - private void fetchDefaultAccessControlEntry(final Promise<ZKAccessControl> promise) { - ZKAccessControl.read(zkc, zkRootPath, this) - .addEventListener(new FutureEventListener<ZKAccessControl>() { - @Override - public void onSuccess(ZKAccessControl accessControl) { - logger.info("Default Access Control will be changed from {} to {}", - ZKAccessControlManager.this.defaultAccessControl, - accessControl); - ZKAccessControlManager.this.defaultAccessControl = accessControl; - promise.setValue(accessControl); - } - - @Override - public void onFailure(Throwable cause) { - if (cause instanceof KeeperException.NoNodeException) { - logger.info("Default Access Control is missing, creating one for {} ...", zkRootPath); - createDefaultAccessControlEntryIfNeeded(promise); - } else { - promise.setException(cause); - } - } - }); - } - - private void createDefaultAccessControlEntryIfNeeded(final Promise<ZKAccessControl> promise) { - ZooKeeper zk; - try { - zk = zkc.get(); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - return; - } catch (InterruptedException e) { - promise.setException(e); - return; - } - ZkUtils.asyncCreateFullPathOptimistic(zk, zkRootPath, new byte[0], zkc.getDefaultACL(), - CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, String name) { - if (KeeperException.Code.OK.intValue() == rc) { - logger.info("Created zk path {} for default ACL.", zkRootPath); - fetchDefaultAccessControlEntry(promise); - } else { - promise.setException(KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - } - - private void refetchDefaultAccessControlEntry(final int delayMs) { - if (closed) { - return; - } - scheduledExecutorService.schedule(new Runnable() { - @Override - public void run() { - fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() { - @Override - public void onSuccess(ZKAccessControl value) { - // no-op - } - @Override - public void onFailure(Throwable cause) { - if (cause instanceof ZKAccessControl.CorruptedAccessControlException) { - logger.warn("Default access control entry is corrupted, ignore this update : ", cause); - return; - } - - logger.warn("Encountered an error on refetching default access control entry, retrying in {} ms : ", - ZK_RETRY_BACKOFF_MS, cause); - refetchDefaultAccessControlEntry(ZK_RETRY_BACKOFF_MS); - } - }); - } - }, delayMs, TimeUnit.MILLISECONDS); - } - - private void refetchAccessControlEntries(final int delayMs) { - if (closed) { - return; - } - scheduledExecutorService.schedule(new Runnable() { - @Override - public void run() { - fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - // no-op - } - @Override - public void onFailure(Throwable cause) { - logger.warn("Encountered an error on refetching access control entries, retrying in {} ms : ", - ZK_RETRY_BACKOFF_MS, cause); - refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS); - } - }); - } - }, delayMs, TimeUnit.MILLISECONDS); - } - - private void refetchAllAccessControlEntries(final int delayMs) { - if (closed) { - return; - } - scheduledExecutorService.schedule(new Runnable() { - @Override - public void run() { - fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() { - @Override - public void onSuccess(ZKAccessControl value) { - fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - // no-op - } - - @Override - public void onFailure(Throwable cause) { - logger.warn("Encountered an error on fetching all access control entries, retrying in {} ms : ", - ZK_RETRY_BACKOFF_MS, cause); - refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS); - } - }); - } - - @Override - public void onFailure(Throwable cause) { - logger.warn("Encountered an error on refetching all access control entries, retrying in {} ms : ", - ZK_RETRY_BACKOFF_MS, cause); - refetchAllAccessControlEntries(ZK_RETRY_BACKOFF_MS); - } - }); - } - }, delayMs, TimeUnit.MILLISECONDS); - } - - @Override - public void process(WatchedEvent event) { - if (Event.EventType.None.equals(event.getType())) { - if (event.getState() == Event.KeeperState.Expired) { - refetchAllAccessControlEntries(0); - } - } else if (Event.EventType.NodeDataChanged.equals(event.getType())) { - logger.info("Default ACL for {} is changed, refetching ...", zkRootPath); - refetchDefaultAccessControlEntry(0); - } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) { - logger.info("List of ACLs for {} are changed, refetching ...", zkRootPath); - refetchAccessControlEntries(0); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java deleted file mode 100644 index 0a8f28b..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/federated/FederatedZKLogMetadataStore.java +++ /dev/null @@ -1,760 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.federated; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; -import com.google.common.base.Optional; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.callback.NamespaceListener; -import com.twitter.distributedlog.exceptions.LogExistsException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.impl.ZKNamespaceWatcher; -import com.twitter.distributedlog.metadata.LogMetadataStore; -import com.twitter.distributedlog.namespace.NamespaceWatcher; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.OpResult; -import org.apache.zookeeper.Transaction; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * A Federated ZooKeeper Based Log Metadata Store. - * - * To Upgrade a simple ZKLogMetadataStore to FederatedZKLogMetadataStore, following steps should be taken in sequence: - * a) deploy the new code with disabling createStreamsIfNotExists in all writer. - * b) once all proxies disable the flag, update namespace binding to enable federated namespace. - * c) restart writers to take federated namespace in place. - * - * NOTE: current federated namespace isn't optimized for deletion/creation. so don't use it in the workloads - * that have lots of creations or deletions. - */ -public class FederatedZKLogMetadataStore extends NamespaceWatcher implements LogMetadataStore, Watcher, Runnable, - FutureEventListener<Set<URI>> { - - static final Logger logger = LoggerFactory.getLogger(FederatedZKLogMetadataStore.class); - - private final static String ZNODE_SUB_NAMESPACES = ".subnamespaces"; - private final static String SUB_NAMESPACE_PREFIX = "NS_"; - - /** - * Create the federated namespace. - * - * @param namespace - * namespace to create - * @param zkc - * zookeeper client - * @throws InterruptedException - * @throws ZooKeeperClient.ZooKeeperConnectionException - * @throws KeeperException - */ - public static void createFederatedNamespace(URI namespace, ZooKeeperClient zkc) - throws InterruptedException, ZooKeeperClient.ZooKeeperConnectionException, KeeperException { - String zkSubNamespacesPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES; - Utils.zkCreateFullPathOptimistic(zkc, zkSubNamespacesPath, new byte[0], - zkc.getDefaultACL(), CreateMode.PERSISTENT); - } - - /** - * Represent a sub namespace inside the federated namespace. - */ - class SubNamespace implements NamespaceListener { - final URI uri; - final ZKNamespaceWatcher watcher; - Promise<Set<String>> logsFuture = new Promise<Set<String>>(); - - SubNamespace(URI uri) { - this.uri = uri; - this.watcher = new ZKNamespaceWatcher(conf, uri, zkc, scheduler); - this.watcher.registerListener(this); - } - - void watch() { - this.watcher.watchNamespaceChanges(); - } - - synchronized Future<Set<String>> getLogs() { - return logsFuture; - } - - @Override - public void onStreamsChanged(Iterator<String> newLogsIter) { - Set<String> newLogs = Sets.newHashSet(newLogsIter); - Set<String> oldLogs = Sets.newHashSet(); - - // update the sub namespace cache - Promise<Set<String>> newLogsPromise; - synchronized (this) { - if (logsFuture.isDefined()) { // the promise is already satisfied - try { - oldLogs = FutureUtils.result(logsFuture); - } catch (IOException e) { - logger.error("Unexpected exception when getting logs from a satisified future of {} : ", - uri, e); - } - logsFuture = new Promise<Set<String>>(); - } - - // update the reverse cache - for (String logName : newLogs) { - URI oldURI = log2Locations.putIfAbsent(logName, uri); - if (null != oldURI && !Objects.equal(uri, oldURI)) { - logger.error("Log {} is found duplicated in multiple locations : old location = {}," + - " new location = {}", new Object[] { logName, oldURI, uri }); - duplicatedLogFound.set(true); - } - } - - // remove the gone streams - Set<String> deletedLogs = Sets.difference(oldLogs, newLogs); - for (String logName : deletedLogs) { - log2Locations.remove(logName, uri); - } - newLogsPromise = logsFuture; - } - newLogsPromise.setValue(newLogs); - - // notify namespace changes - notifyOnNamespaceChanges(); - } - } - - final DistributedLogConfiguration conf; - final URI namespace; - final ZooKeeperClient zkc; - final OrderedScheduler scheduler; - final String zkSubnamespacesPath; - final AtomicBoolean duplicatedLogFound = new AtomicBoolean(false); - final AtomicReference<String> duplicatedLogName = new AtomicReference<String>(null); - final AtomicReference<Integer> zkSubnamespacesVersion = new AtomicReference<Integer>(null); - - final int maxLogsPerSubnamespace; - // sub namespaces - final ConcurrentSkipListMap<URI, SubNamespace> subNamespaces; - // map between log name and its location - final ConcurrentMap<String, URI> log2Locations; - // final - final boolean forceCheckLogExistence; - - public FederatedZKLogMetadataStore( - DistributedLogConfiguration conf, - URI namespace, - ZooKeeperClient zkc, - OrderedScheduler scheduler) throws IOException { - this.conf = conf; - this.namespace = namespace; - this.zkc = zkc; - this.scheduler = scheduler; - this.forceCheckLogExistence = conf.getFederatedCheckExistenceWhenCacheMiss(); - this.subNamespaces = new ConcurrentSkipListMap<URI, SubNamespace>(); - this.log2Locations = new ConcurrentHashMap<String, URI>(); - this.zkSubnamespacesPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES; - this.maxLogsPerSubnamespace = conf.getFederatedMaxLogsPerSubnamespace(); - - // fetch the sub namespace - Set<URI> uris = FutureUtils.result(fetchSubNamespaces(this)); - for (URI uri : uris) { - SubNamespace subNs = new SubNamespace(uri); - if (null == subNamespaces.putIfAbsent(uri, subNs)) { - subNs.watch(); - logger.info("Watched sub namespace {}", uri); - } - } - - logger.info("Federated ZK LogMetadataStore is initialized for {}", namespace); - } - - private void scheduleTask(Runnable r, long ms) { - if (duplicatedLogFound.get()) { - logger.error("Scheduler is halted for federated namespace {} as duplicated log found", - namespace); - return; - } - try { - scheduler.schedule(r, ms, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException ree) { - logger.error("Task {} scheduled in {} ms is rejected : ", new Object[]{r, ms, ree}); - } - } - - private <T> Future<T> postStateCheck(Future<T> future) { - final Promise<T> postCheckedPromise = new Promise<T>(); - future.addEventListener(new FutureEventListener<T>() { - @Override - public void onSuccess(T value) { - if (duplicatedLogFound.get()) { - postCheckedPromise.setException(new UnexpectedException("Duplicate log found under " + namespace)); - } else { - postCheckedPromise.setValue(value); - } - } - - @Override - public void onFailure(Throwable cause) { - postCheckedPromise.setException(cause); - } - }); - return postCheckedPromise; - } - - // - // SubNamespace Related Methods - // - - @VisibleForTesting - Set<URI> getSubnamespaces() { - return subNamespaces.keySet(); - } - - @VisibleForTesting - void removeLogFromCache(String logName) { - log2Locations.remove(logName); - } - - private URI getSubNamespaceURI(String ns) throws URISyntaxException { - return new URI( - namespace.getScheme(), - namespace.getUserInfo(), - namespace.getHost(), - namespace.getPort(), - namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES + "/" + ns, - namespace.getQuery(), - namespace.getFragment()); - } - - Future<Set<URI>> getCachedSubNamespaces() { - Set<URI> nsSet = subNamespaces.keySet(); - return Future.value(nsSet); - } - - Future<Set<URI>> fetchSubNamespaces(final Watcher watcher) { - final Promise<Set<URI>> promise = new Promise<Set<URI>>(); - try { - zkc.get().sync(this.zkSubnamespacesPath, new AsyncCallback.VoidCallback() { - @Override - public void processResult(int rc, String path, Object ctx) { - if (Code.OK.intValue() == rc) { - fetchSubNamespaces(watcher, promise); - } else { - promise.setException(KeeperException.create(Code.get(rc))); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } - return promise; - } - - private void fetchSubNamespaces(Watcher watcher, - final Promise<Set<URI>> promise) { - try { - zkc.get().getChildren(this.zkSubnamespacesPath, watcher, - new AsyncCallback.Children2Callback() { - @Override - public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - if (Code.NONODE.intValue() == rc) { - promise.setException(new UnexpectedException( - "The subnamespaces don't exist for the federated namespace " + namespace)); - } else if (Code.OK.intValue() == rc) { - Set<URI> subnamespaces = Sets.newHashSet(); - subnamespaces.add(namespace); - try { - for (String ns : children) { - subnamespaces.add(getSubNamespaceURI(ns)); - } - } catch (URISyntaxException use) { - logger.error("Invalid sub namespace uri found : ", use); - promise.setException(new UnexpectedException( - "Invalid sub namespace uri found in " + namespace, use)); - return; - } - // update the sub namespaces set before update version - setZkSubnamespacesVersion(stat.getVersion()); - promise.setValue(subnamespaces); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } - } - - @Override - public void run() { - fetchSubNamespaces(this).addEventListener(this); - } - - @Override - public void onSuccess(Set<URI> uris) { - for (URI uri : uris) { - if (subNamespaces.containsKey(uri)) { - continue; - } - SubNamespace subNs = new SubNamespace(uri); - if (null == subNamespaces.putIfAbsent(uri, subNs)) { - subNs.watch(); - logger.info("Watched new sub namespace {}.", uri); - notifyOnNamespaceChanges(); - } - } - } - - @Override - public void onFailure(Throwable cause) { - // failed to fetch namespaces, retry later - scheduleTask(this, conf.getZKSessionTimeoutMilliseconds()); - } - - @Override - public void process(WatchedEvent watchedEvent) { - if (Event.EventType.None == watchedEvent.getType() && - Event.KeeperState.Expired == watchedEvent.getState()) { - scheduleTask(this, conf.getZKSessionTimeoutMilliseconds()); - return; - } - if (Event.EventType.NodeChildrenChanged == watchedEvent.getType()) { - // fetch the namespace - fetchSubNamespaces(this).addEventListener(this); - } - } - - // - // Log Related Methods - // - - private <A> Future<A> duplicatedLogException(String logName) { - return Future.exception(new UnexpectedException("Duplicated log " + logName - + " found in namespace " + namespace)); - } - - @Override - public Future<URI> createLog(final String logName) { - if (duplicatedLogFound.get()) { - return duplicatedLogException(duplicatedLogName.get()); - } - Promise<URI> createPromise = new Promise<URI>(); - doCreateLog(logName, createPromise); - return postStateCheck(createPromise); - } - - void doCreateLog(final String logName, final Promise<URI> createPromise) { - getLogLocation(logName).addEventListener(new FutureEventListener<Optional<URI>>() { - @Override - public void onSuccess(Optional<URI> uriOptional) { - if (uriOptional.isPresent()) { - createPromise.setException(new LogExistsException("Log " + logName + " already exists in " + uriOptional.get())); - } else { - getCachedSubNamespacesAndCreateLog(logName, createPromise); - } - } - - @Override - public void onFailure(Throwable cause) { - createPromise.setException(cause); - } - }); - } - - private void getCachedSubNamespacesAndCreateLog(final String logName, - final Promise<URI> createPromise) { - getCachedSubNamespaces().addEventListener(new FutureEventListener<Set<URI>>() { - @Override - public void onSuccess(Set<URI> uris) { - findSubNamespaceToCreateLog(logName, uris, createPromise); - } - - @Override - public void onFailure(Throwable cause) { - createPromise.setException(cause); - } - }); - } - - private void fetchSubNamespacesAndCreateLog(final String logName, - final Promise<URI> createPromise) { - fetchSubNamespaces(null).addEventListener(new FutureEventListener<Set<URI>>() { - @Override - public void onSuccess(Set<URI> uris) { - findSubNamespaceToCreateLog(logName, uris, createPromise); - } - - @Override - public void onFailure(Throwable cause) { - createPromise.setException(cause); - } - }); - } - - private void findSubNamespaceToCreateLog(final String logName, - final Set<URI> uris, - final Promise<URI> createPromise) { - final List<URI> uriList = Lists.newArrayListWithExpectedSize(uris.size()); - List<Future<Set<String>>> futureList = Lists.newArrayListWithExpectedSize(uris.size()); - for (URI uri : uris) { - SubNamespace subNs = subNamespaces.get(uri); - if (null == subNs) { - createPromise.setException(new UnexpectedException("No sub namespace " + uri + " found")); - return; - } - futureList.add(subNs.getLogs()); - uriList.add(uri); - } - Future.collect(futureList).addEventListener(new FutureEventListener<List<Set<String>>>() { - @Override - public void onSuccess(List<Set<String>> resultList) { - for (int i = resultList.size() - 1; i >= 0; i--) { - Set<String> logs = resultList.get(i); - if (logs.size() < maxLogsPerSubnamespace) { - URI uri = uriList.get(i); - createLogInNamespace(uri, logName, createPromise); - return; - } - } - // All sub namespaces are full - createSubNamespace().addEventListener(new FutureEventListener<URI>() { - @Override - public void onSuccess(URI uri) { - // the new namespace will be propagated to the namespace cache by the namespace listener - // so we don't need to cache it here. we could go ahead to create the stream under this - // namespace, as we are using sequential znode. we are mostly the first guy who create - // the log under this namespace. - createLogInNamespace(uri, logName, createPromise); - } - - @Override - public void onFailure(Throwable cause) { - createPromise.setException(cause); - } - }); - } - - @Override - public void onFailure(Throwable cause) { - createPromise.setException(cause); - } - }); - } - - private String getNamespaceFromZkPath(String zkPath) throws UnexpectedException { - String parts[] = zkPath.split(SUB_NAMESPACE_PREFIX); - if (parts.length <= 0) { - throw new UnexpectedException("Invalid namespace @ " + zkPath); - } - return SUB_NAMESPACE_PREFIX + parts[parts.length - 1]; - } - - Future<URI> createSubNamespace() { - final Promise<URI> promise = new Promise<URI>(); - - final String nsPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES + "/" + SUB_NAMESPACE_PREFIX; - try { - zkc.get().create(nsPath, new byte[0], zkc.getDefaultACL(), CreateMode.PERSISTENT_SEQUENTIAL, - new AsyncCallback.StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, String name) { - if (Code.OK.intValue() == rc) { - try { - URI newUri = getSubNamespaceURI(getNamespaceFromZkPath(name)); - logger.info("Created sub namespace {}", newUri); - promise.setValue(newUri); - } catch (UnexpectedException ue) { - promise.setException(ue); - } catch (URISyntaxException e) { - promise.setException(new UnexpectedException("Invalid namespace " + name + " is created.")); - } - } else { - promise.setException(KeeperException.create(Code.get(rc))); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } catch (InterruptedException e) { - promise.setException(e); - } - - return promise; - } - - /** - * Create a log under the namespace. To guarantee there is only one creation happens at time - * in a federated namespace, we use CAS operation in zookeeper. - * - * @param uri - * namespace - * @param logName - * name of the log - * @param createPromise - * the promise representing the creation result. - */ - private void createLogInNamespace(final URI uri, - final String logName, - final Promise<URI> createPromise) { - // TODO: rewrite this after we bump to zk 3.5, where we will have asynchronous version of multi - scheduler.submit(new Runnable() { - @Override - public void run() { - try { - createLogInNamespaceSync(uri, logName); - createPromise.setValue(uri); - } catch (InterruptedException e) { - createPromise.setException(e); - } catch (IOException e) { - createPromise.setException(e); - } catch (KeeperException.BadVersionException bve) { - fetchSubNamespacesAndCreateLog(logName, createPromise); - } catch (KeeperException e) { - createPromise.setException(e); - } - } - }); - } - - void createLogInNamespaceSync(URI uri, String logName) - throws InterruptedException, IOException, KeeperException { - Transaction txn = zkc.get().transaction(); - // we don't have the zk version yet. set it to 0 instead of -1, to prevent non CAS operation. - int zkVersion = null == zkSubnamespacesVersion.get() ? 0 : zkSubnamespacesVersion.get(); - txn.setData(zkSubnamespacesPath, uri.getPath().getBytes(UTF_8), zkVersion); - String logPath = uri.getPath() + "/" + logName; - txn.create(logPath, new byte[0], zkc.getDefaultACL(), CreateMode.PERSISTENT); - try { - txn.commit(); - // if the transaction succeed, the zk version is advanced - setZkSubnamespacesVersion(zkVersion + 1); - } catch (KeeperException ke) { - List<OpResult> opResults = ke.getResults(); - OpResult createResult = opResults.get(1); - if (createResult instanceof OpResult.ErrorResult) { - OpResult.ErrorResult errorResult = (OpResult.ErrorResult) createResult; - if (Code.NODEEXISTS.intValue() == errorResult.getErr()) { - throw new LogExistsException("Log " + logName + " already exists"); - } - } - OpResult setResult = opResults.get(0); - if (setResult instanceof OpResult.ErrorResult) { - OpResult.ErrorResult errorResult = (OpResult.ErrorResult) setResult; - if (Code.BADVERSION.intValue() == errorResult.getErr()) { - throw KeeperException.create(Code.BADVERSION); - } - } - throw new ZKException("ZK exception in creating log " + logName + " in " + uri, ke); - } - } - - void setZkSubnamespacesVersion(int zkVersion) { - Integer oldVersion; - boolean done = false; - while (!done) { - oldVersion = zkSubnamespacesVersion.get(); - if (null == oldVersion) { - done = zkSubnamespacesVersion.compareAndSet(null, zkVersion); - continue; - } - if (oldVersion < zkVersion) { - done = zkSubnamespacesVersion.compareAndSet(oldVersion, zkVersion); - continue; - } else { - done = true; - } - } - } - - @Override - public Future<Optional<URI>> getLogLocation(final String logName) { - if (duplicatedLogFound.get()) { - return duplicatedLogException(duplicatedLogName.get()); - } - URI location = log2Locations.get(logName); - if (null != location) { - return postStateCheck(Future.value(Optional.of(location))); - } - if (!forceCheckLogExistence) { - Optional<URI> result = Optional.absent(); - return Future.value(result); - } - return postStateCheck(fetchLogLocation(logName).onSuccess( - new AbstractFunction1<Optional<URI>, BoxedUnit>() { - @Override - public BoxedUnit apply(Optional<URI> uriOptional) { - if (uriOptional.isPresent()) { - log2Locations.putIfAbsent(logName, uriOptional.get()); - } - return BoxedUnit.UNIT; - } - })); - } - - private Future<Optional<URI>> fetchLogLocation(final String logName) { - final Promise<Optional<URI>> fetchPromise = new Promise<Optional<URI>>(); - - Set<URI> uris = subNamespaces.keySet(); - List<Future<Optional<URI>>> fetchFutures = Lists.newArrayListWithExpectedSize(uris.size()); - for (URI uri : uris) { - fetchFutures.add(fetchLogLocation(uri, logName)); - } - Future.collect(fetchFutures).addEventListener(new FutureEventListener<List<Optional<URI>>>() { - @Override - public void onSuccess(List<Optional<URI>> fetchResults) { - Optional<URI> result = Optional.absent(); - for (Optional<URI> fetchResult : fetchResults) { - if (result.isPresent()) { - if (fetchResult.isPresent()) { - logger.error("Log {} is found in multiple sub namespaces : {} & {}.", - new Object[] { logName, result.get(), fetchResult.get() }); - duplicatedLogName.compareAndSet(null, logName); - duplicatedLogFound.set(true); - fetchPromise.setException(new UnexpectedException("Log " + logName - + " is found in multiple sub namespaces : " - + result.get() + " & " + fetchResult.get())); - return; - } - } else { - result = fetchResult; - } - } - fetchPromise.setValue(result); - } - - @Override - public void onFailure(Throwable cause) { - fetchPromise.setException(cause); - } - }); - return fetchPromise; - } - - private Future<Optional<URI>> fetchLogLocation(final URI uri, String logName) { - final Promise<Optional<URI>> fetchPromise = new Promise<Optional<URI>>(); - final String logRootPath = uri.getPath() + "/" + logName; - try { - zkc.get().exists(logRootPath, false, new AsyncCallback.StatCallback() { - @Override - public void processResult(int rc, String path, Object ctx, Stat stat) { - if (Code.OK.intValue() == rc) { - fetchPromise.setValue(Optional.of(uri)); - } else if (Code.NONODE.intValue() == rc) { - fetchPromise.setValue(Optional.<URI>absent()); - } else { - fetchPromise.setException(KeeperException.create(Code.get(rc))); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - fetchPromise.setException(e); - } catch (InterruptedException e) { - fetchPromise.setException(e); - } - return fetchPromise; - } - - @Override - public Future<Iterator<String>> getLogs() { - if (duplicatedLogFound.get()) { - return duplicatedLogException(duplicatedLogName.get()); - } - return postStateCheck(retrieveLogs().map( - new AbstractFunction1<List<Set<String>>, Iterator<String>>() { - @Override - public Iterator<String> apply(List<Set<String>> resultList) { - return getIterator(resultList); - } - })); - } - - private Future<List<Set<String>>> retrieveLogs() { - Collection<SubNamespace> subNss = subNamespaces.values(); - List<Future<Set<String>>> logsList = Lists.newArrayListWithExpectedSize(subNss.size()); - for (SubNamespace subNs : subNss) { - logsList.add(subNs.getLogs()); - } - return Future.collect(logsList); - } - - private Iterator<String> getIterator(List<Set<String>> resultList) { - List<Iterator<String>> iterList = Lists.newArrayListWithExpectedSize(resultList.size()); - for (Set<String> result : resultList) { - iterList.add(result.iterator()); - } - return Iterators.concat(iterList.iterator()); - } - - @Override - public void registerNamespaceListener(NamespaceListener listener) { - registerListener(listener); - } - - @Override - protected void watchNamespaceChanges() { - // as the federated namespace already started watching namespace changes, - // we don't need to do any actions here - } - - private void notifyOnNamespaceChanges() { - retrieveLogs().onSuccess(new AbstractFunction1<List<Set<String>>, BoxedUnit>() { - @Override - public BoxedUnit apply(List<Set<String>> resultList) { - for (NamespaceListener listener : listeners) { - listener.onStreamsChanged(getIterator(resultList)); - } - return BoxedUnit.UNIT; - } - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java deleted file mode 100644 index d7ff4fb..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.logsegment; - -import com.twitter.distributedlog.bk.LedgerAllocator; -import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; -import com.twitter.distributedlog.util.Allocator; -import com.twitter.distributedlog.util.Transaction; -import com.twitter.util.Future; -import org.apache.bookkeeper.client.LedgerHandle; -import scala.Function1; -import scala.runtime.AbstractFunction1; - -import java.io.IOException; - -/** - * Allocate log segments - */ -class BKLogSegmentAllocator implements Allocator<LogSegmentEntryWriter, Object> { - - private static class NewLogSegmentEntryWriterFn extends AbstractFunction1<LedgerHandle, LogSegmentEntryWriter> { - - static final Function1<LedgerHandle, LogSegmentEntryWriter> INSTANCE = - new NewLogSegmentEntryWriterFn(); - - private NewLogSegmentEntryWriterFn() {} - - @Override - public LogSegmentEntryWriter apply(LedgerHandle lh) { - return new BKLogSegmentEntryWriter(lh); - } - } - - LedgerAllocator allocator; - - BKLogSegmentAllocator(LedgerAllocator allocator) { - this.allocator = allocator; - } - - @Override - public void allocate() throws IOException { - allocator.allocate(); - } - - @Override - public Future<LogSegmentEntryWriter> tryObtain(Transaction<Object> txn, - final Transaction.OpListener<LogSegmentEntryWriter> listener) { - return allocator.tryObtain(txn, new Transaction.OpListener<LedgerHandle>() { - @Override - public void onCommit(LedgerHandle lh) { - listener.onCommit(new BKLogSegmentEntryWriter(lh)); - } - - @Override - public void onAbort(Throwable t) { - listener.onAbort(t); - } - }).map(NewLogSegmentEntryWriterFn.INSTANCE); - } - - @Override - public Future<Void> asyncClose() { - return allocator.asyncClose(); - } - - @Override - public Future<Void> delete() { - return allocator.delete(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java deleted file mode 100644 index f85760d..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java +++ /dev/null @@ -1,837 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.logsegment; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.Entry; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.exceptions.BKTransmitException; -import com.twitter.distributedlog.exceptions.DLIllegalStateException; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.EndOfLogSegmentException; -import com.twitter.distributedlog.exceptions.ReadCancelledException; -import com.twitter.distributedlog.injector.AsyncFailureInjector; -import com.twitter.distributedlog.logsegment.LogSegmentEntryReader; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import org.apache.bookkeeper.client.AsyncCallback; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * BookKeeper ledger based log segment entry reader. - */ -public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, AsyncCallback.OpenCallback { - - private static final Logger logger = LoggerFactory.getLogger(BKLogSegmentEntryReader.class); - - private class CacheEntry implements Runnable, AsyncCallback.ReadCallback, - AsyncCallback.ReadLastConfirmedAndEntryCallback { - - protected final long entryId; - private boolean done; - private LedgerEntry entry; - private int rc; - - private CacheEntry(long entryId) { - this.entryId = entryId; - this.entry = null; - this.rc = BKException.Code.UnexpectedConditionException; - this.done = false; - } - - long getEntryId() { - return entryId; - } - - synchronized boolean isDone() { - return done; - } - - void setValue(LedgerEntry entry) { - synchronized (this) { - if (done) { - return; - } - this.rc = BKException.Code.OK; - this.entry = entry; - } - setDone(true); - } - - void setException(int rc) { - synchronized (this) { - if (done) { - return; - } - this.rc = rc; - } - setDone(false); - } - - void setDone(boolean success) { - synchronized (this) { - this.done = true; - } - onReadEntryDone(success); - } - - synchronized boolean isSuccess() { - return BKException.Code.OK == rc; - } - - synchronized LedgerEntry getEntry() { - return this.entry; - } - - synchronized int getRc() { - return rc; - } - - @Override - public void readComplete(int rc, - LedgerHandle lh, - Enumeration<LedgerEntry> entries, - Object ctx) { - if (failureInjector.shouldInjectCorruption(entryId, entryId)) { - rc = BKException.Code.DigestMatchException; - } - processReadEntries(rc, lh, entries, ctx); - } - - void processReadEntries(int rc, - LedgerHandle lh, - Enumeration<LedgerEntry> entries, - Object ctx) { - if (isDone()) { - return; - } - if (!checkReturnCodeAndHandleFailure(rc, false)) { - return; - } - LedgerEntry entry = null; - while (entries.hasMoreElements()) { - // more entries are returned - if (null != entry) { - setException(BKException.Code.UnexpectedConditionException); - return; - } - entry = entries.nextElement(); - } - if (null == entry || entry.getEntryId() != entryId) { - setException(BKException.Code.UnexpectedConditionException); - return; - } - setValue(entry); - } - - @Override - public void readLastConfirmedAndEntryComplete(int rc, - long entryId, - LedgerEntry entry, - Object ctx) { - if (failureInjector.shouldInjectCorruption(this.entryId, this.entryId)) { - rc = BKException.Code.DigestMatchException; - } - processReadEntry(rc, entryId, entry, ctx); - } - - void processReadEntry(int rc, - long entryId, - LedgerEntry entry, - Object ctx) { - if (isDone()) { - return; - } - if (!checkReturnCodeAndHandleFailure(rc, true)) { - return; - } - if (null != entry && this.entryId == entryId) { - setValue(entry); - return; - } - // the long poll is timeout or interrupted; we will retry it again. - issueRead(this); - } - - /** - * Check return code and retry if needed. - * - * @param rc the return code - * @param isLongPoll is it a long poll request - * @return is the request successful or not - */ - boolean checkReturnCodeAndHandleFailure(int rc, boolean isLongPoll) { - if (BKException.Code.OK == rc) { - numReadErrors.set(0); - return true; - } - if (BKException.Code.BookieHandleNotAvailableException == rc || - (isLongPoll && BKException.Code.NoSuchLedgerExistsException == rc)) { - int numErrors = Math.max(1, numReadErrors.incrementAndGet()); - int nextReadBackoffTime = Math.min(numErrors * readAheadWaitTime, maxReadBackoffTime); - scheduler.schedule( - getSegment().getLogSegmentId(), - this, - nextReadBackoffTime, - TimeUnit.MILLISECONDS); - } else { - setException(rc); - } - return false; - } - - @Override - public void run() { - issueRead(this); - } - } - - private class PendingReadRequest { - private final int numEntries; - private final List<Entry.Reader> entries; - private final Promise<List<Entry.Reader>> promise; - - PendingReadRequest(int numEntries) { - this.numEntries = numEntries; - if (numEntries == 1) { - this.entries = new ArrayList<Entry.Reader>(1); - } else { - this.entries = new ArrayList<Entry.Reader>(); - } - this.promise = new Promise<List<Entry.Reader>>(); - } - - Promise<List<Entry.Reader>> getPromise() { - return promise; - } - - void setException(Throwable throwable) { - FutureUtils.setException(promise, throwable); - } - - void addEntry(Entry.Reader entry) { - entries.add(entry); - } - - void complete() { - FutureUtils.setValue(promise, entries); - onEntriesConsumed(entries.size()); - } - - boolean hasReadEntries() { - return entries.size() > 0; - } - - boolean hasReadEnoughEntries() { - return entries.size() >= numEntries; - } - } - - private final BookKeeper bk; - private final DistributedLogConfiguration conf; - private final OrderedScheduler scheduler; - private final long lssn; - private final long startSequenceId; - private final boolean envelopeEntries; - private final boolean deserializeRecordSet; - private final int numPrefetchEntries; - private final int maxPrefetchEntries; - // state - private Promise<Void> closePromise = null; - private LogSegmentMetadata metadata; - private LedgerHandle lh; - private final List<LedgerHandle> openLedgerHandles; - private CacheEntry outstandingLongPoll; - private long nextEntryId; - private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(null); - private final AtomicLong scheduleCount = new AtomicLong(0); - private volatile boolean hasCaughtupOnInprogress = false; - private final CopyOnWriteArraySet<StateChangeListener> stateChangeListeners = - new CopyOnWriteArraySet<StateChangeListener>(); - // read retries - private int readAheadWaitTime; - private final int maxReadBackoffTime; - private final AtomicInteger numReadErrors = new AtomicInteger(0); - private final boolean skipBrokenEntries; - // readahead cache - int cachedEntries = 0; - int numOutstandingEntries = 0; - final LinkedBlockingQueue<CacheEntry> readAheadEntries; - // request queue - final LinkedList<PendingReadRequest> readQueue; - - // failure injector - private final AsyncFailureInjector failureInjector; - // Stats - private final Counter skippedBrokenEntriesCounter; - - BKLogSegmentEntryReader(LogSegmentMetadata metadata, - LedgerHandle lh, - long startEntryId, - BookKeeper bk, - OrderedScheduler scheduler, - DistributedLogConfiguration conf, - StatsLogger statsLogger, - AsyncFailureInjector failureInjector) { - this.metadata = metadata; - this.lssn = metadata.getLogSegmentSequenceNumber(); - this.startSequenceId = metadata.getStartSequenceId(); - this.envelopeEntries = metadata.getEnvelopeEntries(); - this.deserializeRecordSet = conf.getDeserializeRecordSetOnReads(); - this.lh = lh; - this.nextEntryId = Math.max(startEntryId, 0); - this.bk = bk; - this.conf = conf; - this.numPrefetchEntries = conf.getNumPrefetchEntriesPerLogSegment(); - this.maxPrefetchEntries = conf.getMaxPrefetchEntriesPerLogSegment(); - this.scheduler = scheduler; - this.openLedgerHandles = Lists.newArrayList(); - this.openLedgerHandles.add(lh); - this.outstandingLongPoll = null; - // create the readahead queue - this.readAheadEntries = new LinkedBlockingQueue<CacheEntry>(); - // create the read request queue - this.readQueue = new LinkedList<PendingReadRequest>(); - // read backoff settings - this.readAheadWaitTime = conf.getReadAheadWaitTime(); - this.maxReadBackoffTime = 4 * conf.getReadAheadWaitTime(); - // other read settings - this.skipBrokenEntries = conf.getReadAheadSkipBrokenEntries(); - - // Failure Injection - this.failureInjector = failureInjector; - // Stats - this.skippedBrokenEntriesCounter = statsLogger.getCounter("skipped_broken_entries"); - } - - @VisibleForTesting - public synchronized CacheEntry getOutstandingLongPoll() { - return outstandingLongPoll; - } - - @VisibleForTesting - LinkedBlockingQueue<CacheEntry> getReadAheadEntries() { - return this.readAheadEntries; - } - - synchronized LedgerHandle getLh() { - return lh; - } - - @Override - public synchronized LogSegmentMetadata getSegment() { - return metadata; - } - - @VisibleForTesting - synchronized long getNextEntryId() { - return nextEntryId; - } - - @Override - public void start() { - prefetchIfNecessary(); - } - - @Override - public boolean hasCaughtUpOnInprogress() { - return hasCaughtupOnInprogress; - } - - @Override - public LogSegmentEntryReader registerListener(StateChangeListener listener) { - stateChangeListeners.add(listener); - return this; - } - - @Override - public LogSegmentEntryReader unregisterListener(StateChangeListener listener) { - stateChangeListeners.remove(listener); - return this; - } - - private void notifyCaughtupOnInprogress() { - for (StateChangeListener listener : stateChangeListeners) { - listener.onCaughtupOnInprogress(); - } - } - - // - // Process on Log Segment Metadata Updates - // - - @Override - public synchronized void onLogSegmentMetadataUpdated(LogSegmentMetadata segment) { - if (metadata == segment || - LogSegmentMetadata.COMPARATOR.compare(metadata, segment) == 0 || - !(metadata.isInProgress() && !segment.isInProgress())) { - return; - } - // segment is closed from inprogress, then re-open the log segment - bk.asyncOpenLedger( - segment.getLogSegmentId(), - BookKeeper.DigestType.CRC32, - conf.getBKDigestPW().getBytes(UTF_8), - this, - segment); - } - - @Override - public void openComplete(int rc, LedgerHandle lh, Object ctx) { - LogSegmentMetadata segment = (LogSegmentMetadata) ctx; - if (BKException.Code.OK != rc) { - // fail current reader or retry opening the reader - failOrRetryOpenLedger(rc, segment); - return; - } - // switch to new ledger handle if the log segment is moved to completed. - CacheEntry longPollRead = null; - synchronized (this) { - if (isClosed()) { - lh.asyncClose(new AsyncCallback.CloseCallback() { - @Override - public void closeComplete(int rc, LedgerHandle lh, Object ctx) { - logger.debug("Close the open ledger {} since the log segment reader is already closed", - lh.getId()); - } - }, null); - return; - } - this.metadata = segment; - this.lh = lh; - this.openLedgerHandles.add(lh); - longPollRead = outstandingLongPoll; - } - if (null != longPollRead) { - // reissue the long poll read when the log segment state is changed - issueRead(longPollRead); - } - // notify readers - notifyReaders(); - } - - private void failOrRetryOpenLedger(int rc, final LogSegmentMetadata segment) { - if (isClosed()) { - return; - } - if (isBeyondLastAddConfirmed()) { - // if the reader is already caught up, let's fail the reader immediately - // as we need to pull the latest metadata of this log segment. - setException(new BKTransmitException("Failed to open ledger for reading log segment " + getSegment(), rc), - true); - return; - } - // the reader is still catching up, retry opening the log segment later - scheduler.schedule(segment.getLogSegmentId(), new Runnable() { - @Override - public void run() { - onLogSegmentMetadataUpdated(segment); - } - }, conf.getZKRetryBackoffStartMillis(), TimeUnit.MILLISECONDS); - } - - // - // Change the state of this reader - // - - private boolean checkClosedOrInError() { - if (null != lastException.get()) { - cancelAllPendingReads(lastException.get()); - return true; - } - return false; - } - - /** - * Set the reader into error state with return code <i>rc</i>. - * - * @param throwable exception indicating the error - * @param isBackground is the reader set exception by background reads or foreground reads - */ - private void setException(Throwable throwable, boolean isBackground) { - lastException.compareAndSet(null, throwable); - if (isBackground) { - notifyReaders(); - } - } - - /** - * Notify the readers with the state change. - */ - private void notifyReaders() { - processReadRequests(); - } - - private void cancelAllPendingReads(Throwable throwExc) { - List<PendingReadRequest> requestsToCancel; - synchronized (readQueue) { - requestsToCancel = Lists.newArrayListWithExpectedSize(readQueue.size()); - requestsToCancel.addAll(readQueue); - readQueue.clear(); - } - for (PendingReadRequest request : requestsToCancel) { - request.setException(throwExc); - } - } - - // - // Background Read Operations - // - - private void onReadEntryDone(boolean success) { - // we successfully read an entry - synchronized (this) { - --numOutstandingEntries; - } - // notify reader that there is entry ready - notifyReaders(); - // stop prefetch if we already encountered exceptions - if (success) { - prefetchIfNecessary(); - } - } - - private void onEntriesConsumed(int numEntries) { - synchronized (this) { - cachedEntries -= numEntries; - } - prefetchIfNecessary(); - } - - private void prefetchIfNecessary() { - List<CacheEntry> entriesToFetch; - synchronized (this) { - if (cachedEntries >= maxPrefetchEntries) { - return; - } - // we don't have enough entries, do prefetch - int numEntriesToFetch = numPrefetchEntries - numOutstandingEntries; - if (numEntriesToFetch <= 0) { - return; - } - entriesToFetch = new ArrayList<CacheEntry>(numEntriesToFetch); - for (int i = 0; i < numEntriesToFetch; i++) { - if (cachedEntries >= maxPrefetchEntries) { - break; - } - if ((isLedgerClosed() && nextEntryId > getLastAddConfirmed()) || - (!isLedgerClosed() && nextEntryId > getLastAddConfirmed() + 1)) { - break; - } - CacheEntry entry = new CacheEntry(nextEntryId); - entriesToFetch.add(entry); - readAheadEntries.add(entry); - ++numOutstandingEntries; - ++cachedEntries; - ++nextEntryId; - } - } - for (CacheEntry entry : entriesToFetch) { - issueRead(entry); - } - } - - - private void issueRead(CacheEntry cacheEntry) { - if (isClosed()) { - return; - } - if (isLedgerClosed()) { - if (isNotBeyondLastAddConfirmed(cacheEntry.getEntryId())) { - issueSimpleRead(cacheEntry); - return; - } else { - // Reach the end of stream - notifyReaders(); - } - } else { // the ledger is still in progress - if (isNotBeyondLastAddConfirmed(cacheEntry.getEntryId())) { - issueSimpleRead(cacheEntry); - } else { - issueLongPollRead(cacheEntry); - } - } - } - - private void issueSimpleRead(CacheEntry cacheEntry) { - getLh().asyncReadEntries(cacheEntry.entryId, cacheEntry.entryId, cacheEntry, null); - } - - private void issueLongPollRead(CacheEntry cacheEntry) { - // register the read as outstanding reads - synchronized (this) { - this.outstandingLongPoll = cacheEntry; - } - - if (!hasCaughtupOnInprogress) { - hasCaughtupOnInprogress = true; - notifyCaughtupOnInprogress(); - } - getLh().asyncReadLastConfirmedAndEntry( - cacheEntry.entryId, - conf.getReadLACLongPollTimeout(), - false, - cacheEntry, - null); - } - - // - // Foreground Read Operations - // - - Entry.Reader processReadEntry(LedgerEntry entry) throws IOException { - return Entry.newBuilder() - .setLogSegmentInfo(lssn, startSequenceId) - .setEntryId(entry.getEntryId()) - .setEnvelopeEntry(envelopeEntries) - .deserializeRecordSet(deserializeRecordSet) - .setInputStream(entry.getEntryInputStream()) - .buildReader(); - } - - @Override - public Future<List<Entry.Reader>> readNext(int numEntries) { - final PendingReadRequest readRequest = new PendingReadRequest(numEntries); - - if (checkClosedOrInError()) { - readRequest.setException(lastException.get()); - } else { - boolean wasQueueEmpty; - synchronized (readQueue) { - wasQueueEmpty = readQueue.isEmpty(); - readQueue.add(readRequest); - } - if (wasQueueEmpty) { - processReadRequests(); - } - } - return readRequest.getPromise(); - } - - private void processReadRequests() { - if (isClosed()) { - // the reader is already closed. - return; - } - - long prevCount = scheduleCount.getAndIncrement(); - if (0 == prevCount) { - scheduler.submit(getSegment().getLogSegmentId(), this); - } - } - - /** - * The core function to propagate fetched entries to read requests - */ - @Override - public void run() { - long scheduleCountLocal = scheduleCount.get(); - while (true) { - PendingReadRequest nextRequest = null; - synchronized (readQueue) { - nextRequest = readQueue.peek(); - } - - // if read queue is empty, nothing to read, return - if (null == nextRequest) { - scheduleCount.set(0L); - return; - } - - // if the oldest pending promise is interrupted then we must - // mark the reader in error and abort all pending reads since - // we don't know the last consumed read - if (null == lastException.get()) { - if (nextRequest.getPromise().isInterrupted().isDefined()) { - setException(new DLInterruptedException("Interrupted on reading log segment " - + getSegment() + " : " + nextRequest.getPromise().isInterrupted().get()), false); - } - } - - // if the reader is in error state, stop read - if (checkClosedOrInError()) { - return; - } - - // read entries from readahead cache to satisfy next read request - readEntriesFromReadAheadCache(nextRequest); - - // check if we can satisfy the read request - if (nextRequest.hasReadEntries()) { - PendingReadRequest request; - synchronized (readQueue) { - request = readQueue.poll(); - } - if (null != request && nextRequest == request) { - request.complete(); - } else { - DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from " - + getSegment()); - nextRequest.setException(ise); - if (null != request) { - request.setException(ise); - } - setException(ise, false); - } - } else { - if (0 == scheduleCountLocal) { - return; - } - scheduleCountLocal = scheduleCount.decrementAndGet(); - } - } - } - - private void readEntriesFromReadAheadCache(PendingReadRequest nextRequest) { - while (!nextRequest.hasReadEnoughEntries()) { - CacheEntry entry; - boolean hitEndOfLogSegment; - synchronized (this) { - entry = readAheadEntries.peek(); - hitEndOfLogSegment = (null == entry) && isEndOfLogSegment(); - } - // reach end of log segment - if (hitEndOfLogSegment) { - setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false); - return; - } - if (null == entry) { - return; - } - // entry is not complete yet. - if (!entry.isDone()) { - // we already reached end of the log segment - if (isEndOfLogSegment(entry.getEntryId())) { - setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false); - } - return; - } - if (entry.isSuccess()) { - CacheEntry removedEntry = readAheadEntries.poll(); - if (entry != removedEntry) { - DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from " - + getSegment()); - setException(ise, false); - return; - } - try { - nextRequest.addEntry(processReadEntry(entry.getEntry())); - } catch (IOException e) { - setException(e, false); - return; - } - } else if (skipBrokenEntries && BKException.Code.DigestMatchException == entry.getRc()) { - // skip this entry and move forward - skippedBrokenEntriesCounter.inc(); - readAheadEntries.poll(); - continue; - } else { - setException(new BKTransmitException("Encountered issue on reading entry " + entry.getEntryId() - + " @ log segment " + getSegment(), entry.getRc()), false); - return; - } - } - } - - // - // State Management - // - - private synchronized boolean isEndOfLogSegment() { - return isEndOfLogSegment(nextEntryId); - } - - private boolean isEndOfLogSegment(long entryId) { - return isLedgerClosed() && entryId > getLastAddConfirmed(); - } - - @Override - public synchronized boolean isBeyondLastAddConfirmed() { - return isBeyondLastAddConfirmed(nextEntryId); - } - - private boolean isBeyondLastAddConfirmed(long entryId) { - return entryId > getLastAddConfirmed(); - } - - private boolean isNotBeyondLastAddConfirmed(long entryId) { - return entryId <= getLastAddConfirmed(); - } - - private boolean isLedgerClosed() { - return getLh().isClosed(); - } - - @Override - public long getLastAddConfirmed() { - return getLh().getLastAddConfirmed(); - } - - synchronized boolean isClosed() { - return null != closePromise; - } - - @Override - public Future<Void> asyncClose() { - final Promise<Void> closeFuture; - ReadCancelledException exception; - LedgerHandle[] lhsToClose; - synchronized (this) { - if (null != closePromise) { - return closePromise; - } - closeFuture = closePromise = new Promise<Void>(); - lhsToClose = openLedgerHandles.toArray(new LedgerHandle[openLedgerHandles.size()]); - // set the exception to cancel pending and subsequent reads - exception = new ReadCancelledException(getSegment().getZNodeName(), "Reader was closed"); - setException(exception, false); - } - - // cancel all pending reads - cancelAllPendingReads(exception); - - // close all the open ledger - BKUtils.closeLedgers(lhsToClose).proxyTo(closeFuture); - return closeFuture; - } -}