http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java deleted file mode 100644 index 91e6dec..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.logsegment; - -import com.twitter.distributedlog.BookKeeperClient; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.bk.DynamicQuorumConfigProvider; -import com.twitter.distributedlog.bk.LedgerAllocator; -import com.twitter.distributedlog.bk.LedgerAllocatorDelegator; -import com.twitter.distributedlog.bk.QuorumConfigProvider; -import com.twitter.distributedlog.bk.SimpleLedgerAllocator; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.exceptions.BKTransmitException; -import com.twitter.distributedlog.injector.AsyncFailureInjector; -import com.twitter.distributedlog.logsegment.LogSegmentEntryReader; -import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; -import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; -import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader; -import com.twitter.distributedlog.metadata.LogMetadataForWriter; -import com.twitter.distributedlog.util.Allocator; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import org.apache.bookkeeper.client.AsyncCallback; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * BookKeeper Based Entry Store - */ -public class BKLogSegmentEntryStore implements - LogSegmentEntryStore, - AsyncCallback.OpenCallback, - AsyncCallback.DeleteCallback { - - private static final Logger logger = LoggerFactory.getLogger(BKLogSegmentEntryReader.class); - - private static class OpenReaderRequest { - - private final LogSegmentMetadata segment; - private final long startEntryId; - private final Promise<LogSegmentEntryReader> openPromise; - - OpenReaderRequest(LogSegmentMetadata segment, - long startEntryId) { - this.segment = segment; - this.startEntryId = startEntryId; - this.openPromise = new Promise<LogSegmentEntryReader>(); - } - - } - - private static class DeleteLogSegmentRequest { - - private final LogSegmentMetadata segment; - private final Promise<LogSegmentMetadata> deletePromise; - - DeleteLogSegmentRequest(LogSegmentMetadata segment) { - this.segment = segment; - this.deletePromise = new Promise<LogSegmentMetadata>(); - } - - } - - private final byte[] passwd; - private final ZooKeeperClient zkc; - private final BookKeeperClient bkc; - private final OrderedScheduler scheduler; - private final DistributedLogConfiguration conf; - private final DynamicDistributedLogConfiguration dynConf; - private final StatsLogger statsLogger; - private final AsyncFailureInjector failureInjector; - // ledger allocator - private final LedgerAllocator allocator; - - public BKLogSegmentEntryStore(DistributedLogConfiguration conf, - DynamicDistributedLogConfiguration dynConf, - ZooKeeperClient zkc, - BookKeeperClient bkc, - OrderedScheduler scheduler, - LedgerAllocator allocator, - StatsLogger statsLogger, - AsyncFailureInjector failureInjector) { - this.conf = conf; - this.dynConf = dynConf; - this.zkc = zkc; - this.bkc = bkc; - this.passwd = conf.getBKDigestPW().getBytes(UTF_8); - this.scheduler = scheduler; - this.allocator = allocator; - this.statsLogger = statsLogger; - this.failureInjector = failureInjector; - } - - @Override - public Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment) { - DeleteLogSegmentRequest request = new DeleteLogSegmentRequest(segment); - BookKeeper bk; - try { - bk = this.bkc.get(); - } catch (IOException e) { - return Future.exception(e); - } - bk.asyncDeleteLedger(segment.getLogSegmentId(), this, request); - return request.deletePromise; - } - - @Override - public void deleteComplete(int rc, Object ctx) { - DeleteLogSegmentRequest deleteRequest = (DeleteLogSegmentRequest) ctx; - if (BKException.Code.NoSuchLedgerExistsException == rc) { - logger.warn("No ledger {} found to delete for {}.", - deleteRequest.segment.getLogSegmentId(), deleteRequest.segment); - } else if (BKException.Code.OK != rc) { - logger.error("Couldn't delete ledger {} from bookkeeper for {} : {}", - new Object[]{ deleteRequest.segment.getLogSegmentId(), deleteRequest.segment, - BKException.getMessage(rc) }); - FutureUtils.setException(deleteRequest.deletePromise, - new BKTransmitException("Couldn't delete log segment " + deleteRequest.segment, rc)); - return; - } - FutureUtils.setValue(deleteRequest.deletePromise, deleteRequest.segment); - } - - // - // Writers - // - - LedgerAllocator createLedgerAllocator(LogMetadataForWriter logMetadata, - DynamicDistributedLogConfiguration dynConf) - throws IOException { - LedgerAllocator ledgerAllocatorDelegator; - if (null == allocator || !dynConf.getEnableLedgerAllocatorPool()) { - QuorumConfigProvider quorumConfigProvider = - new DynamicQuorumConfigProvider(dynConf); - LedgerAllocator allocator = new SimpleLedgerAllocator( - logMetadata.getAllocationPath(), - logMetadata.getAllocationData(), - quorumConfigProvider, - zkc, - bkc); - ledgerAllocatorDelegator = new LedgerAllocatorDelegator(allocator, true); - } else { - ledgerAllocatorDelegator = allocator; - } - return ledgerAllocatorDelegator; - } - - @Override - public Allocator<LogSegmentEntryWriter, Object> newLogSegmentAllocator( - LogMetadataForWriter logMetadata, - DynamicDistributedLogConfiguration dynConf) throws IOException { - // Build the ledger allocator - LedgerAllocator allocator = createLedgerAllocator(logMetadata, dynConf); - return new BKLogSegmentAllocator(allocator); - } - - // - // Readers - // - - @Override - public Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment, - long startEntryId) { - BookKeeper bk; - try { - bk = this.bkc.get(); - } catch (IOException e) { - return Future.exception(e); - } - OpenReaderRequest request = new OpenReaderRequest(segment, startEntryId); - if (segment.isInProgress()) { - bk.asyncOpenLedgerNoRecovery( - segment.getLogSegmentId(), - BookKeeper.DigestType.CRC32, - passwd, - this, - request); - } else { - bk.asyncOpenLedger( - segment.getLogSegmentId(), - BookKeeper.DigestType.CRC32, - passwd, - this, - request); - } - return request.openPromise; - } - - @Override - public void openComplete(int rc, LedgerHandle lh, Object ctx) { - OpenReaderRequest request = (OpenReaderRequest) ctx; - if (BKException.Code.OK != rc) { - FutureUtils.setException( - request.openPromise, - new BKTransmitException("Failed to open ledger handle for log segment " + request.segment, rc)); - return; - } - // successfully open a ledger - try { - LogSegmentEntryReader reader = new BKLogSegmentEntryReader( - request.segment, - lh, - request.startEntryId, - bkc.get(), - scheduler, - conf, - statsLogger, - failureInjector); - FutureUtils.setValue(request.openPromise, reader); - } catch (IOException e) { - FutureUtils.setException(request.openPromise, e); - } - - } - - @Override - public Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(final LogSegmentMetadata segment, - final boolean fence) { - final BookKeeper bk; - try { - bk = this.bkc.get(); - } catch (IOException e) { - return Future.exception(e); - } - final Promise<LogSegmentRandomAccessEntryReader> openPromise = new Promise<LogSegmentRandomAccessEntryReader>(); - AsyncCallback.OpenCallback openCallback = new AsyncCallback.OpenCallback() { - @Override - public void openComplete(int rc, LedgerHandle lh, Object ctx) { - if (BKException.Code.OK != rc) { - FutureUtils.setException( - openPromise, - new BKTransmitException("Failed to open ledger handle for log segment " + segment, rc)); - return; - } - LogSegmentRandomAccessEntryReader reader = new BKLogSegmentRandomAccessEntryReader( - segment, - lh, - conf); - FutureUtils.setValue(openPromise, reader); - } - }; - if (segment.isInProgress() && !fence) { - bk.asyncOpenLedgerNoRecovery( - segment.getLogSegmentId(), - BookKeeper.DigestType.CRC32, - passwd, - openCallback, - null); - } else { - bk.asyncOpenLedger( - segment.getLogSegmentId(), - BookKeeper.DigestType.CRC32, - passwd, - openCallback, - null); - } - return openPromise; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java deleted file mode 100644 index 34fe1c3..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentEntryWriter.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.logsegment; - -import com.google.common.annotations.VisibleForTesting; -import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; -import org.apache.bookkeeper.client.AsyncCallback; -import org.apache.bookkeeper.client.LedgerHandle; - -/** - * Ledger based log segment entry writer. - */ -public class BKLogSegmentEntryWriter implements LogSegmentEntryWriter { - - private final LedgerHandle lh; - - public BKLogSegmentEntryWriter(LedgerHandle lh) { - this.lh = lh; - } - - @VisibleForTesting - public LedgerHandle getLedgerHandle() { - return this.lh; - } - - @Override - public long getLogSegmentId() { - return lh.getId(); - } - - @Override - public void asyncClose(AsyncCallback.CloseCallback callback, Object ctx) { - lh.asyncClose(callback, ctx); - } - - @Override - public void asyncAddEntry(byte[] data, int offset, int length, - AsyncCallback.AddCallback callback, Object ctx) { - lh.asyncAddEntry(data, offset, length, callback, ctx); - } - - @Override - public long size() { - return lh.getLength(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java deleted file mode 100644 index 9cec80c..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.logsegment; - -import com.google.common.collect.Lists; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.Entry; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.exceptions.BKTransmitException; -import com.twitter.distributedlog.logsegment.LogSegmentRandomAccessEntryReader; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; - -import java.io.IOException; -import java.util.Enumeration; -import java.util.List; - -/** - * BookKeeper ledger based random access entry reader. - */ -class BKLogSegmentRandomAccessEntryReader implements - LogSegmentRandomAccessEntryReader, - ReadCallback { - - private final long lssn; - private final long startSequenceId; - private final boolean envelopeEntries; - private final boolean deserializeRecordSet; - // state - private final LogSegmentMetadata metadata; - private final LedgerHandle lh; - private Promise<Void> closePromise = null; - - BKLogSegmentRandomAccessEntryReader(LogSegmentMetadata metadata, - LedgerHandle lh, - DistributedLogConfiguration conf) { - this.metadata = metadata; - this.lssn = metadata.getLogSegmentSequenceNumber(); - this.startSequenceId = metadata.getStartSequenceId(); - this.envelopeEntries = metadata.getEnvelopeEntries(); - this.deserializeRecordSet = conf.getDeserializeRecordSetOnReads(); - this.lh = lh; - } - - @Override - public long getLastAddConfirmed() { - return lh.getLastAddConfirmed(); - } - - @Override - public Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId) { - Promise<List<Entry.Reader>> promise = new Promise<List<Entry.Reader>>(); - lh.asyncReadEntries(startEntryId, endEntryId, this, promise); - return promise; - } - - Entry.Reader processReadEntry(LedgerEntry entry) throws IOException { - return Entry.newBuilder() - .setLogSegmentInfo(lssn, startSequenceId) - .setEntryId(entry.getEntryId()) - .setEnvelopeEntry(envelopeEntries) - .deserializeRecordSet(deserializeRecordSet) - .setInputStream(entry.getEntryInputStream()) - .buildReader(); - } - - @Override - public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) { - Promise<List<Entry.Reader>> promise = (Promise<List<Entry.Reader>>) ctx; - if (BKException.Code.OK == rc) { - List<Entry.Reader> entryList = Lists.newArrayList(); - while (entries.hasMoreElements()) { - try { - entryList.add(processReadEntry(entries.nextElement())); - } catch (IOException ioe) { - FutureUtils.setException(promise, ioe); - return; - } - } - FutureUtils.setValue(promise, entryList); - } else { - FutureUtils.setException(promise, - new BKTransmitException("Failed to read entries :", rc)); - } - } - - @Override - public Future<Void> asyncClose() { - final Promise<Void> closeFuture; - synchronized (this) { - if (null != closePromise) { - return closePromise; - } - closeFuture = closePromise = new Promise<Void>(); - } - BKUtils.closeLedgers(lh).proxyTo(closeFuture); - return closeFuture; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java deleted file mode 100644 index c71c67e..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/logsegment/BKUtils.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.logsegment; - -import com.google.common.collect.Lists; -import com.twitter.distributedlog.function.VoidFunctions; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.util.Future; -import com.twitter.util.Futures; -import com.twitter.util.Promise; -import org.apache.bookkeeper.client.AsyncCallback; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.LedgerHandle; - -import java.util.List; - -/** - * BookKeeper Util Functions - */ -public class BKUtils { - - /** - * Close a ledger <i>lh</i>. - * - * @param lh ledger handle - * @return future represents close result. - */ - public static Future<Void> closeLedger(LedgerHandle lh) { - final Promise<Void> closePromise = new Promise<Void>(); - lh.asyncClose(new AsyncCallback.CloseCallback() { - @Override - public void closeComplete(int rc, LedgerHandle lh, Object ctx) { - if (BKException.Code.OK != rc) { - FutureUtils.setException(closePromise, BKException.create(rc)); - } else { - FutureUtils.setValue(closePromise, null); - } - } - }, null); - return closePromise; - } - - /** - * Close a list of ledgers <i>lhs</i>. - * - * @param lhs a list of ledgers - * @return future represents close results. - */ - public static Future<Void> closeLedgers(LedgerHandle ... lhs) { - List<Future<Void>> closeResults = Lists.newArrayListWithExpectedSize(lhs.length); - for (LedgerHandle lh : lhs) { - closeResults.add(closeLedger(lh)); - } - return Futures.collect(closeResults).map(VoidFunctions.LIST_TO_VOID_FUNC); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java deleted file mode 100644 index 3e859fb..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/BKDLConfig.java +++ /dev/null @@ -1,400 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.metadata; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.impl.BKNamespaceDriver; -import com.twitter.distributedlog.metadata.DLConfig; -import com.twitter.distributedlog.thrift.BKDLConfigFormat; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TJSONProtocol; -import org.apache.thrift.transport.TMemoryBuffer; -import org.apache.thrift.transport.TMemoryInputTransport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * Configurations for BookKeeper based DL. - */ -public class BKDLConfig implements DLConfig { - - private static final Logger LOG = LoggerFactory.getLogger(BKDLConfig.class); - - private static final int BUFFER_SIZE = 4096; - private static final ConcurrentMap<URI, DLConfig> cachedDLConfigs = - new ConcurrentHashMap<URI, DLConfig>(); - - public static void propagateConfiguration(BKDLConfig bkdlConfig, DistributedLogConfiguration dlConf) { - dlConf.setEncodeRegionIDInLogSegmentMetadata(bkdlConfig.getEncodeRegionID()); - dlConf.setFirstLogSegmentSequenceNumber(bkdlConfig.getFirstLogSegmentSeqNo()); - if (bkdlConfig.isFederatedNamespace()) { - dlConf.setCreateStreamIfNotExists(false); - LOG.info("Disabled createIfNotExists for federated namespace."); - } - LOG.info("Propagate BKDLConfig to DLConfig : encodeRegionID = {}," + - " firstLogSegmentSequenceNumber = {}, createStreamIfNotExists = {}, isFederated = {}.", - new Object[] { dlConf.getEncodeRegionIDInLogSegmentMetadata(), - dlConf.getFirstLogSegmentSequenceNumber(), dlConf.getCreateStreamIfNotExists(), - bkdlConfig.isFederatedNamespace() }); - } - - public static BKDLConfig resolveDLConfig(ZooKeeperClient zkc, URI uri) throws IOException { - DLConfig dlConfig = cachedDLConfigs.get(uri); - if (dlConfig == null) { - dlConfig = (new ZkMetadataResolver(zkc).resolve(uri)).getDLConfig(); - DLConfig oldDLConfig = cachedDLConfigs.putIfAbsent(uri, dlConfig); - if (null != oldDLConfig) { - dlConfig = oldDLConfig; - } - } - assert (dlConfig instanceof BKDLConfig); - return (BKDLConfig)dlConfig; - } - - @VisibleForTesting - public static void clearCachedDLConfigs() { - cachedDLConfigs.clear(); - } - - private String bkZkServersForWriter; - private String bkZkServersForReader; - private String bkLedgersPath; - private boolean sanityCheckTxnID = true; - private boolean encodeRegionID = false; - private String dlZkServersForWriter; - private String dlZkServersForReader; - private String aclRootPath; - private Long firstLogSegmentSeqNo; - private boolean isFederatedNamespace = false; - - /** - * Construct a empty config with given <i>uri</i>. - */ - public BKDLConfig(URI uri) { - this(BKNamespaceDriver.getZKServersFromDLUri(uri), - BKNamespaceDriver.getZKServersFromDLUri(uri), - null, null, null); - } - - /** - * The caller should make sure both dl and bk use same zookeeper server. - * - * @param zkServers - * zk servers used for both dl and bk. - * @param ledgersPath - * ledgers path. - */ - @VisibleForTesting - public BKDLConfig(String zkServers, String ledgersPath) { - this(zkServers, zkServers, zkServers, zkServers, ledgersPath); - } - - public BKDLConfig(String dlZkServersForWriter, - String dlZkServersForReader, - String bkZkServersForWriter, - String bkZkServersForReader, - String bkLedgersPath) { - this.dlZkServersForWriter = dlZkServersForWriter; - this.dlZkServersForReader = dlZkServersForReader; - this.bkZkServersForWriter = bkZkServersForWriter; - this.bkZkServersForReader = bkZkServersForReader; - this.bkLedgersPath = bkLedgersPath; - } - - /** - * @return zk servers used for bk for writers - */ - public String getBkZkServersForWriter() { - return bkZkServersForWriter; - } - - /** - * @return zk servers used for bk for readers - */ - public String getBkZkServersForReader() { - return bkZkServersForReader; - } - - /** - * @return zk servers used for dl for writers - */ - public String getDlZkServersForWriter() { - return dlZkServersForWriter; - } - - /** - * @return zk servers used for dl for readers - */ - public String getDlZkServersForReader() { - return dlZkServersForReader; - } - - /** - * @return ledgers path for bk - */ - public String getBkLedgersPath() { - return bkLedgersPath; - } - - /** - * Enable/Disable sanity check txn id. - * - * @param enabled - * flag to enable/disable sanity check txn id. - * @return bk dl config. - */ - public BKDLConfig setSanityCheckTxnID(boolean enabled) { - this.sanityCheckTxnID = enabled; - return this; - } - - /** - * @return flag to sanity check highest txn id. - */ - public boolean getSanityCheckTxnID() { - return sanityCheckTxnID; - } - - /** - * Enable/Disable encode region id. - * - * @param enabled - * flag to enable/disable encoding region id. - * @return bk dl config - */ - public BKDLConfig setEncodeRegionID(boolean enabled) { - this.encodeRegionID = enabled; - return this; - } - - /** - * @return flag to encode region id. - */ - public boolean getEncodeRegionID() { - return encodeRegionID; - } - - /** - * Set the root path of zk based ACL manager. - * - * @param aclRootPath - * root path of zk based ACL manager. - * @return bk dl config - */ - public BKDLConfig setACLRootPath(String aclRootPath) { - this.aclRootPath = aclRootPath; - return this; - } - - /** - * Get the root path of zk based ACL manager. - * - * @return root path of zk based ACL manager. - */ - public String getACLRootPath() { - return aclRootPath; - } - - /** - * Set the value at which ledger sequence number should start for streams that are being - * upgraded and did not have ledger sequence number to start with or for newly created - * streams - * - * @param firstLogSegmentSeqNo first ledger sequence number - * @return bk dl config - */ - public BKDLConfig setFirstLogSegmentSeqNo(long firstLogSegmentSeqNo) { - this.firstLogSegmentSeqNo = firstLogSegmentSeqNo; - return this; - } - - /** - * Get the value at which ledger sequence number should start for streams that are being - * upgraded and did not have ledger sequence number to start with or for newly created - * streams - * - * @return first ledger sequence number - */ - public Long getFirstLogSegmentSeqNo() { - if (null == firstLogSegmentSeqNo) { - return DistributedLogConstants.FIRST_LOGSEGMENT_SEQNO; - } - return firstLogSegmentSeqNo; - } - - /** - * Set the namespace to federated <i>isFederatedNamespace</i>. - * - * @param isFederatedNamespace - * is the namespace federated? - * @return bk dl config - */ - public BKDLConfig setFederatedNamespace(boolean isFederatedNamespace) { - this.isFederatedNamespace = isFederatedNamespace; - return this; - } - - /** - * Whether the namespace is federated namespace - * - * @return true if the namespace is a federated namespace. otherwise false. - */ - public boolean isFederatedNamespace() { - return this.isFederatedNamespace; - } - - @Override - public int hashCode() { - return Objects.hashCode(bkZkServersForWriter, bkZkServersForReader, - dlZkServersForWriter, dlZkServersForReader, - bkLedgersPath); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof BKDLConfig)) { - return false; - } - BKDLConfig another = (BKDLConfig) o; - return Objects.equal(bkZkServersForWriter, another.bkZkServersForWriter) && - Objects.equal(bkZkServersForReader, another.bkZkServersForReader) && - Objects.equal(dlZkServersForWriter, another.dlZkServersForWriter) && - Objects.equal(dlZkServersForReader, another.dlZkServersForReader) && - Objects.equal(bkLedgersPath, another.bkLedgersPath) && - sanityCheckTxnID == another.sanityCheckTxnID && - encodeRegionID == another.encodeRegionID && - Objects.equal(aclRootPath, another.aclRootPath) && - Objects.equal(firstLogSegmentSeqNo, another.firstLogSegmentSeqNo) && - Objects.equal(isFederatedNamespace, another.isFederatedNamespace); - - } - - @Override - public String toString() { - return serialize(); - } - - @Override - public String serialize() { - BKDLConfigFormat configFormat = new BKDLConfigFormat(); - if (null != bkZkServersForWriter) { - configFormat.setBkZkServers(bkZkServersForWriter); - } - if (null != bkZkServersForReader) { - configFormat.setBkZkServersForReader(bkZkServersForReader); - } - if (null != dlZkServersForWriter) { - configFormat.setDlZkServersForWriter(dlZkServersForWriter); - } - if (null != dlZkServersForReader) { - configFormat.setDlZkServersForReader(dlZkServersForReader); - } - if (null != bkLedgersPath) { - configFormat.setBkLedgersPath(bkLedgersPath); - } - configFormat.setSanityCheckTxnID(sanityCheckTxnID); - configFormat.setEncodeRegionID(encodeRegionID); - if (null != aclRootPath) { - configFormat.setAclRootPath(aclRootPath); - } - if (null != firstLogSegmentSeqNo) { - configFormat.setFirstLogSegmentSeqNo(firstLogSegmentSeqNo); - } - if (isFederatedNamespace) { - configFormat.setFederatedNamespace(true); - } - return serialize(configFormat); - } - - String serialize(BKDLConfigFormat configFormat) { - TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - configFormat.write(protocol); - transport.flush(); - return transport.toString("UTF-8"); - } catch (TException e) { - throw new RuntimeException("Failed to serialize BKDLConfig : ", e); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException("Failed to serialize BKDLConfig : ", e); - } - } - - @Override - public void deserialize(byte[] data) throws IOException { - BKDLConfigFormat configFormat = new BKDLConfigFormat(); - TMemoryInputTransport transport = new TMemoryInputTransport(data); - TJSONProtocol protocol = new TJSONProtocol(transport); - try { - configFormat.read(protocol); - } catch (TException e) { - throw new IOException("Failed to deserialize data '" + - new String(data, UTF_8) + "' : ", e); - } - // bookkeeper cluster settings - if (configFormat.isSetBkZkServers()) { - bkZkServersForWriter = configFormat.getBkZkServers(); - } - if (configFormat.isSetBkZkServersForReader()) { - bkZkServersForReader = configFormat.getBkZkServersForReader(); - } else { - bkZkServersForReader = bkZkServersForWriter; - } - if (configFormat.isSetBkLedgersPath()) { - bkLedgersPath = configFormat.getBkLedgersPath(); - } - // dl zookeeper cluster settings - if (configFormat.isSetDlZkServersForWriter()) { - dlZkServersForWriter = configFormat.getDlZkServersForWriter(); - } - if (configFormat.isSetDlZkServersForReader()) { - dlZkServersForReader = configFormat.getDlZkServersForReader(); - } else { - dlZkServersForReader = dlZkServersForWriter; - } - // dl settings - sanityCheckTxnID = !configFormat.isSetSanityCheckTxnID() || configFormat.isSanityCheckTxnID(); - encodeRegionID = configFormat.isSetEncodeRegionID() && configFormat.isEncodeRegionID(); - if (configFormat.isSetAclRootPath()) { - aclRootPath = configFormat.getAclRootPath(); - } - - if (configFormat.isSetFirstLogSegmentSeqNo()) { - firstLogSegmentSeqNo = configFormat.getFirstLogSegmentSeqNo(); - } - isFederatedNamespace = configFormat.isSetFederatedNamespace() && configFormat.isFederatedNamespace(); - - // Validate the settings - if (null == bkZkServersForWriter || null == bkZkServersForReader || null == bkLedgersPath || - null == dlZkServersForWriter || null == dlZkServersForReader) { - throw new IOException("Missing zk/bk settings in BKDL Config : " + new String(data, UTF_8)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java deleted file mode 100644 index c76a5a5..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java +++ /dev/null @@ -1,633 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.metadata; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.exceptions.DLException; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.InvalidStreamNameException; -import com.twitter.distributedlog.exceptions.LockCancelledException; -import com.twitter.distributedlog.exceptions.LogExistsException; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore; -import com.twitter.distributedlog.lock.DistributedLock; -import com.twitter.distributedlog.lock.SessionLockFactory; -import com.twitter.distributedlog.lock.ZKDistributedLock; -import com.twitter.distributedlog.lock.ZKSessionLockFactory; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; -import com.twitter.distributedlog.metadata.LogStreamMetadataStore; -import com.twitter.distributedlog.metadata.LogMetadata; -import com.twitter.distributedlog.metadata.LogMetadataForReader; -import com.twitter.distributedlog.metadata.LogMetadataForWriter; -import com.twitter.distributedlog.util.DLUtils; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.SchedulerUtils; -import com.twitter.distributedlog.zk.LimitedPermitManager; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.distributedlog.util.PermitManager; -import com.twitter.distributedlog.util.Transaction; -import com.twitter.distributedlog.util.Utils; -import com.twitter.distributedlog.zk.ZKTransaction; -import com.twitter.util.ExceptionalFunction; -import com.twitter.util.ExceptionalFunction0; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import org.apache.bookkeeper.meta.ZkVersion; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; -import org.apache.zookeeper.ZKUtil; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.common.PathUtils; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static com.twitter.distributedlog.metadata.LogMetadata.*; - -/** - * zookeeper based {@link LogStreamMetadataStore} - */ -public class ZKLogStreamMetadataStore implements LogStreamMetadataStore { - - private final static Logger LOG = LoggerFactory.getLogger(ZKLogStreamMetadataStore.class); - - private final String clientId; - private final DistributedLogConfiguration conf; - private final ZooKeeperClient zooKeeperClient; - private final OrderedScheduler scheduler; - private final StatsLogger statsLogger; - private final LogSegmentMetadataStore logSegmentStore; - private final LimitedPermitManager permitManager; - // lock - private SessionLockFactory lockFactory; - private OrderedScheduler lockStateExecutor; - - public ZKLogStreamMetadataStore(String clientId, - DistributedLogConfiguration conf, - ZooKeeperClient zkc, - OrderedScheduler scheduler, - StatsLogger statsLogger) { - this.clientId = clientId; - this.conf = conf; - this.zooKeeperClient = zkc; - this.scheduler = scheduler; - this.statsLogger = statsLogger; - // create the log segment metadata store and the permit manager (used for log segment rolling) - this.logSegmentStore = new ZKLogSegmentMetadataStore(conf, zooKeeperClient, scheduler); - this.permitManager = new LimitedPermitManager( - conf.getLogSegmentRollingConcurrency(), - 1, - TimeUnit.MINUTES, - scheduler); - this.zooKeeperClient.register(permitManager); - } - - private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) { - if (createIfNull && null == lockStateExecutor) { - StatsLogger lockStateStatsLogger = statsLogger.scope("lock_scheduler"); - lockStateExecutor = OrderedScheduler.newBuilder() - .name("DLM-LockState") - .corePoolSize(conf.getNumLockStateThreads()) - .statsLogger(lockStateStatsLogger) - .perExecutorStatsLogger(lockStateStatsLogger) - .traceTaskExecution(conf.getEnableTaskExecutionStats()) - .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros()) - .build(); - } - return lockStateExecutor; - } - - private synchronized SessionLockFactory getLockFactory(boolean createIfNull) { - if (createIfNull && null == lockFactory) { - lockFactory = new ZKSessionLockFactory( - zooKeeperClient, - clientId, - getLockStateExecutor(createIfNull), - conf.getZKNumRetries(), - conf.getLockTimeoutMilliSeconds(), - conf.getZKRetryBackoffStartMillis(), - statsLogger); - } - return lockFactory; - } - - @Override - public void close() throws IOException { - this.zooKeeperClient.unregister(permitManager); - this.permitManager.close(); - this.logSegmentStore.close(); - SchedulerUtils.shutdownScheduler( - getLockStateExecutor(false), - conf.getSchedulerShutdownTimeoutMs(), - TimeUnit.MILLISECONDS); - } - - @Override - public LogSegmentMetadataStore getLogSegmentMetadataStore() { - return logSegmentStore; - } - - @Override - public PermitManager getPermitManager() { - return this.permitManager; - } - - @Override - public Transaction<Object> newTransaction() { - return new ZKTransaction(zooKeeperClient); - } - - @Override - public Future<Void> logExists(URI uri, final String logName) { - final String logSegmentsPath = LogMetadata.getLogSegmentsPath( - uri, logName, conf.getUnpartitionedStreamName()); - final Promise<Void> promise = new Promise<Void>(); - try { - final ZooKeeper zk = zooKeeperClient.get(); - zk.sync(logSegmentsPath, new AsyncCallback.VoidCallback() { - @Override - public void processResult(int syncRc, String path, Object syncCtx) { - if (KeeperException.Code.NONODE.intValue() == syncRc) { - promise.setException(new LogNotFoundException( - String.format("Log %s does not exist or has been deleted", logName))); - return; - } else if (KeeperException.Code.OK.intValue() != syncRc){ - promise.setException(new ZKException("Error on checking log existence for " + logName, - KeeperException.create(KeeperException.Code.get(syncRc)))); - return; - } - zk.exists(logSegmentsPath, false, new AsyncCallback.StatCallback() { - @Override - public void processResult(int rc, String path, Object ctx, Stat stat) { - if (KeeperException.Code.OK.intValue() == rc) { - promise.setValue(null); - } else if (KeeperException.Code.NONODE.intValue() == rc) { - promise.setException(new LogNotFoundException( - String.format("Log %s does not exist or has been deleted", logName))); - } else { - promise.setException(new ZKException("Error on checking log existence for " + logName, - KeeperException.create(KeeperException.Code.get(rc)))); - } - } - }, null); - } - }, null); - - } catch (InterruptedException ie) { - LOG.error("Interrupted while reading {}", logSegmentsPath, ie); - promise.setException(new DLInterruptedException("Interrupted while checking " - + logSegmentsPath, ie)); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - promise.setException(e); - } - return promise; - } - - // - // Create Write Lock - // - - @Override - public DistributedLock createWriteLock(LogMetadataForWriter metadata) { - return new ZKDistributedLock( - getLockStateExecutor(true), - getLockFactory(true), - metadata.getLockPath(), - conf.getLockTimeoutMilliSeconds(), - statsLogger); - } - - // - // Create Read Lock - // - - private Future<Void> ensureReadLockPathExist(final LogMetadata logMetadata, - final String readLockPath) { - final Promise<Void> promise = new Promise<Void>(); - promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable t) { - FutureUtils.setException(promise, new LockCancelledException(readLockPath, - "Could not ensure read lock path", t)); - return null; - } - }); - Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath()); - Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate, - new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT, - new org.apache.zookeeper.AsyncCallback.StringCallback() { - @Override - public void processResult(final int rc, final String path, Object ctx, String name) { - if (KeeperException.Code.NONODE.intValue() == rc) { - FutureUtils.setException(promise, new LogNotFoundException( - String.format("Log %s does not exist or has been deleted", - logMetadata.getFullyQualifiedName()))); - } else if (KeeperException.Code.OK.intValue() == rc) { - FutureUtils.setValue(promise, null); - LOG.trace("Created path {}.", path); - } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { - FutureUtils.setValue(promise, null); - LOG.trace("Path {} is already existed.", path); - } else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) { - FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path)); - } else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) { - FutureUtils.setException(promise, new DLInterruptedException(path)); - } else { - FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - return promise; - } - - @Override - public Future<DistributedLock> createReadLock(final LogMetadataForReader metadata, - Optional<String> readerId) { - final String readLockPath = metadata.getReadLockPath(readerId); - return ensureReadLockPathExist(metadata, readLockPath).flatMap( - new ExceptionalFunction<Void, Future<DistributedLock>>() { - @Override - public Future<DistributedLock> applyE(Void value) throws Throwable { - // Unfortunately this has a blocking call which we should not execute on the - // ZK completion thread - return scheduler.apply(new ExceptionalFunction0<DistributedLock>() { - @Override - public DistributedLock applyE() throws Throwable { - return new ZKDistributedLock( - getLockStateExecutor(true), - getLockFactory(true), - readLockPath, - conf.getLockTimeoutMilliSeconds(), - statsLogger.scope("read_lock")); - } - }); - } - }); - } - - // - // Create Log - // - - static class MetadataIndex { - static final int LOG_ROOT_PARENT = 0; - static final int LOG_ROOT = 1; - static final int MAX_TXID = 2; - static final int VERSION = 3; - static final int LOCK = 4; - static final int READ_LOCK = 5; - static final int LOGSEGMENTS = 6; - static final int ALLOCATION = 7; - } - - static int bytesToInt(byte[] b) { - assert b.length >= 4; - return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3]; - } - - static byte[] intToBytes(int i) { - return new byte[]{ - (byte) (i >> 24), - (byte) (i >> 16), - (byte) (i >> 8), - (byte) (i)}; - } - - static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk, - String logRootPath, - boolean ownAllocator) { - // Note re. persistent lock state initialization: the read lock persistent state (path) is - // initialized here but only used in the read handler. The reason is its more convenient and - // less error prone to manage all stream structure in one place. - final String logRootParentPath = new File(logRootPath).getParent(); - final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH; - final String maxTxIdPath = logRootPath + MAX_TXID_PATH; - final String lockPath = logRootPath + LOCK_PATH; - final String readLockPath = logRootPath + READ_LOCK_PATH; - final String versionPath = logRootPath + VERSION_PATH; - final String allocationPath = logRootPath + ALLOCATION_PATH; - - int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1; - List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths); - checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false)); - checkFutures.add(Utils.zkGetData(zk, logRootPath, false)); - checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false)); - checkFutures.add(Utils.zkGetData(zk, versionPath, false)); - checkFutures.add(Utils.zkGetData(zk, lockPath, false)); - checkFutures.add(Utils.zkGetData(zk, readLockPath, false)); - checkFutures.add(Utils.zkGetData(zk, logSegmentsPath, false)); - if (ownAllocator) { - checkFutures.add(Utils.zkGetData(zk, allocationPath, false)); - } - - return Future.collect(checkFutures); - } - - static boolean pathExists(Versioned<byte[]> metadata) { - return null != metadata.getValue() && null != metadata.getVersion(); - } - - static void ensureMetadataExist(Versioned<byte[]> metadata) { - Preconditions.checkNotNull(metadata.getValue()); - Preconditions.checkNotNull(metadata.getVersion()); - } - - static void createMissingMetadata(final ZooKeeper zk, - final String logRootPath, - final List<Versioned<byte[]>> metadatas, - final List<ACL> acl, - final boolean ownAllocator, - final boolean createIfNotExists, - final Promise<List<Versioned<byte[]>>> promise) { - final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size()); - final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size()); - CreateMode createMode = CreateMode.PERSISTENT; - - // log root parent path - if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) { - pathsToCreate.add(null); - } else { - String logRootParentPath = new File(logRootPath).getParent(); - pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); - zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode)); - } - - // log root path - if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) { - pathsToCreate.add(null); - } else { - pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); - zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode)); - } - - // max id - if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) { - pathsToCreate.add(null); - } else { - byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L); - pathsToCreate.add(zeroTxnIdData); - zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode)); - } - // version - if (pathExists(metadatas.get(MetadataIndex.VERSION))) { - pathsToCreate.add(null); - } else { - byte[] versionData = intToBytes(LAYOUT_VERSION); - pathsToCreate.add(versionData); - zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode)); - } - // lock path - if (pathExists(metadatas.get(MetadataIndex.LOCK))) { - pathsToCreate.add(null); - } else { - pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); - zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode)); - } - // read lock path - if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) { - pathsToCreate.add(null); - } else { - pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); - zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode)); - } - // log segments path - if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) { - pathsToCreate.add(null); - } else { - byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber( - DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO); - pathsToCreate.add(logSegmentsData); - zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode)); - } - // allocation path - if (ownAllocator) { - if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) { - pathsToCreate.add(null); - } else { - pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES); - zkOps.add(Op.create(logRootPath + ALLOCATION_PATH, - DistributedLogConstants.EMPTY_BYTES, acl, createMode)); - } - } - if (zkOps.isEmpty()) { - // nothing missed - promise.setValue(metadatas); - return; - } - if (!createIfNotExists) { - promise.setException(new LogNotFoundException("Log " + logRootPath + " not found")); - return; - } - - zk.multi(zkOps, new AsyncCallback.MultiCallback() { - @Override - public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) { - if (KeeperException.Code.OK.intValue() == rc) { - List<Versioned<byte[]>> finalMetadatas = - Lists.newArrayListWithExpectedSize(metadatas.size()); - for (int i = 0; i < pathsToCreate.size(); i++) { - byte[] dataCreated = pathsToCreate.get(i); - if (null == dataCreated) { - finalMetadatas.add(metadatas.get(i)); - } else { - finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0))); - } - } - promise.setValue(finalMetadatas); - } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { - promise.setException(new LogExistsException("Someone just created log " - + logRootPath)); - } else { - if (LOG.isDebugEnabled()) { - StringBuilder builder = new StringBuilder(); - for (OpResult result : resultList) { - if (result instanceof OpResult.ErrorResult) { - OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result; - builder.append(errorResult.getErr()).append(","); - } else { - builder.append(0).append(","); - } - } - String resultCodeList = builder.substring(0, builder.length() - 1); - LOG.debug("Failed to create log, full rc list = {}", resultCodeList); - } - - promise.setException(new ZKException("Failed to create log " + logRootPath, - KeeperException.Code.get(rc))); - } - } - }, null); - } - - static LogMetadataForWriter processLogMetadatas(URI uri, - String logName, - String logIdentifier, - List<Versioned<byte[]>> metadatas, - boolean ownAllocator) - throws UnexpectedException { - try { - // max id - Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID); - ensureMetadataExist(maxTxnIdData); - // version - Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION); - ensureMetadataExist(maxTxnIdData); - Preconditions.checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue())); - // lock path - ensureMetadataExist(metadatas.get(MetadataIndex.LOCK)); - // read lock path - ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK)); - // max lssn - Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS); - ensureMetadataExist(maxLSSNData); - try { - DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue()); - } catch (NumberFormatException nfe) { - throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe); - } - // allocation path - Versioned<byte[]> allocationData; - if (ownAllocator) { - allocationData = metadatas.get(MetadataIndex.ALLOCATION); - ensureMetadataExist(allocationData); - } else { - allocationData = new Versioned<byte[]>(null, null); - } - return new LogMetadataForWriter(uri, logName, logIdentifier, - maxLSSNData, maxTxnIdData, allocationData); - } catch (IllegalArgumentException iae) { - throw new UnexpectedException("Invalid log " + logName, iae); - } catch (NullPointerException npe) { - throw new UnexpectedException("Invalid log " + logName, npe); - } - } - - static Future<LogMetadataForWriter> getLog(final URI uri, - final String logName, - final String logIdentifier, - final ZooKeeperClient zooKeeperClient, - final boolean ownAllocator, - final boolean createIfNotExists) { - final String logRootPath = LogMetadata.getLogRootPath(uri, logName, logIdentifier); - try { - PathUtils.validatePath(logRootPath); - } catch (IllegalArgumentException e) { - LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e}); - return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid")); - } - - try { - final ZooKeeper zk = zooKeeperClient.get(); - return checkLogMetadataPaths(zk, logRootPath, ownAllocator) - .flatMap(new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() { - @Override - public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) { - Promise<List<Versioned<byte[]>>> promise = - new Promise<List<Versioned<byte[]>>>(); - createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(), - ownAllocator, createIfNotExists, promise); - return promise; - } - }).map(new ExceptionalFunction<List<Versioned<byte[]>>, LogMetadataForWriter>() { - @Override - public LogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException { - return processLogMetadatas( - uri, - logName, - logIdentifier, - metadatas, - ownAllocator); - } - }); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - return Future.exception(new ZKException("Encountered zookeeper connection issue on creating log " + logName, - KeeperException.Code.CONNECTIONLOSS)); - } catch (InterruptedException e) { - return Future.exception(new DLInterruptedException("Interrupted on creating log " + logName, e)); - } - } - - @Override - public Future<LogMetadataForWriter> getLog(final URI uri, - final String logName, - final boolean ownAllocator, - final boolean createIfNotExists) { - return getLog( - uri, - logName, - conf.getUnpartitionedStreamName(), - zooKeeperClient, - ownAllocator, - createIfNotExists); - } - - // - // Delete Log - // - - @Override - public Future<Void> deleteLog(URI uri, final String logName) { - final Promise<Void> promise = new Promise<Void>(); - try { - String streamPath = LogMetadata.getLogStreamPath(uri, logName); - ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new AsyncCallback.VoidCallback() { - @Override - public void processResult(int rc, String path, Object ctx) { - if (KeeperException.Code.OK.intValue() != rc) { - FutureUtils.setException(promise, - new ZKException("Encountered zookeeper issue on deleting log stream " - + logName, KeeperException.Code.get(rc))); - return; - } - FutureUtils.setValue(promise, null); - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream " - + logName, KeeperException.Code.CONNECTIONLOSS)); - } catch (InterruptedException e) { - FutureUtils.setException(promise, new DLInterruptedException("Interrupted while deleting log stream " - + logName)); - } catch (KeeperException e) { - FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream " - + logName, e)); - } - return promise; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java deleted file mode 100644 index 6b7a231..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/metadata/ZkMetadataResolver.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.metadata; - -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.metadata.DLMetadata; -import com.twitter.distributedlog.metadata.MetadataResolver; -import org.apache.commons.lang.StringUtils; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.common.PathUtils; -import org.apache.zookeeper.data.Stat; - -import java.io.IOException; -import java.net.URI; - -public class ZkMetadataResolver implements MetadataResolver { - - private final ZooKeeperClient zkc; - - public ZkMetadataResolver(ZooKeeperClient zkc) { - this.zkc = zkc; - } - - @Override - public DLMetadata resolve(URI uri) throws IOException { - String dlPath = uri.getPath(); - PathUtils.validatePath(dlPath); - // Normal case the dl metadata is stored in the last segment - // so lookup last segment first. - String[] parts = StringUtils.split(dlPath, '/'); - if (null == parts || 0 == parts.length) { - throw new IOException("Invalid dlPath to resolve dl metadata : " + dlPath); - } - for (int i = parts.length; i >= 0; i--) { - String pathToResolve = String.format("/%s", StringUtils.join(parts, '/', 0, i)); - byte[] data; - try { - data = zkc.get().getData(pathToResolve, false, new Stat()); - } catch (KeeperException.NoNodeException nne) { - continue; - } catch (KeeperException ke) { - throw new IOException("Fail to resolve dl path : " + pathToResolve); - } catch (InterruptedException ie) { - throw new IOException("Interrupted when resolving dl path : " + pathToResolve); - } - if (null == data || data.length == 0) { - continue; - } - try { - return DLMetadata.deserialize(uri, data); - } catch (IOException ie) { - throw new IOException("Failed to deserialize uri : " + uri); - } - } - throw new IOException("No bkdl config bound under dl path : " + dlPath); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/package-info.java deleted file mode 100644 index 7c5c2e4..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * The BookKeeper Based DistributedLog Implementation. - */ -package com.twitter.distributedlog.impl; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java deleted file mode 100644 index b067ee9..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionStateStore.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.subscription; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; - -import com.twitter.distributedlog.subscription.SubscriptionStateStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.BoxedUnit; - -import com.google.common.base.Charsets; - -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; - -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.util.Utils; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.util.Future; -import com.twitter.util.Promise; - -public class ZKSubscriptionStateStore implements SubscriptionStateStore { - - static final Logger logger = LoggerFactory.getLogger(ZKSubscriptionStateStore.class); - - private final ZooKeeperClient zooKeeperClient; - private final String zkPath; - private AtomicReference<DLSN> lastCommittedPosition = new AtomicReference<DLSN>(null); - - public ZKSubscriptionStateStore(ZooKeeperClient zooKeeperClient, String zkPath) { - this.zooKeeperClient = zooKeeperClient; - this.zkPath = zkPath; - } - - @Override - public void close() throws IOException { - } - - /** - * Get the last committed position stored for this subscription - */ - @Override - public Future<DLSN> getLastCommitPosition() { - if (null != lastCommittedPosition.get()) { - return Future.value(lastCommittedPosition.get()); - } else { - return getLastCommitPositionFromZK(); - } - } - - Future<DLSN> getLastCommitPositionFromZK() { - final Promise<DLSN> result = new Promise<DLSN>(); - try { - logger.debug("Reading last commit position from path {}", zkPath); - zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() { - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc); - if (KeeperException.Code.NONODE.intValue() == rc) { - result.setValue(DLSN.NonInclusiveLowerBound); - } else if (KeeperException.Code.OK.intValue() != rc) { - result.setException(KeeperException.create(KeeperException.Code.get(rc), path)); - } else { - try { - DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8)); - result.setValue(dlsn); - } catch (Exception t) { - logger.warn("Invalid last commit position found from path {}", zkPath, t); - // invalid dlsn recorded in subscription state store - result.setValue(DLSN.NonInclusiveLowerBound); - } - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) { - result.setException(zkce); - } catch (InterruptedException ie) { - result.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie)); - } - return result; - } - - /** - * Advances the position associated with the subscriber - * - * @param newPosition - new commit position - */ - @Override - public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition) { - if (null == lastCommittedPosition.get() || - (newPosition.compareTo(lastCommittedPosition.get()) > 0)) { - lastCommittedPosition.set(newPosition); - return Utils.zkAsyncCreateFullPathOptimisticAndSetData(zooKeeperClient, - zkPath, newPosition.serialize().getBytes(Charsets.UTF_8), - zooKeeperClient.getDefaultACL(), - CreateMode.PERSISTENT); - } else { - return Future.Done(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java deleted file mode 100644 index 17ba943..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/subscription/ZKSubscriptionsStore.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.impl.subscription; - -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.subscription.SubscriptionStateStore; -import com.twitter.distributedlog.subscription.SubscriptionsStore; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.Promise; - -import org.apache.bookkeeper.meta.ZkVersion; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * ZooKeeper Based Subscriptions Store. - */ -public class ZKSubscriptionsStore implements SubscriptionsStore { - - private final ZooKeeperClient zkc; - private final String zkPath; - private final ConcurrentMap<String, ZKSubscriptionStateStore> subscribers = - new ConcurrentHashMap<String, ZKSubscriptionStateStore>(); - - public ZKSubscriptionsStore(ZooKeeperClient zkc, String zkPath) { - this.zkc = zkc; - this.zkPath = zkPath; - } - - private ZKSubscriptionStateStore getSubscriber(String subscriberId) { - ZKSubscriptionStateStore ss = subscribers.get(subscriberId); - if (ss == null) { - ZKSubscriptionStateStore newSS = new ZKSubscriptionStateStore(zkc, - getSubscriberZKPath(subscriberId)); - ZKSubscriptionStateStore oldSS = subscribers.putIfAbsent(subscriberId, newSS); - if (oldSS == null) { - ss = newSS; - } else { - try { - newSS.close(); - } catch (IOException e) { - // ignore the exception - } - ss = oldSS; - } - } - return ss; - } - - private String getSubscriberZKPath(String subscriberId) { - return String.format("%s/%s", zkPath, subscriberId); - } - - @Override - public Future<DLSN> getLastCommitPosition(String subscriberId) { - return getSubscriber(subscriberId).getLastCommitPosition(); - } - - @Override - public Future<Map<String, DLSN>> getLastCommitPositions() { - final Promise<Map<String, DLSN>> result = new Promise<Map<String, DLSN>>(); - try { - this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() { - @Override - public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - if (KeeperException.Code.NONODE.intValue() == rc) { - result.setValue(new HashMap<String, DLSN>()); - } else if (KeeperException.Code.OK.intValue() != rc) { - result.setException(KeeperException.create(KeeperException.Code.get(rc), path)); - } else { - getLastCommitPositions(result, children); - } - } - }, null); - } catch (ZooKeeperClient.ZooKeeperConnectionException zkce) { - result.setException(zkce); - } catch (InterruptedException ie) { - result.setException(new DLInterruptedException("getLastCommitPositions was interrupted", ie)); - } - return result; - } - - private void getLastCommitPositions(final Promise<Map<String, DLSN>> result, - List<String> subscribers) { - List<Future<Pair<String, DLSN>>> futures = - new ArrayList<Future<Pair<String, DLSN>>>(subscribers.size()); - for (String s : subscribers) { - final String subscriber = s; - Future<Pair<String, DLSN>> future = - // Get the last commit position from zookeeper - getSubscriber(subscriber).getLastCommitPositionFromZK().map( - new AbstractFunction1<DLSN, Pair<String, DLSN>>() { - @Override - public Pair<String, DLSN> apply(DLSN dlsn) { - return Pair.of(subscriber, dlsn); - } - }); - futures.add(future); - } - Future.collect(futures).foreach( - new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() { - @Override - public BoxedUnit apply(List<Pair<String, DLSN>> subscriptions) { - Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>(); - for (Pair<String, DLSN> pair : subscriptions) { - subscriptionMap.put(pair.getLeft(), pair.getRight()); - } - result.setValue(subscriptionMap); - return BoxedUnit.UNIT; - } - }); - } - - @Override - public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition) { - return getSubscriber(subscriberId).advanceCommitPosition(newPosition); - } - - @Override - public Future<Boolean> deleteSubscriber(String subscriberId) { - subscribers.remove(subscriberId); - String path = getSubscriberZKPath(subscriberId); - return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1)); - } - - @Override - public void close() throws IOException { - // no-op - for (SubscriptionStateStore store : subscribers.values()) { - store.close(); - } - } - -}