http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java new file mode 100644 index 0000000..eeda804 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKMetadataAccessor.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twitter.distributedlog.impl; + +import java.io.IOException; +import java.net.URI; + +import com.google.common.annotations.VisibleForTesting; +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.MetadataAccessor; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.ZooKeeperClientBuilder; +import com.twitter.distributedlog.exceptions.AlreadyClosedException; +import com.twitter.distributedlog.exceptions.DLInterruptedException; +import com.twitter.distributedlog.impl.metadata.BKDLConfig; +import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.distributedlog.util.Utils; +import com.twitter.util.Future; +import com.twitter.util.Promise; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; +import org.apache.bookkeeper.zookeeper.RetryPolicy; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.twitter.distributedlog.impl.BKNamespaceDriver.getZKServersFromDLUri; + +public class ZKMetadataAccessor implements MetadataAccessor { + static final Logger LOG = LoggerFactory.getLogger(ZKMetadataAccessor.class); + protected final String name; + protected Promise<Void> closePromise; + protected final URI uri; + // zookeeper clients + // NOTE: The actual zookeeper client is initialized lazily when it is referenced by + // {@link com.twitter.distributedlog.ZooKeeperClient#get()}. So it is safe to + // keep builders and their client wrappers here, as they will be used when + // instantiating readers or writers. 
+ protected final ZooKeeperClientBuilder writerZKCBuilder; + protected final ZooKeeperClient writerZKC; + protected final boolean ownWriterZKC; + protected final ZooKeeperClientBuilder readerZKCBuilder; + protected final ZooKeeperClient readerZKC; + protected final boolean ownReaderZKC; + + ZKMetadataAccessor(String name, + DistributedLogConfiguration conf, + URI uri, + ZooKeeperClientBuilder writerZKCBuilder, + ZooKeeperClientBuilder readerZKCBuilder, + StatsLogger statsLogger) { + this.name = name; + this.uri = uri; + + if (null == writerZKCBuilder) { + RetryPolicy retryPolicy = null; + if (conf.getZKNumRetries() > 0) { + retryPolicy = new BoundExponentialBackoffRetryPolicy( + conf.getZKRetryBackoffStartMillis(), + conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries()); + } + this.writerZKCBuilder = ZooKeeperClientBuilder.newBuilder() + .name(String.format("dlzk:%s:dlm_writer_shared", name)) + .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) + .retryThreadCount(conf.getZKClientNumberRetryThreads()) + .requestRateLimit(conf.getZKRequestRateLimit()) + .zkAclId(conf.getZkAclId()) + .uri(uri) + .retryPolicy(retryPolicy) + .statsLogger(statsLogger.scope("dlzk_dlm_writer_shared")); + this.ownWriterZKC = true; + } else { + this.writerZKCBuilder = writerZKCBuilder; + this.ownWriterZKC = false; + } + this.writerZKC = this.writerZKCBuilder.build(); + + if (null == readerZKCBuilder) { + String zkServersForWriter = getZKServersFromDLUri(uri); + String zkServersForReader; + try { + BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(this.writerZKC, uri); + zkServersForReader = bkdlConfig.getDlZkServersForReader(); + } catch (IOException e) { + LOG.warn("Error on resolving dl metadata bindings for {} : ", uri, e); + zkServersForReader = zkServersForWriter; + } + if (zkServersForReader.equals(zkServersForWriter)) { + LOG.info("Used same zookeeper servers '{}' for both writers and readers for {}.", + zkServersForWriter, name); + this.readerZKCBuilder = 
this.writerZKCBuilder; + this.ownReaderZKC = false; + } else { + RetryPolicy retryPolicy = null; + if (conf.getZKNumRetries() > 0) { + retryPolicy = new BoundExponentialBackoffRetryPolicy( + conf.getZKRetryBackoffStartMillis(), + conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries()); + } + this.readerZKCBuilder = ZooKeeperClientBuilder.newBuilder() + .name(String.format("dlzk:%s:dlm_reader_shared", name)) + .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) + .retryThreadCount(conf.getZKClientNumberRetryThreads()) + .requestRateLimit(conf.getZKRequestRateLimit()) + .zkServers(zkServersForReader) + .retryPolicy(retryPolicy) + .zkAclId(conf.getZkAclId()) + .statsLogger(statsLogger.scope("dlzk_dlm_reader_shared")); + this.ownReaderZKC = true; + } + } else { + this.readerZKCBuilder = readerZKCBuilder; + this.ownReaderZKC = false; + } + this.readerZKC = this.readerZKCBuilder.build(); + } + + /** + * Get the name of the stream managed by this log manager + * + * @return streamName + */ + @Override + public String getStreamName() { + return name; + } + + /** + * Creates or update the metadata stored at the node associated with the + * name and URI + * @param metadata opaque metadata to be stored for the node + * @throws IOException + */ + @Override + public void createOrUpdateMetadata(byte[] metadata) throws IOException { + checkClosedOrInError("createOrUpdateMetadata"); + + String zkPath = getZKPath(); + LOG.debug("Setting application specific metadata on {}", zkPath); + try { + Stat currentStat = writerZKC.get().exists(zkPath, false); + if (currentStat == null) { + if (metadata.length > 0) { + Utils.zkCreateFullPathOptimistic(writerZKC, + zkPath, + metadata, + writerZKC.getDefaultACL(), + CreateMode.PERSISTENT); + } + } else { + writerZKC.get().setData(zkPath, metadata, currentStat.getVersion()); + } + } catch (InterruptedException ie) { + throw new DLInterruptedException("Interrupted on creating or updating container metadata", ie); + } catch (Exception 
exc) { + throw new IOException("Exception creating or updating container metadata", exc); + } + } + + /** + * Delete the metadata stored at the associated node. This only deletes the metadata + * and not the node itself + * @throws IOException + */ + @Override + public void deleteMetadata() throws IOException { + checkClosedOrInError("createOrUpdateMetadata"); + createOrUpdateMetadata(null); + } + + /** + * Retrieve the metadata stored at the node + * @return byte array containing the metadata + * @throws IOException + */ + @Override + public byte[] getMetadata() throws IOException { + checkClosedOrInError("createOrUpdateMetadata"); + String zkPath = getZKPath(); + LOG.debug("Getting application specific metadata from {}", zkPath); + try { + Stat currentStat = readerZKC.get().exists(zkPath, false); + if (currentStat == null) { + return null; + } else { + return readerZKC.get().getData(zkPath, false, currentStat); + } + } catch (InterruptedException ie) { + throw new DLInterruptedException("Error reading the max tx id from zk", ie); + } catch (Exception e) { + throw new IOException("Error reading the max tx id from zk", e); + } + } + + /** + * Close the metadata accessor, freeing any resources it may hold. + * @return future represents the close result. 
+ */ + @Override + public Future<Void> asyncClose() { + Promise<Void> closeFuture; + synchronized (this) { + if (null != closePromise) { + return closePromise; + } + closeFuture = closePromise = new Promise<Void>(); + } + // NOTE: ownWriterZKC and ownReaderZKC are mostly used by tests + // the managers created by the namespace - whose zkc will be closed by namespace + try { + if (ownWriterZKC) { + writerZKC.close(); + } + if (ownReaderZKC) { + readerZKC.close(); + } + } catch (Exception e) { + LOG.warn("Exception while closing distributed log manager", e); + } + FutureUtils.setValue(closeFuture, null); + return closeFuture; + } + + @Override + public void close() throws IOException { + FutureUtils.result(asyncClose()); + } + + public synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException { + if (null != closePromise) { + throw new AlreadyClosedException("Executing " + operation + " on already closed ZKMetadataAccessor"); + } + } + + protected String getZKPath() { + return String.format("%s/%s", uri.getPath(), name); + } + + @VisibleForTesting + protected ZooKeeperClient getReaderZKC() { + return readerZKC; + } + + @VisibleForTesting + protected ZooKeeperClient getWriterZKC() { + return writerZKC; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java index 61c1760..06bc8fb 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKNamespaceWatcher.java @@ -37,7 +37,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static com.twitter.distributedlog.impl.BKDLUtils.*; +import static com.twitter.distributedlog.util.DLUtils.*; /** * Watcher on watching a given namespace http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java new file mode 100644 index 0000000..8126723 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControl.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl.acl; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.thrift.AccessControlEntry; +import com.twitter.util.Future; +import com.twitter.util.Promise; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TJSONProtocol; +import org.apache.thrift.transport.TMemoryBuffer; +import org.apache.thrift.transport.TMemoryInputTransport; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; + +import static com.google.common.base.Charsets.UTF_8; + +public class ZKAccessControl { + + private static final int BUFFER_SIZE = 4096; + + public static final AccessControlEntry DEFAULT_ACCESS_CONTROL_ENTRY = new AccessControlEntry(); + + public static class CorruptedAccessControlException extends IOException { + + private static final long serialVersionUID = 5391285182476211603L; + + public CorruptedAccessControlException(String zkPath, Throwable t) { + super("Access Control @ " + zkPath + " is corrupted.", t); + } + } + + protected final AccessControlEntry accessControlEntry; + protected final String zkPath; + private int zkVersion; + + public ZKAccessControl(AccessControlEntry ace, String zkPath) { + this(ace, zkPath, -1); + } + + private 
ZKAccessControl(AccessControlEntry ace, String zkPath, int zkVersion) { + this.accessControlEntry = ace; + this.zkPath = zkPath; + this.zkVersion = zkVersion; + } + + @Override + public int hashCode() { + return Objects.hashCode(zkPath, accessControlEntry); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof ZKAccessControl)) { + return false; + } + ZKAccessControl other = (ZKAccessControl) obj; + return Objects.equal(zkPath, other.zkPath) && + Objects.equal(accessControlEntry, other.accessControlEntry); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("entry(path=").append(zkPath).append(", acl=") + .append(accessControlEntry).append(")"); + return sb.toString(); + } + + @VisibleForTesting + public String getZKPath() { + return zkPath; + } + + @VisibleForTesting + public AccessControlEntry getAccessControlEntry() { + return accessControlEntry; + } + + public Future<ZKAccessControl> create(ZooKeeperClient zkc) { + final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); + try { + zkc.get().create(zkPath, serialize(accessControlEntry), zkc.getDefaultACL(), CreateMode.PERSISTENT, + new AsyncCallback.StringCallback() { + @Override + public void processResult(int rc, String path, Object ctx, String name) { + if (KeeperException.Code.OK.intValue() == rc) { + ZKAccessControl.this.zkVersion = 0; + promise.setValue(ZKAccessControl.this); + } else { + promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + } + } + }, null); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + promise.setException(e); + } catch (InterruptedException e) { + promise.setException(e); + } catch (IOException e) { + promise.setException(e); + } + return promise; + } + + public Future<ZKAccessControl> update(ZooKeeperClient zkc) { + final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); + try { + zkc.get().setData(zkPath, serialize(accessControlEntry), 
zkVersion, new AsyncCallback.StatCallback() { + @Override + public void processResult(int rc, String path, Object ctx, Stat stat) { + if (KeeperException.Code.OK.intValue() == rc) { + ZKAccessControl.this.zkVersion = stat.getVersion(); + promise.setValue(ZKAccessControl.this); + } else { + promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + } + } + }, null); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + promise.setException(e); + } catch (InterruptedException e) { + promise.setException(e); + } catch (IOException e) { + promise.setException(e); + } + return promise; + } + + public static Future<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) { + final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); + + try { + zkc.get().getData(zkPath, watcher, new AsyncCallback.DataCallback() { + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + if (KeeperException.Code.OK.intValue() == rc) { + try { + AccessControlEntry ace = deserialize(zkPath, data); + promise.setValue(new ZKAccessControl(ace, zkPath, stat.getVersion())); + } catch (IOException ioe) { + promise.setException(ioe); + } + } else { + promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + } + } + }, null); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + promise.setException(e); + } catch (InterruptedException e) { + promise.setException(e); + } + return promise; + } + + public static Future<Void> delete(final ZooKeeperClient zkc, final String zkPath) { + final Promise<Void> promise = new Promise<Void>(); + + try { + zkc.get().delete(zkPath, -1, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String path, Object ctx) { + if (KeeperException.Code.OK.intValue() == rc || + KeeperException.Code.NONODE.intValue() == rc) { + promise.setValue(null); + } else { + 
promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + } + } + }, null); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + promise.setException(e); + } catch (InterruptedException e) { + promise.setException(e); + } + return promise; + } + + static byte[] serialize(AccessControlEntry ace) throws IOException { + TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE); + TJSONProtocol protocol = new TJSONProtocol(transport); + try { + ace.write(protocol); + transport.flush(); + return transport.toString(UTF_8.name()).getBytes(UTF_8); + } catch (TException e) { + throw new IOException("Failed to serialize access control entry : ", e); + } catch (UnsupportedEncodingException uee) { + throw new IOException("Failed to serialize acesss control entry : ", uee); + } + } + + static AccessControlEntry deserialize(String zkPath, byte[] data) throws IOException { + if (data.length == 0) { + return DEFAULT_ACCESS_CONTROL_ENTRY; + } + + AccessControlEntry ace = new AccessControlEntry(); + TMemoryInputTransport transport = new TMemoryInputTransport(data); + TJSONProtocol protocol = new TJSONProtocol(transport); + try { + ace.read(protocol); + } catch (TException e) { + throw new CorruptedAccessControlException(zkPath, e); + } + return ace; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java new file mode 100644 index 0000000..0c90a50 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/acl/ZKAccessControlManager.java @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl.acl; + +import com.google.common.collect.Sets; +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.acl.AccessControlManager; +import com.twitter.distributedlog.exceptions.DLInterruptedException; +import com.twitter.distributedlog.thrift.AccessControlEntry; +import com.twitter.util.Await; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.atomic.AtomicInteger; + +/** + * ZooKeeper Based {@link com.twitter.distributedlog.acl.AccessControlManager} + */ +public class ZKAccessControlManager implements AccessControlManager, Watcher { + + private static final Logger logger = LoggerFactory.getLogger(ZKAccessControlManager.class); + + private static final int ZK_RETRY_BACKOFF_MS = 500; + + protected final DistributedLogConfiguration conf; + protected final ZooKeeperClient zkc; + protected final String zkRootPath; + protected final ScheduledExecutorService scheduledExecutorService; + + protected final ConcurrentMap<String, ZKAccessControl> streamEntries; + protected ZKAccessControl defaultAccessControl; + protected volatile boolean closed = false; + + public ZKAccessControlManager(DistributedLogConfiguration conf, + ZooKeeperClient zkc, + String zkRootPath, + ScheduledExecutorService scheduledExecutorService) throws IOException { + this.conf = conf; + this.zkc = zkc; + this.zkRootPath = zkRootPath; + this.scheduledExecutorService = scheduledExecutorService; + this.streamEntries = new ConcurrentHashMap<String, ZKAccessControl>(); + try { + Await.result(fetchDefaultAccessControlEntry()); + } catch (Throwable t) { + if (t instanceof InterruptedException) { + throw new DLInterruptedException("Interrupted on getting default access control entry for " + zkRootPath, t); + } else if (t instanceof KeeperException) { + throw new IOException("Encountered zookeeper exception on getting default access control entry for " + zkRootPath, t); + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t); + } + } + + try { + Await.result(fetchAccessControlEntries()); + } catch (Throwable t) { + if (t instanceof InterruptedException) { + throw new DLInterruptedException("Interrupted on getting access control entries for " + zkRootPath, t); + } else if (t instanceof KeeperException) 
{ + throw new IOException("Encountered zookeeper exception on getting access control entries for " + zkRootPath, t); + } else if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t); + } + } + } + + protected AccessControlEntry getAccessControlEntry(String stream) { + ZKAccessControl entry = streamEntries.get(stream); + entry = null == entry ? defaultAccessControl : entry; + return entry.getAccessControlEntry(); + } + + @Override + public boolean allowWrite(String stream) { + return !getAccessControlEntry(stream).isDenyWrite(); + } + + @Override + public boolean allowTruncate(String stream) { + return !getAccessControlEntry(stream).isDenyTruncate(); + } + + @Override + public boolean allowDelete(String stream) { + return !getAccessControlEntry(stream).isDenyDelete(); + } + + @Override + public boolean allowAcquire(String stream) { + return !getAccessControlEntry(stream).isDenyAcquire(); + } + + @Override + public boolean allowRelease(String stream) { + return !getAccessControlEntry(stream).isDenyRelease(); + } + + @Override + public void close() { + closed = true; + } + + private Future<Void> fetchAccessControlEntries() { + final Promise<Void> promise = new Promise<Void>(); + fetchAccessControlEntries(promise); + return promise; + } + + private void fetchAccessControlEntries(final Promise<Void> promise) { + try { + zkc.get().getChildren(zkRootPath, this, new AsyncCallback.Children2Callback() { + @Override + public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + if (KeeperException.Code.OK.intValue() != rc) { + promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + return; + } + Set<String> streamsReceived = new HashSet<String>(); + streamsReceived.addAll(children); + Set<String> streamsCached = streamEntries.keySet(); + Set<String> streamsRemoved = Sets.difference(streamsCached, 
streamsReceived).immutableCopy(); + for (String s : streamsRemoved) { + ZKAccessControl accessControl = streamEntries.remove(s); + if (null != accessControl) { + logger.info("Removed Access Control Entry for stream {} : {}", s, accessControl.getAccessControlEntry()); + } + } + if (streamsReceived.isEmpty()) { + promise.setValue(null); + return; + } + final AtomicInteger numPendings = new AtomicInteger(streamsReceived.size()); + final AtomicInteger numFailures = new AtomicInteger(0); + for (String s : streamsReceived) { + final String streamName = s; + ZKAccessControl.read(zkc, zkRootPath + "/" + streamName, null) + .addEventListener(new FutureEventListener<ZKAccessControl>() { + + @Override + public void onSuccess(ZKAccessControl accessControl) { + streamEntries.put(streamName, accessControl); + logger.info("Added overrided access control for stream {} : {}", streamName, accessControl.getAccessControlEntry()); + complete(); + } + + @Override + public void onFailure(Throwable cause) { + if (cause instanceof KeeperException.NoNodeException) { + streamEntries.remove(streamName); + } else if (cause instanceof ZKAccessControl.CorruptedAccessControlException) { + logger.warn("Access control is corrupted for stream {} @ {}, skipped it ...", + new Object[] { streamName, zkRootPath, cause }); + streamEntries.remove(streamName); + } else { + if (1 == numFailures.incrementAndGet()) { + promise.setException(cause); + } + } + complete(); + } + + private void complete() { + if (0 == numPendings.decrementAndGet() && numFailures.get() == 0) { + promise.setValue(null); + } + } + }); + } + } + }, null); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + promise.setException(e); + } catch (InterruptedException e) { + promise.setException(e); + } + } + + private Future<ZKAccessControl> fetchDefaultAccessControlEntry() { + final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>(); + fetchDefaultAccessControlEntry(promise); + return promise; + } + + private 
void fetchDefaultAccessControlEntry(final Promise<ZKAccessControl> promise) { + ZKAccessControl.read(zkc, zkRootPath, this) + .addEventListener(new FutureEventListener<ZKAccessControl>() { + @Override + public void onSuccess(ZKAccessControl accessControl) { + logger.info("Default Access Control will be changed from {} to {}", + ZKAccessControlManager.this.defaultAccessControl, + accessControl); + ZKAccessControlManager.this.defaultAccessControl = accessControl; + promise.setValue(accessControl); + } + + @Override + public void onFailure(Throwable cause) { + if (cause instanceof KeeperException.NoNodeException) { + logger.info("Default Access Control is missing, creating one for {} ...", zkRootPath); + createDefaultAccessControlEntryIfNeeded(promise); + } else { + promise.setException(cause); + } + } + }); + } + + private void createDefaultAccessControlEntryIfNeeded(final Promise<ZKAccessControl> promise) { + ZooKeeper zk; + try { + zk = zkc.get(); + } catch (ZooKeeperClient.ZooKeeperConnectionException e) { + promise.setException(e); + return; + } catch (InterruptedException e) { + promise.setException(e); + return; + } + ZkUtils.asyncCreateFullPathOptimistic(zk, zkRootPath, new byte[0], zkc.getDefaultACL(), + CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { + @Override + public void processResult(int rc, String path, Object ctx, String name) { + if (KeeperException.Code.OK.intValue() == rc) { + logger.info("Created zk path {} for default ACL.", zkRootPath); + fetchDefaultAccessControlEntry(promise); + } else { + promise.setException(KeeperException.create(KeeperException.Code.get(rc))); + } + } + }, null); + } + + private void refetchDefaultAccessControlEntry(final int delayMs) { + if (closed) { + return; + } + scheduledExecutorService.schedule(new Runnable() { + @Override + public void run() { + fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() { + @Override + public void onSuccess(ZKAccessControl value) { + 
// no-op + } + @Override + public void onFailure(Throwable cause) { + if (cause instanceof ZKAccessControl.CorruptedAccessControlException) { + logger.warn("Default access control entry is corrupted, ignore this update : ", cause); + return; + } + + logger.warn("Encountered an error on refetching default access control entry, retrying in {} ms : ", + ZK_RETRY_BACKOFF_MS, cause); + refetchDefaultAccessControlEntry(ZK_RETRY_BACKOFF_MS); + } + }); + } + }, delayMs, TimeUnit.MILLISECONDS); + } + + private void refetchAccessControlEntries(final int delayMs) { + if (closed) { + return; + } + scheduledExecutorService.schedule(new Runnable() { + @Override + public void run() { + fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() { + @Override + public void onSuccess(Void value) { + // no-op + } + @Override + public void onFailure(Throwable cause) { + logger.warn("Encountered an error on refetching access control entries, retrying in {} ms : ", + ZK_RETRY_BACKOFF_MS, cause); + refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS); + } + }); + } + }, delayMs, TimeUnit.MILLISECONDS); + } + + private void refetchAllAccessControlEntries(final int delayMs) { + if (closed) { + return; + } + scheduledExecutorService.schedule(new Runnable() { + @Override + public void run() { + fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() { + @Override + public void onSuccess(ZKAccessControl value) { + fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() { + @Override + public void onSuccess(Void value) { + // no-op + } + + @Override + public void onFailure(Throwable cause) { + logger.warn("Encountered an error on fetching all access control entries, retrying in {} ms : ", + ZK_RETRY_BACKOFF_MS, cause); + refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS); + } + }); + } + + @Override + public void onFailure(Throwable cause) { + logger.warn("Encountered an error on refetching all access control entries, 
retrying in {} ms : ", + ZK_RETRY_BACKOFF_MS, cause); + refetchAllAccessControlEntries(ZK_RETRY_BACKOFF_MS); + } + }); + } + }, delayMs, TimeUnit.MILLISECONDS); + } + + @Override + public void process(WatchedEvent event) { + if (Event.EventType.None.equals(event.getType())) { + if (event.getState() == Event.KeeperState.Expired) { + refetchAllAccessControlEntries(0); + } + } else if (Event.EventType.NodeDataChanged.equals(event.getType())) { + logger.info("Default ACL for {} is changed, refetching ...", zkRootPath); + refetchDefaultAccessControlEntry(0); + } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) { + logger.info("List of ACLs for {} are changed, refetching ...", zkRootPath); + refetchAccessControlEntries(0); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java new file mode 100644 index 0000000..d7ff4fb --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentAllocator.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl.logsegment; + +import com.twitter.distributedlog.bk.LedgerAllocator; +import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; +import com.twitter.distributedlog.util.Allocator; +import com.twitter.distributedlog.util.Transaction; +import com.twitter.util.Future; +import org.apache.bookkeeper.client.LedgerHandle; +import scala.Function1; +import scala.runtime.AbstractFunction1; + +import java.io.IOException; + +/** + * Allocate log segments + */ +class BKLogSegmentAllocator implements Allocator<LogSegmentEntryWriter, Object> { + + private static class NewLogSegmentEntryWriterFn extends AbstractFunction1<LedgerHandle, LogSegmentEntryWriter> { + + static final Function1<LedgerHandle, LogSegmentEntryWriter> INSTANCE = + new NewLogSegmentEntryWriterFn(); + + private NewLogSegmentEntryWriterFn() {} + + @Override + public LogSegmentEntryWriter apply(LedgerHandle lh) { + return new BKLogSegmentEntryWriter(lh); + } + } + + LedgerAllocator allocator; + + BKLogSegmentAllocator(LedgerAllocator allocator) { + this.allocator = allocator; + } + + @Override + public void allocate() throws IOException { + allocator.allocate(); + } + + @Override + public Future<LogSegmentEntryWriter> tryObtain(Transaction<Object> txn, + final Transaction.OpListener<LogSegmentEntryWriter> listener) { + return allocator.tryObtain(txn, new Transaction.OpListener<LedgerHandle>() { + @Override + public void onCommit(LedgerHandle lh) { + listener.onCommit(new BKLogSegmentEntryWriter(lh)); + } + + @Override + public void 
onAbort(Throwable t) { + listener.onAbort(t); + } + }).map(NewLogSegmentEntryWriterFn.INSTANCE); + } + + @Override + public Future<Void> asyncClose() { + return allocator.asyncClose(); + } + + @Override + public Future<Void> delete() { + return allocator.delete(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java index dc382d2..f85760d 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java @@ -48,6 +48,7 @@ import java.util.ArrayList; import java.util.Enumeration; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -209,6 +210,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, int numErrors = Math.max(1, numReadErrors.incrementAndGet()); int nextReadBackoffTime = Math.min(numErrors * readAheadWaitTime, maxReadBackoffTime); scheduler.schedule( + getSegment().getLogSegmentId(), this, nextReadBackoffTime, TimeUnit.MILLISECONDS); @@ -284,6 +286,8 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(null); private final AtomicLong scheduleCount = new AtomicLong(0); private volatile boolean hasCaughtupOnInprogress = false; + 
private final CopyOnWriteArraySet<StateChangeListener> stateChangeListeners = + new CopyOnWriteArraySet<StateChangeListener>(); // read retries private int readAheadWaitTime; private final int maxReadBackoffTime; @@ -374,6 +378,24 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, return hasCaughtupOnInprogress; } + @Override + public LogSegmentEntryReader registerListener(StateChangeListener listener) { + stateChangeListeners.add(listener); + return this; + } + + @Override + public LogSegmentEntryReader unregisterListener(StateChangeListener listener) { + stateChangeListeners.remove(listener); + return this; + } + + private void notifyCaughtupOnInprogress() { + for (StateChangeListener listener : stateChangeListeners) { + listener.onCaughtupOnInprogress(); + } + } + // // Process on Log Segment Metadata Updates // @@ -440,7 +462,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, return; } // the reader is still catching up, retry opening the log segment later - scheduler.schedule(new Runnable() { + scheduler.schedule(segment.getLogSegmentId(), new Runnable() { @Override public void run() { onLogSegmentMetadataUpdated(segment); @@ -583,6 +605,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, if (!hasCaughtupOnInprogress) { hasCaughtupOnInprogress = true; + notifyCaughtupOnInprogress(); } getLh().asyncReadLastConfirmedAndEntry( cacheEntry.entryId, @@ -633,7 +656,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader, long prevCount = scheduleCount.getAndIncrement(); if (0 == prevCount) { - scheduler.submit(this); + scheduler.submit(getSegment().getLogSegmentId(), this); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java ---------------------------------------------------------------------- diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java index f7f4acf..91e6dec 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java @@ -20,12 +20,21 @@ package com.twitter.distributedlog.impl.logsegment; import com.twitter.distributedlog.BookKeeperClient; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.LogSegmentMetadata; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.bk.DynamicQuorumConfigProvider; +import com.twitter.distributedlog.bk.LedgerAllocator; +import com.twitter.distributedlog.bk.LedgerAllocatorDelegator; +import com.twitter.distributedlog.bk.QuorumConfigProvider; +import com.twitter.distributedlog.bk.SimpleLedgerAllocator; +import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.BKTransmitException; import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.logsegment.LogSegmentEntryReader; import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader; +import com.twitter.distributedlog.metadata.LogMetadataForWriter; +import com.twitter.distributedlog.util.Allocator; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.util.Future; @@ -80,21 +89,31 @@ public class BKLogSegmentEntryStore implements } private final byte[] passwd; + private final ZooKeeperClient zkc; private final BookKeeperClient bkc; private 
final OrderedScheduler scheduler; private final DistributedLogConfiguration conf; + private final DynamicDistributedLogConfiguration dynConf; private final StatsLogger statsLogger; private final AsyncFailureInjector failureInjector; + // ledger allocator + private final LedgerAllocator allocator; public BKLogSegmentEntryStore(DistributedLogConfiguration conf, + DynamicDistributedLogConfiguration dynConf, + ZooKeeperClient zkc, BookKeeperClient bkc, OrderedScheduler scheduler, + LedgerAllocator allocator, StatsLogger statsLogger, AsyncFailureInjector failureInjector) { this.conf = conf; + this.dynConf = dynConf; + this.zkc = zkc; this.bkc = bkc; this.passwd = conf.getBKDigestPW().getBytes(UTF_8); this.scheduler = scheduler; + this.allocator = allocator; this.statsLogger = statsLogger; this.failureInjector = failureInjector; } @@ -129,11 +148,43 @@ public class BKLogSegmentEntryStore implements FutureUtils.setValue(deleteRequest.deletePromise, deleteRequest.segment); } + // + // Writers + // + + LedgerAllocator createLedgerAllocator(LogMetadataForWriter logMetadata, + DynamicDistributedLogConfiguration dynConf) + throws IOException { + LedgerAllocator ledgerAllocatorDelegator; + if (null == allocator || !dynConf.getEnableLedgerAllocatorPool()) { + QuorumConfigProvider quorumConfigProvider = + new DynamicQuorumConfigProvider(dynConf); + LedgerAllocator allocator = new SimpleLedgerAllocator( + logMetadata.getAllocationPath(), + logMetadata.getAllocationData(), + quorumConfigProvider, + zkc, + bkc); + ledgerAllocatorDelegator = new LedgerAllocatorDelegator(allocator, true); + } else { + ledgerAllocatorDelegator = allocator; + } + return ledgerAllocatorDelegator; + } + @Override - public Future<LogSegmentEntryWriter> openWriter(LogSegmentMetadata segment) { - throw new UnsupportedOperationException("Not supported yet"); + public Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator( + LogMetadataForWriter logMetadata, + DynamicDistributedLogConfiguration 
dynConf) throws IOException { + // Build the ledger allocator + LedgerAllocator allocator = createLedgerAllocator(logMetadata, dynConf); + return new BKLogSegmentAllocator(allocator); } + // + // Readers + // + @Override public Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment, long startEntryId) { @@ -220,15 +271,15 @@ public class BKLogSegmentEntryStore implements segment.getLogSegmentId(), BookKeeper.DigestType.CRC32, passwd, - this, - openCallback); + openCallback, + null); } else { bk.asyncOpenLedger( segment.getLogSegmentId(), BookKeeper.DigestType.CRC32, passwd, - this, - openCallback); + openCallback, + null); } return openPromise; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java new file mode 100644 index 0000000..3e859fb --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java @@ -0,0 +1,400 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.metadata;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
+import com.twitter.distributedlog.metadata.DLConfig;
+import com.twitter.distributedlog.thrift.BKDLConfigFormat;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * Configurations for BookKeeper based DL.
+ *
+ * <p>Holds the zookeeper server lists used for dl and for bk (kept separately for writers
+ * and readers), the bk ledgers path, the ACL root path, the first log segment sequence
+ * number and a few flags. The config is (de)serialized through the thrift
+ * {@link BKDLConfigFormat} structure using {@link TJSONProtocol}.
+ */
+public class BKDLConfig implements DLConfig {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BKDLConfig.class);
+
+    // Size passed to the in-memory thrift buffer used by serialize(BKDLConfigFormat).
+    private static final int BUFFER_SIZE = 4096;
+    // Static cache of resolved configs, keyed by the uri they were resolved for.
+    private static final ConcurrentMap<URI, DLConfig> cachedDLConfigs =
+            new ConcurrentHashMap<URI, DLConfig>();
+
+    /**
+     * Copy the settings carried by <i>bkdlConfig</i> onto <i>dlConf</i>.
+     *
+     * <p>The encode-region-id flag and the first log segment sequence number are always copied.
+     * If the namespace is federated, create-stream-if-not-exists is additionally turned off.
+     * The resulting values are logged.
+     *
+     * @param bkdlConfig
+     *          bk dl config to read the settings from.
+     * @param dlConf
+     *          distributedlog configuration to update.
+     */
+    public static void propagateConfiguration(BKDLConfig bkdlConfig, DistributedLogConfiguration dlConf) {
+        dlConf.setEncodeRegionIDInLogSegmentMetadata(bkdlConfig.getEncodeRegionID());
+        dlConf.setFirstLogSegmentSequenceNumber(bkdlConfig.getFirstLogSegmentSeqNo());
+        if (bkdlConfig.isFederatedNamespace()) {
+            dlConf.setCreateStreamIfNotExists(false);
+            LOG.info("Disabled createIfNotExists for federated namespace.");
+        }
+        LOG.info("Propagate BKDLConfig to DLConfig : encodeRegionID = {}," +
+                " firstLogSegmentSequenceNumber = {}, createStreamIfNotExists = {}, isFederated = {}.",
+                new Object[] { dlConf.getEncodeRegionIDInLogSegmentMetadata(),
+                        dlConf.getFirstLogSegmentSequenceNumber(), dlConf.getCreateStreamIfNotExists(),
+                        bkdlConfig.isFederatedNamespace() });
+    }
+
+    /**
+     * Resolve the bk dl config bound to <i>uri</i>, consulting the static cache first.
+     *
+     * <p>On a cache miss the config is resolved through a {@link ZkMetadataResolver} and cached.
+     * If a config for the same uri was cached in the meantime, the already cached one is returned.
+     *
+     * @param zkc
+     *          zookeeper client used to resolve the metadata on a cache miss.
+     * @param uri
+     *          uri to resolve the config for.
+     * @return the bk dl config for <i>uri</i>.
+     * @throws IOException if resolving the metadata fails.
+     */
+    public static BKDLConfig resolveDLConfig(ZooKeeperClient zkc, URI uri) throws IOException {
+        DLConfig dlConfig = cachedDLConfigs.get(uri);
+        if (dlConfig == null) {
+            dlConfig = (new ZkMetadataResolver(zkc).resolve(uri)).getDLConfig();
+            DLConfig oldDLConfig = cachedDLConfigs.putIfAbsent(uri, dlConfig);
+            if (null != oldDLConfig) {
+                dlConfig = oldDLConfig;
+            }
+        }
+        // Only evaluated when assertions are enabled; the cast below is what actually enforces the type.
+        assert (dlConfig instanceof BKDLConfig);
+        return (BKDLConfig)dlConfig;
+    }
+
+    /**
+     * Remove all entries from the static cache of resolved configs.
+     */
+    @VisibleForTesting
+    public static void clearCachedDLConfigs() {
+        cachedDLConfigs.clear();
+    }
+
+    // bookkeeper cluster settings
+    private String bkZkServersForWriter;
+    private String bkZkServersForReader;
+    private String bkLedgersPath;
+    // dl settings
+    private boolean sanityCheckTxnID = true;
+    private boolean encodeRegionID = false;
+    // dl zookeeper cluster settings
+    private String dlZkServersForWriter;
+    private String dlZkServersForReader;
+    private String aclRootPath;
+    // null means "not set"; see getFirstLogSegmentSeqNo() for the fallback.
+    private Long firstLogSegmentSeqNo;
+    private boolean isFederatedNamespace = false;
+
+    /**
+     * Construct an empty config with given <i>uri</i>.
+     *
+     * <p>The dl zookeeper servers (for both writer and reader) are taken from the uri;
+     * the bk zookeeper servers and the ledgers path are left null.
+     */
+    public BKDLConfig(URI uri) {
+        this(BKNamespaceDriver.getZKServersFromDLUri(uri),
+             BKNamespaceDriver.getZKServersFromDLUri(uri),
+             null, null, null);
+    }
+
+    /**
+     * The caller should make sure both dl and bk use same zookeeper server.
+     *
+     * @param zkServers
+     *          zk servers used for both dl and bk.
+     * @param ledgersPath
+     *          ledgers path.
+     */
+    @VisibleForTesting
+    public BKDLConfig(String zkServers, String ledgersPath) {
+        this(zkServers, zkServers, zkServers, zkServers, ledgersPath);
+    }
+
+    /**
+     * Construct a config from explicit zookeeper servers and ledgers path.
+     *
+     * @param dlZkServersForWriter
+     *          zk servers used for dl for writers.
+     * @param dlZkServersForReader
+     *          zk servers used for dl for readers.
+     * @param bkZkServersForWriter
+     *          zk servers used for bk for writers.
+     * @param bkZkServersForReader
+     *          zk servers used for bk for readers.
+     * @param bkLedgersPath
+     *          ledgers path for bk.
+     */
+    public BKDLConfig(String dlZkServersForWriter,
+                      String dlZkServersForReader,
+                      String bkZkServersForWriter,
+                      String bkZkServersForReader,
+                      String bkLedgersPath) {
+        this.dlZkServersForWriter = dlZkServersForWriter;
+        this.dlZkServersForReader = dlZkServersForReader;
+        this.bkZkServersForWriter = bkZkServersForWriter;
+        this.bkZkServersForReader = bkZkServersForReader;
+        this.bkLedgersPath = bkLedgersPath;
+    }
+
+    /**
+     * @return zk servers used for bk for writers
+     */
+    public String getBkZkServersForWriter() {
+        return bkZkServersForWriter;
+    }
+
+    /**
+     * @return zk servers used for bk for readers
+     */
+    public String getBkZkServersForReader() {
+        return bkZkServersForReader;
+    }
+
+    /**
+     * @return zk servers used for dl for writers
+     */
+    public String getDlZkServersForWriter() {
+        return dlZkServersForWriter;
+    }
+
+    /**
+     * @return zk servers used for dl for readers
+     */
+    public String getDlZkServersForReader() {
+        return dlZkServersForReader;
+    }
+
+    /**
+     * @return ledgers path for bk
+     */
+    public String getBkLedgersPath() {
+        return bkLedgersPath;
+    }
+
+    /**
+     * Enable/Disable sanity check txn id.
+     *
+     * @param enabled
+     *          flag to enable/disable sanity check txn id.
+     * @return bk dl config.
+     */
+    public BKDLConfig setSanityCheckTxnID(boolean enabled) {
+        this.sanityCheckTxnID = enabled;
+        return this;
+    }
+
+    /**
+     * @return flag to sanity check highest txn id.
+     */
+    public boolean getSanityCheckTxnID() {
+        return sanityCheckTxnID;
+    }
+
+    /**
+     * Enable/Disable encode region id.
+     *
+     * @param enabled
+     *          flag to enable/disable encoding region id.
+     * @return bk dl config
+     */
+    public BKDLConfig setEncodeRegionID(boolean enabled) {
+        this.encodeRegionID = enabled;
+        return this;
+    }
+
+    /**
+     * @return flag to encode region id.
+     */
+    public boolean getEncodeRegionID() {
+        return encodeRegionID;
+    }
+
+    /**
+     * Set the root path of zk based ACL manager.
+     *
+     * @param aclRootPath
+     *          root path of zk based ACL manager.
+     * @return bk dl config
+     */
+    public BKDLConfig setACLRootPath(String aclRootPath) {
+        this.aclRootPath = aclRootPath;
+        return this;
+    }
+
+    /**
+     * Get the root path of zk based ACL manager.
+     *
+     * @return root path of zk based ACL manager.
+     */
+    public String getACLRootPath() {
+        return aclRootPath;
+    }
+
+    /**
+     * Set the value at which ledger sequence number should start for streams that are being
+     * upgraded and did not have ledger sequence number to start with or for newly created
+     * streams
+     *
+     * @param firstLogSegmentSeqNo first ledger sequence number
+     * @return bk dl config
+     */
+    public BKDLConfig setFirstLogSegmentSeqNo(long firstLogSegmentSeqNo) {
+        this.firstLogSegmentSeqNo = firstLogSegmentSeqNo;
+        return this;
+    }
+
+    /**
+     * Get the value at which ledger sequence number should start for streams that are being
+     * upgraded and did not have ledger sequence number to start with or for newly created
+     * streams
+     *
+     * <p>Falls back to {@link DistributedLogConstants#FIRST_LOGSEGMENT_SEQNO} when no value was set.
+     *
+     * @return first ledger sequence number
+     */
+    public Long getFirstLogSegmentSeqNo() {
+        if (null == firstLogSegmentSeqNo) {
+            return DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO;
+        }
+        return firstLogSegmentSeqNo;
+    }
+
+    /**
+     * Set the namespace to federated <i>isFederatedNamespace</i>.
+     *
+     * @param isFederatedNamespace
+     *          is the namespace federated?
+     * @return bk dl config
+     */
+    public BKDLConfig setFederatedNamespace(boolean isFederatedNamespace) {
+        this.isFederatedNamespace = isFederatedNamespace;
+        return this;
+    }
+
+    /**
+     * Whether the namespace is federated namespace
+     *
+     * @return true if the namespace is a federated namespace. otherwise false.
+     */
+    public boolean isFederatedNamespace() {
+        return this.isFederatedNamespace;
+    }
+
+    /**
+     * Hash over the zookeeper server lists and the ledgers path only.
+     *
+     * <p>{@link #equals(Object)} compares more fields than are hashed here; equal configs
+     * still produce equal hashes, so the equals/hashCode contract holds.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(bkZkServersForWriter, bkZkServersForReader,
+                                dlZkServersForWriter, dlZkServersForReader,
+                                bkLedgersPath);
+    }
+
+    /**
+     * Two configs are equal when all of their settings (server lists, ledgers path, flags,
+     * ACL root path and first log segment sequence number) are equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof BKDLConfig)) {
+            return false;
+        }
+        BKDLConfig another = (BKDLConfig) o;
+        return Objects.equal(bkZkServersForWriter, another.bkZkServersForWriter) &&
+               Objects.equal(bkZkServersForReader, another.bkZkServersForReader) &&
+               Objects.equal(dlZkServersForWriter, another.dlZkServersForWriter) &&
+               Objects.equal(dlZkServersForReader, another.dlZkServersForReader) &&
+               Objects.equal(bkLedgersPath, another.bkLedgersPath) &&
+               sanityCheckTxnID == another.sanityCheckTxnID &&
+               encodeRegionID == another.encodeRegionID &&
+               Objects.equal(aclRootPath, another.aclRootPath) &&
+               Objects.equal(firstLogSegmentSeqNo, another.firstLogSegmentSeqNo) &&
+               Objects.equal(isFederatedNamespace, another.isFederatedNamespace);
+
+    }
+
+    /**
+     * @return the serialized form of this config, see {@link #serialize()}.
+     */
+    @Override
+    public String toString() {
+        return serialize();
+    }
+
+    /**
+     * Serialize this config into its thrift-JSON string form.
+     *
+     * <p>Nullable settings are only written when they are non-null, and the federated flag
+     * is only written when it is true; the two boolean dl flags are always written.
+     *
+     * @return serialized config.
+     */
+    @Override
+    public String serialize() {
+        BKDLConfigFormat configFormat = new BKDLConfigFormat();
+        if (null != bkZkServersForWriter) {
+            configFormat.setBkZkServers(bkZkServersForWriter);
+        }
+        if (null != bkZkServersForReader) {
+            configFormat.setBkZkServersForReader(bkZkServersForReader);
+        }
+        if (null != dlZkServersForWriter) {
+            configFormat.setDlZkServersForWriter(dlZkServersForWriter);
+        }
+        if (null != dlZkServersForReader) {
+            configFormat.setDlZkServersForReader(dlZkServersForReader);
+        }
+        if (null != bkLedgersPath) {
+            configFormat.setBkLedgersPath(bkLedgersPath);
+        }
+        configFormat.setSanityCheckTxnID(sanityCheckTxnID);
+        configFormat.setEncodeRegionID(encodeRegionID);
+        if (null != aclRootPath) {
+            configFormat.setAclRootPath(aclRootPath);
+        }
+        if (null != firstLogSegmentSeqNo) {
+            configFormat.setFirstLogSegmentSeqNo(firstLogSegmentSeqNo);
+        }
+        if (isFederatedNamespace) {
+            configFormat.setFederatedNamespace(true);
+        }
+        return serialize(configFormat);
+    }
+
+    /**
+     * Write <i>configFormat</i> with {@link TJSONProtocol} into an in-memory buffer and
+     * return the buffer content decoded as UTF-8.
+     *
+     * @param configFormat
+     *          thrift structure to serialize.
+     * @return serialized string.
+     * @throws RuntimeException if the thrift write fails or UTF-8 is not supported.
+     */
+    String serialize(BKDLConfigFormat configFormat) {
+        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            configFormat.write(protocol);
+            transport.flush();
+            return transport.toString("UTF-8");
+        } catch (TException e) {
+            throw new RuntimeException("Failed to serialize BKDLConfig : ", e);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException("Failed to serialize BKDLConfig : ", e);
+        }
+    }
+
+    /**
+     * Load the settings of this config from their thrift-JSON form.
+     *
+     * <p>Settings absent from <i>data</i> keep their current value, with these exceptions:
+     * the reader server lists fall back to the corresponding writer lists, sanity-check-txn-id
+     * falls back to true, and encode-region-id and the federated flag fall back to false.
+     *
+     * @param data
+     *          serialized config.
+     * @throws IOException if <i>data</i> cannot be parsed, or if any of the zookeeper server
+     *          lists or the ledgers path is still null after loading.
+     */
+    @Override
+    public void deserialize(byte[] data) throws IOException {
+        BKDLConfigFormat configFormat = new BKDLConfigFormat();
+        TMemoryInputTransport transport = new TMemoryInputTransport(data);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            configFormat.read(protocol);
+        } catch (TException e) {
+            throw new IOException("Failed to deserialize data '" +
+                    new String(data, UTF_8) + "' : ", e);
+        }
+        // bookkeeper cluster settings
+        if (configFormat.isSetBkZkServers()) {
+            bkZkServersForWriter = configFormat.getBkZkServers();
+        }
+        if (configFormat.isSetBkZkServersForReader()) {
+            bkZkServersForReader = configFormat.getBkZkServersForReader();
+        } else {
+            bkZkServersForReader = bkZkServersForWriter;
+        }
+        if (configFormat.isSetBkLedgersPath()) {
+            bkLedgersPath = configFormat.getBkLedgersPath();
+        }
+        // dl zookeeper cluster settings
+        if (configFormat.isSetDlZkServersForWriter()) {
+            dlZkServersForWriter = configFormat.getDlZkServersForWriter();
+        }
+        if (configFormat.isSetDlZkServersForReader()) {
+            dlZkServersForReader = configFormat.getDlZkServersForReader();
+        } else {
+            dlZkServersForReader = dlZkServersForWriter;
+        }
+        // dl settings
+        sanityCheckTxnID = !configFormat.isSetSanityCheckTxnID() || configFormat.isSanityCheckTxnID();
+        encodeRegionID = configFormat.isSetEncodeRegionID() && configFormat.isEncodeRegionID();
+        if (configFormat.isSetAclRootPath()) {
+            aclRootPath = configFormat.getAclRootPath();
+        }
+
+        if (configFormat.isSetFirstLogSegmentSeqNo()) {
+            firstLogSegmentSeqNo = configFormat.getFirstLogSegmentSeqNo();
+        }
+        isFederatedNamespace = configFormat.isSetFederatedNamespace() && configFormat.isFederatedNamespace();
+
+        // Validate the settings
+        if (null == bkZkServersForWriter || null == bkZkServersForReader || null == bkLedgersPath ||
+                null == dlZkServersForWriter || null == dlZkServersForReader) {
+            throw new IOException("Missing zk/bk settings in BKDL Config : " + new String(data, UTF_8));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java new file mode 100644 index 0000000..6b7a231 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl.metadata; + +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.metadata.DLMetadata; +import com.twitter.distributedlog.metadata.MetadataResolver; +import org.apache.commons.lang.StringUtils; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.common.PathUtils; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.net.URI; + +public class ZkMetadataResolver implements MetadataResolver { + + private final ZooKeeperClient zkc; + + public ZkMetadataResolver(ZooKeeperClient zkc) { + this.zkc = zkc; + } + + @Override + public DLMetadata resolve(URI uri) throws IOException { + String dlPath = uri.getPath(); + PathUtils.validatePath(dlPath); + // Normal case the dl metadata is stored in the last segment + // so lookup last segment first. 
+ String[] parts = StringUtils.split(dlPath, '/'); + if (null == parts || 0 == parts.length) { + throw new IOException("Invalid dlPath to resolve dl metadata : " + dlPath); + } + for (int i = parts.length; i >= 0; i--) { + String pathToResolve = String.format("/%s", StringUtils.join(parts, '/', 0, i)); + byte[] data; + try { + data = zkc.get().getData(pathToResolve, false, new Stat()); + } catch (KeeperException.NoNodeException nne) { + continue; + } catch (KeeperException ke) { + throw new IOException("Fail to resolve dl path : " + pathToResolve); + } catch (InterruptedException ie) { + throw new IOException("Interrupted when resolving dl path : " + pathToResolve); + } + if (null == data || data.length == 0) { + continue; + } + try { + return DLMetadata.deserialize(uri, data); + } catch (IOException ie) { + throw new IOException("Failed to deserialize uri : " + uri); + } + } + throw new IOException("No bkdl config bound under dl path : " + dlPath); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java new file mode 100644 index 0000000..b067ee9 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.impl.subscription; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +import com.twitter.distributedlog.subscription.SubscriptionStateStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.BoxedUnit; + +import com.google.common.base.Charsets; + +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import com.twitter.distributedlog.DLSN; +import com.twitter.distributedlog.util.Utils; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.exceptions.DLInterruptedException; +import com.twitter.util.Future; +import com.twitter.util.Promise; + +public class ZKSubscriptionStateStore implements SubscriptionStateStore { + + static final Logger logger = LoggerFactory.getLogger(ZKSubscriptionStateStore.class); + + private final ZooKeeperClient zooKeeperClient; + private final String zkPath; + private AtomicReference<DLSN> lastCommittedPosition = new AtomicReference<DLSN>(null); + + public ZKSubscriptionStateStore(ZooKeeperClient zooKeeperClient, String zkPath) { + this.zooKeeperClient = zooKeeperClient; + this.zkPath = zkPath; + } + + @Override + public void close() throws IOException { + } + + /** + * Get the last committed position stored 
for this subscription + */ + @Override + public Future<DLSN> getLastCommitPosition() { + if (null != lastCommittedPosition.get()) { + return Future.value(lastCommittedPosition.get()); + } else { + return getLastCommitPositionFromZK(); + } + } + + Future<DLSN> getLastCommitPositionFromZK() { + final Promise<DLSN> result = new Promise<DLSN>(); + try { + logger.debug("Reading last commit position from path {}", zkPath); + zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() { + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc); + if (KeeperException.Code.NONODE.intValue() == rc) { + result.setValue(DLSN.NonInclusiveLowerBound); + } else if (KeeperException.Code.OK.intValue() != rc) { + result.setException(KeeperException.create(KeeperException.Code.get(rc), path)); + } else { + try { + DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8)); + result.setValue(dlsn); + } catch (Exception t) { + logger.warn("Invalid last commit position found from path {}", zkPath, t); + // invalid dlsn recorded in subscription state store + result.setValue(DLSN.NonInclusiveLowerBound); + } + } + } + }, null); + } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) { + result.setException(zkce); + } catch (InterruptedException ie) { + result.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie)); + } + return result; + } + + /** + * Advances the position associated with the subscriber + * + * @param newPosition - new commit position + */ + @Override + public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition) { + if (null == lastCommittedPosition.get() || + (newPosition.compareTo(lastCommittedPosition.get()) > 0)) { + lastCommittedPosition.set(newPosition); + return Utils.zkAsyncCreateFullPathOptimisticAndSetData(zooKeeperClient, + zkPath, newPosition.serialize().getBytes(Charsets.UTF_8), + 
zooKeeperClient.getDefaultACL(), + CreateMode.PERSISTENT); + } else { + return Future.Done(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java new file mode 100644 index 0000000..17ba943 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twitter.distributedlog.impl.subscription; + +import com.twitter.distributedlog.DLSN; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.exceptions.DLInterruptedException; +import com.twitter.distributedlog.subscription.SubscriptionStateStore; +import com.twitter.distributedlog.subscription.SubscriptionsStore; +import com.twitter.distributedlog.util.Utils; +import com.twitter.util.Future; +import com.twitter.util.Promise; + +import org.apache.bookkeeper.meta.ZkVersion; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * ZooKeeper Based Subscriptions Store. 
+ */ +public class ZKSubscriptionsStore implements SubscriptionsStore { + + private final ZooKeeperClient zkc; + private final String zkPath; + private final ConcurrentMap<String, ZKSubscriptionStateStore> subscribers = + new ConcurrentHashMap<String, ZKSubscriptionStateStore>(); + + public ZKSubscriptionsStore(ZooKeeperClient zkc, String zkPath) { + this.zkc = zkc; + this.zkPath = zkPath; + } + + private ZKSubscriptionStateStore getSubscriber(String subscriberId) { + ZKSubscriptionStateStore ss = subscribers.get(subscriberId); + if (ss == null) { + ZKSubscriptionStateStore newSS = new ZKSubscriptionStateStore(zkc, + getSubscriberZKPath(subscriberId)); + ZKSubscriptionStateStore oldSS = subscribers.putIfAbsent(subscriberId, newSS); + if (oldSS == null) { + ss = newSS; + } else { + try { + newSS.close(); + } catch (IOException e) { + // ignore the exception + } + ss = oldSS; + } + } + return ss; + } + + private String getSubscriberZKPath(String subscriberId) { + return String.format("%s/%s", zkPath, subscriberId); + } + + @Override + public Future<DLSN> getLastCommitPosition(String subscriberId) { + return getSubscriber(subscriberId).getLastCommitPosition(); + } + + @Override + public Future<Map<String, DLSN>> getLastCommitPositions() { + final Promise<Map<String, DLSN>> result = new Promise<Map<String, DLSN>>(); + try { + this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() { + @Override + public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { + if (KeeperException.Code.NONODE.intValue() == rc) { + result.setValue(new HashMap<String, DLSN>()); + } else if (KeeperException.Code.OK.intValue() != rc) { + result.setException(KeeperException.create(KeeperException.Code.get(rc), path)); + } else { + getLastCommitPositions(result, children); + } + } + }, null); + } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) { + result.setException(zkce); + } catch (InterruptedException ie) { + 
result.setException(new DLInterruptedException("getLastCommitPositions was interrupted", ie)); + } + return result; + } + + private void getLastCommitPositions(final Promise<Map<String, DLSN>> result, + List<String> subscribers) { + List<Future<Pair<String, DLSN>>> futures = + new ArrayList<Future<Pair<String, DLSN>>>(subscribers.size()); + for (String s : subscribers) { + final String subscriber = s; + Future<Pair<String, DLSN>> future = + // Get the last commit position from zookeeper + getSubscriber(subscriber).getLastCommitPositionFromZK().map( + new AbstractFunction1<DLSN, Pair<String, DLSN>>() { + @Override + public Pair<String, DLSN> apply(DLSN dlsn) { + return Pair.of(subscriber, dlsn); + } + }); + futures.add(future); + } + Future.collect(futures).foreach( + new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() { + @Override + public BoxedUnit apply(List<Pair<String, DLSN>> subscriptions) { + Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>(); + for (Pair<String, DLSN> pair : subscriptions) { + subscriptionMap.put(pair.getLeft(), pair.getRight()); + } + result.setValue(subscriptionMap); + return BoxedUnit.UNIT; + } + }); + } + + @Override + public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition) { + return getSubscriber(subscriberId).advanceCommitPosition(newPosition); + } + + @Override + public Future<Boolean> deleteSubscriber(String subscriberId) { + subscribers.remove(subscriberId); + String path = getSubscriberZKPath(subscriberId); + return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1)); + } + + @Override + public void close() throws IOException { + // no-op + for (SubscriptionStateStore store : subscribers.values()) { + store.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/cfc049cd/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java ---------------------------------------------------------------------- 
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java index 07387cb..81eb5ed 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/logsegment/LogSegmentEntryReader.java @@ -32,6 +32,15 @@ import java.util.List; @Beta public interface LogSegmentEntryReader extends AsyncCloseable { + interface StateChangeListener { + + /** + * Notify when caught up on inprogress. + */ + void onCaughtupOnInprogress(); + + } + /** * Start the reader. The method to signal the implementation * to start preparing the data for consumption {@link #readNext(int)} @@ -39,6 +48,22 @@ public interface LogSegmentEntryReader extends AsyncCloseable { void start(); /** + * Register the state change listener + * + * @param listener the state change listener to register + * @return entry reader + */ + LogSegmentEntryReader registerListener(StateChangeListener listener); + + /** + * Unregister the state change listener + * + * @param listener the state change listener to unregister + * @return entry reader + */ + LogSegmentEntryReader unregisterListener(StateChangeListener listener); + + /** * Return the log segment metadata for this reader. * * @return the log segment metadata