http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java deleted file mode 100644 index 5b788e2..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.zk; - -import com.twitter.distributedlog.util.Transaction.OpListener; -import org.apache.bookkeeper.meta.ZkVersion; -import org.apache.bookkeeper.versioning.Version; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Op; -import org.apache.zookeeper.OpResult; - -import javax.annotation.Nullable; - -/** - * ZooKeeper Operation that plays with {@link org.apache.bookkeeper.versioning.Version} - */ -public class ZKVersionedSetOp extends ZKOp { - - private final OpListener<Version> listener; - - public ZKVersionedSetOp(Op op, - @Nullable OpListener<Version> opListener) { - super(op); - this.listener = opListener; - } - - @Override - protected void commitOpResult(OpResult opResult) { - assert(opResult instanceof OpResult.SetDataResult); - OpResult.SetDataResult setDataResult = (OpResult.SetDataResult) opResult; - if (null != listener) { - listener.onCommit(new ZkVersion(setDataResult.getStat().getVersion())); - } - } - - @Override - protected void abortOpResult(Throwable t, - @Nullable OpResult opResult) { - Throwable cause; - if (null == opResult) { - cause = t; - } else { - assert (opResult instanceof OpResult.ErrorResult); - OpResult.ErrorResult errorResult = (OpResult.ErrorResult) opResult; - if (KeeperException.Code.OK.intValue() == errorResult.getErr()) { - cause = t; - } else { - cause = KeeperException.create(KeeperException.Code.get(errorResult.getErr())); - } - } - if (null != listener) { - listener.onAbort(cause); - } - } - -}
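The ZKVersionedSetOp removed above adapts a ZooKeeper setData Op to DistributedLog's Transaction abstraction: on commit it passes the listener a ZkVersion built from the SetDataResult stat, and on abort it converts a non-OK ErrorResult into the corresponding KeeperException. A minimal construction sketch, based only on the API visible in this diff: the path, payload, and expected version are placeholders, the enclosing Transaction wiring is omitted, and the imports match those of the deleted file.

    // Sketch only, not part of this commit. Pairs a ZooKeeper setData Op with an
    // OpListener<Version> that observes the outcome of the enclosing transaction.
    static ZKVersionedSetOp versionedSet(String path, byte[] data, int expectedVersion) {
        Op setDataOp = Op.setData(path, data, expectedVersion);
        return new ZKVersionedSetOp(setDataOp, new OpListener<Version>() {
            @Override
            public void onCommit(Version newVersion) {
                // newVersion wraps the znode version taken from the SetDataResult stat
            }

            @Override
            public void onAbort(Throwable cause) {
                // cause is the original failure, or the KeeperException decoded
                // from a non-OK ErrorResult
            }
        });
    }
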
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java deleted file mode 100644 index 8ef33ea..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java +++ /dev/null @@ -1,239 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.zk; - -import com.twitter.distributedlog.ZooKeeperClient; -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Watcher Manager to manage watchers. - * <h3>Metrics</h3> - * <ul> - * <li> `total_watches`: total number of watches that managed by this watcher manager. - * <li> `num_child_watches`: number of paths that are watched for children changes by this watcher manager. 
- * </ul> - */ -public class ZKWatcherManager implements Watcher { - - static final Logger logger = LoggerFactory.getLogger(ZKWatcherManager.class); - - public static Builder newBuilder() { - return new Builder(); - } - - public static class Builder { - - private String _name; - private StatsLogger _statsLogger; - private ZooKeeperClient _zkc; - - public Builder name(String name) { - this._name = name; - return this; - } - - public Builder zkc(ZooKeeperClient zkc) { - this._zkc = zkc; - return this; - } - - public Builder statsLogger(StatsLogger statsLogger) { - this._statsLogger = statsLogger; - return this; - } - - public ZKWatcherManager build() { - return new ZKWatcherManager(_name, _zkc, _statsLogger); - } - } - - private final String name; - private final ZooKeeperClient zkc; - private final StatsLogger statsLogger; - // Gauges and their labels - private final Gauge<Number> totalWatchesGauge; - private static final String totalWatchesGauageLabel = "total_watches"; - private final Gauge<Number> numChildWatchesGauge; - private static final String numChildWatchesGauageLabel = "num_child_watches"; - - protected final ConcurrentMap<String, Set<Watcher>> childWatches; - protected final AtomicInteger allWatchesGauge; - - private ZKWatcherManager(String name, - ZooKeeperClient zkc, - StatsLogger statsLogger) { - this.name = name; - this.zkc = zkc; - this.statsLogger = statsLogger; - - // watches - this.childWatches = new ConcurrentHashMap<String, Set<Watcher>>(); - this.allWatchesGauge = new AtomicInteger(0); - - // stats - totalWatchesGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return allWatchesGauge.get(); - } - }; - this.statsLogger.registerGauge(totalWatchesGauageLabel, totalWatchesGauge); - - numChildWatchesGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return childWatches.size(); - } - }; - - this.statsLogger.registerGauge(numChildWatchesGauageLabel, numChildWatchesGauge); - } - - public Watcher registerChildWatcher(String path, Watcher watcher) { - Set<Watcher> watchers = childWatches.get(path); - if (null == watchers) { - Set<Watcher> newWatchers = new HashSet<Watcher>(); - Set<Watcher> oldWatchers = childWatches.putIfAbsent(path, newWatchers); - watchers = (null == oldWatchers) ? 
newWatchers : oldWatchers; - } - synchronized (watchers) { - if (childWatches.get(path) == watchers) { - if (watchers.add(watcher)) { - allWatchesGauge.incrementAndGet(); - } - } else { - logger.warn("Watcher set for path {} has been changed while registering child watcher {}.", - path, watcher); - } - } - return this; - } - - public void unregisterChildWatcher(String path, Watcher watcher, boolean removeFromServer) { - Set<Watcher> watchers = childWatches.get(path); - if (null == watchers) { - logger.warn("No watchers found on path {} while unregistering child watcher {}.", - path, watcher); - return; - } - synchronized (watchers) { - if (watchers.remove(watcher)) { - allWatchesGauge.decrementAndGet(); - } else { - logger.warn("Remove a non-registered child watcher {} from path {}", watcher, path); - } - if (watchers.isEmpty()) { - // best-efforts to remove watches - try { - if (null != zkc && removeFromServer) { - zkc.get().removeWatches(path, this, WatcherType.Children, true, new AsyncCallback.VoidCallback() { - @Override - public void processResult(int rc, String path, Object ctx) { - if (KeeperException.Code.OK.intValue() == rc) { - logger.debug("Successfully removed children watches from {}", path); - } else { - logger.debug("Encountered exception on removing children watches from {}", - path, KeeperException.create(KeeperException.Code.get(rc))); - } - } - }, null); - } - } catch (InterruptedException e) { - logger.debug("Encountered exception on removing watches from {}", path, e); - } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - logger.debug("Encountered exception on removing watches from {}", path, e); - } - childWatches.remove(path, watchers); - } - } - } - - public void unregisterGauges() { - this.statsLogger.unregisterGauge(totalWatchesGauageLabel, totalWatchesGauge); - this.statsLogger.unregisterGauge(numChildWatchesGauageLabel, numChildWatchesGauge); - } - - @Override - public void process(WatchedEvent event) { - switch (event.getType()) { - case None: - handleKeeperStateEvent(event); - break; - case NodeChildrenChanged: - handleChildWatchEvent(event); - break; - default: - break; - } - } - - private void handleKeeperStateEvent(WatchedEvent event) { - Set<Watcher> savedAllWatches = new HashSet<Watcher>(allWatchesGauge.get()); - for (Set<Watcher> watcherSet : childWatches.values()) { - synchronized (watcherSet) { - savedAllWatches.addAll(watcherSet); - } - } - for (Watcher watcher : savedAllWatches) { - watcher.process(event); - } - } - - private void handleChildWatchEvent(WatchedEvent event) { - String path = event.getPath(); - if (null == path) { - logger.warn("Received zookeeper watch event with null path : {}", event); - return; - } - Set<Watcher> watchers = childWatches.get(path); - if (null == watchers) { - return; - } - Set<Watcher> watchersToFire; - synchronized (watchers) { - watchersToFire = new HashSet<Watcher>(watchers.size()); - watchersToFire.addAll(watchers); - } - for (Watcher watcher : watchersToFire) { - watcher.process(event); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java new file mode 100644 index 0000000..1d96f0e --- /dev/null +++ 
b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.io.InputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AppendOnlyStreamReader extends InputStream { + static final Logger LOG = LoggerFactory.getLogger(AppendOnlyStreamReader.class); + + private LogRecordWithInputStream currentLogRecord = null; + private final DistributedLogManager dlm; + private LogReader reader; + private long currentPosition; + private static final int SKIP_BUFFER_SIZE = 512; + + // Cache the input stream for a log record. + private static class LogRecordWithInputStream { + private final InputStream payloadStream; + private final LogRecordWithDLSN logRecord; + + LogRecordWithInputStream(LogRecordWithDLSN logRecord) { + Preconditions.checkNotNull(logRecord); + + LOG.debug("Got record dlsn = {}, txid = {}, len = {}", + new Object[] {logRecord.getDlsn(), logRecord.getTransactionId(), logRecord.getPayload().length}); + + this.logRecord = logRecord; + this.payloadStream = logRecord.getPayLoadInputStream(); + } + + InputStream getPayLoadInputStream() { + return payloadStream; + } + + LogRecordWithDLSN getLogRecord() { + return logRecord; + } + + // The last txid of the log record is the position of the next byte in the stream. + // Subtract length to get starting offset. + long getOffset() { + return logRecord.getTransactionId() - logRecord.getPayload().length; + } + } + + /** + * Construct ledger input stream + * + * @param dlm the Distributed Log Manager to access the stream + */ + AppendOnlyStreamReader(DistributedLogManager dlm) + throws IOException { + this.dlm = dlm; + reader = dlm.getInputStream(0); + currentPosition = 0; + } + + /** + * Get input stream representing next entry in the + * ledger. 
+ * + * @return input stream, or null if no more entries + */ + private LogRecordWithInputStream nextLogRecord() throws IOException { + return nextLogRecord(reader); + } + + private static LogRecordWithInputStream nextLogRecord(LogReader reader) throws IOException { + LogRecordWithDLSN record = reader.readNext(false); + + if (null != record) { + return new LogRecordWithInputStream(record); + } else { + record = reader.readNext(false); + if (null != record) { + return new LogRecordWithInputStream(record); + } else { + LOG.debug("No record"); + return null; + } + } + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + if (read(b, 0, 1) != 1) { + return -1; + } else { + return b[0]; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int read = 0; + if (currentLogRecord == null) { + currentLogRecord = nextLogRecord(); + if (currentLogRecord == null) { + return read; + } + } + + while (read < len) { + int thisread = currentLogRecord.getPayLoadInputStream().read(b, off + read, (len - read)); + if (thisread == -1) { + currentLogRecord = nextLogRecord(); + if (currentLogRecord == null) { + return read; + } + } else { + LOG.debug("Offset saved = {}, persisted = {}", + currentPosition, currentLogRecord.getLogRecord().getTransactionId()); + currentPosition += thisread; + read += thisread; + } + } + return read; + } + + /** + * Position the reader at the given offset. If we fail to skip to the desired position + * and don't hit end of stream, return false. + * + * @throws org.apache.distributedlog.exceptions.EndOfStreamException if we attempt to + * skip past the end of the stream. + */ + public boolean skipTo(long position) throws IOException { + + // No need to skip anywhere. + if (position == position()) { + return true; + } + + LogReader skipReader = dlm.getInputStream(position); + LogRecordWithInputStream logRecord = null; + try { + logRecord = nextLogRecord(skipReader); + } catch (IOException ex) { + skipReader.close(); + throw ex; + } + + if (null == logRecord) { + return false; + } + + // We may end up with a reader positioned *before* the requested position if + // we're near the tail and the writer is still active, or if the desired position + // is not at a log record payload boundary. + // Transaction ID gives us the starting position of the log record. Read ahead + // if necessary. + currentPosition = logRecord.getOffset(); + currentLogRecord = logRecord; + LogReader oldReader = reader; + reader = skipReader; + + // Close the oldreader after swapping AppendOnlyStreamReader state. Close may fail + // and we need to make sure it leaves AppendOnlyStreamReader in a consistent state. 
+ oldReader.close(); + + byte[] skipBuffer = new byte[SKIP_BUFFER_SIZE]; + while (currentPosition < position) { + long bytesToRead = Math.min(position - currentPosition, SKIP_BUFFER_SIZE); + long bytesRead = read(skipBuffer, 0, (int)bytesToRead); + if (bytesRead < bytesToRead) { + return false; + } + } + + return true; + } + + public long position() { + return currentPosition; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java new file mode 100644 index 0000000..8278c68 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import org.apache.distributedlog.exceptions.UnexpectedException; +import org.apache.distributedlog.util.FutureUtils; +import com.twitter.util.Await; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import java.io.Closeable; +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AppendOnlyStreamWriter implements Closeable { + static final Logger LOG = LoggerFactory.getLogger(AppendOnlyStreamWriter.class); + + // Use a 1-length array to satisfy Java's inner class reference rules. Use primitive + // type because synchronized block is needed anyway. 
+ final long[] syncPos = new long[1]; + BKAsyncLogWriter logWriter; + long requestPos = 0; + + public AppendOnlyStreamWriter(BKAsyncLogWriter logWriter, long pos) { + LOG.debug("initialize at position {}", pos); + this.logWriter = logWriter; + this.syncPos[0] = pos; + this.requestPos = pos; + } + + public Future<DLSN> write(byte[] data) { + requestPos += data.length; + Future<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data)); + return writeResult.addEventListener(new WriteCompleteListener(requestPos)); + } + + public void force(boolean metadata) throws IOException { + long pos = 0; + try { + pos = Await.result(logWriter.flushAndCommit()); + } catch (IOException ioe) { + throw ioe; + } catch (Exception ex) { + LOG.error("unexpected exception in AppendOnlyStreamWriter.force ", ex); + throw new UnexpectedException("unexpected exception in AppendOnlyStreamWriter.force", ex); + } + synchronized (syncPos) { + syncPos[0] = pos; + } + } + + public long position() { + synchronized (syncPos) { + return syncPos[0]; + } + } + + @Override + public void close() throws IOException { + logWriter.closeAndComplete(); + } + + public void markEndOfStream() throws IOException { + try { + Await.result(logWriter.markEndOfStream()); + } catch (IOException ioe) { + throw ioe; + } catch (Exception ex) { + throw new UnexpectedException("Mark end of stream hit unexpected exception", ex); + } + } + + class WriteCompleteListener implements FutureEventListener<DLSN> { + private final long position; + public WriteCompleteListener(long position) { + this.position = position; + } + @Override + public void onSuccess(DLSN response) { + synchronized (syncPos) { + if (position > syncPos[0]) { + syncPos[0] = position; + } + } + } + @Override + public void onFailure(Throwable cause) { + // Handled at the layer above + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java new file mode 100644 index 0000000..e3ace05 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import org.apache.distributedlog.io.AsyncCloseable; +import com.twitter.util.Future; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +public interface AsyncLogReader extends AsyncCloseable { + + /** + * Get stream name that the reader reads from. + * + * @return stream name. 
+ */ + public String getStreamName(); + + /** + * Read the next record from the log stream + * + * @return A promise that when satisfied will contain the Log Record with its DLSN. + */ + public Future<LogRecordWithDLSN> readNext(); + + /** + * Read next <i>numEntries</i> entries. The future is only satisfied with non-empty list + * of entries. It doesn't block until returning exact <i>numEntries</i>. It is a best effort + * call. + * + * @param numEntries + * num entries + * @return A promise that when satisfied will contain a non-empty list of records with their DLSN. + */ + public Future<List<LogRecordWithDLSN>> readBulk(int numEntries); + + /** + * Read next <i>numEntries</i> entries in a given <i>waitTime</i>. + * <p> + * The future is satisfied when either reads <i>numEntries</i> entries or reaches <i>waitTime</i>. + * The only exception is if there isn't any new entries written within <i>waitTime</i>, it would + * wait until new entries are available. + * + * @param numEntries + * max entries to return + * @param waitTime + * maximum wait time if there are entries already for read + * @param timeUnit + * wait time unit + * @return A promise that when satisfied will contain a non-empty list of records with their DLSN. + */ + public Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java new file mode 100644 index 0000000..53b393b --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import org.apache.distributedlog.io.AsyncAbortable; +import org.apache.distributedlog.io.AsyncCloseable; +import com.twitter.util.Future; + +import java.io.Closeable; +import java.util.List; + +public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable { + + /** + * Get the last committed transaction id. + * + * @return last committed transaction id. + */ + public long getLastTxId(); + + /** + * Write a log record to the stream. + * + * @param record single log record + * @return A Future which contains a DLSN if the record was successfully written + * or an exception if the write fails + */ + public Future<DLSN> write(LogRecord record); + + /** + * Write log records to the stream in bulk. 
Each future in the list represents the result of + * one write operation. The size of the result list is equal to the size of the input list. + * Buffers are written in order, and the list of result futures has the same order. + * + * @param record set of log records + * @return A Future which contains a list of Future DLSNs if the record was successfully written + * or an exception if the operation fails. + */ + public Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record); + + /** + * Truncate the log until <i>dlsn</i>. + * + * @param dlsn + * dlsn to truncate until. + * @return A Future indicates whether the operation succeeds or not, or an exception + * if the truncation fails. + */ + public Future<Boolean> truncate(DLSN dlsn); + + /** + * Get the name of the stream this writer writes data to + */ + public String getStreamName(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java new file mode 100644 index 0000000..c12bd10 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +public interface AsyncNotification { + /** + * Triggered when the background activity encounters an exception + * + * @param reason the exception that encountered. + */ + void notifyOnError(Throwable reason); + + /** + * Triggered when the background activity completes an operation + */ + void notifyOnOperationComplete(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java new file mode 100644 index 0000000..4a2ef30 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java @@ -0,0 +1,555 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.AlreadyClosedException; +import org.apache.distributedlog.exceptions.LockingException; +import org.apache.distributedlog.exceptions.UnexpectedException; +import org.apache.distributedlog.exceptions.ZKException; +import org.apache.distributedlog.io.Abortable; +import org.apache.distributedlog.io.Abortables; +import org.apache.distributedlog.io.AsyncAbortable; +import org.apache.distributedlog.io.AsyncCloseable; +import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.PermitManager; +import org.apache.distributedlog.util.Utils; +import com.twitter.util.Function; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction0; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortable, AsyncAbortable { + static final Logger LOG = LoggerFactory.getLogger(BKAbstractLogWriter.class); + + protected final DistributedLogConfiguration conf; + private final DynamicDistributedLogConfiguration dynConf; + protected final BKDistributedLogManager bkDistributedLogManager; + + // States + private Promise<Void> closePromise = null; + private volatile boolean forceRolling = false; + private boolean forceRecovery = false; + + // Truncation Related + private Future<List<LogSegmentMetadata>> lastTruncationAttempt = null; + @VisibleForTesting + private Long minTimestampToKeepOverride = null; + + // Log Segment Writers + protected BKLogSegmentWriter segmentWriter = null; + protected Future<BKLogSegmentWriter> segmentWriterFuture = null; + protected BKLogSegmentWriter allocatedSegmentWriter = null; + protected BKLogWriteHandler writeHandler = null; + + BKAbstractLogWriter(DistributedLogConfiguration conf, + DynamicDistributedLogConfiguration dynConf, + BKDistributedLogManager bkdlm) { + this.conf = conf; + this.dynConf = dynConf; + this.bkDistributedLogManager = bkdlm; + LOG.debug("Initial retention period for {} : {}", bkdlm.getStreamName(), + TimeUnit.MILLISECONDS.convert(dynConf.getRetentionPeriodHours(), TimeUnit.HOURS)); + } + + // manage write handler + + synchronized protected BKLogWriteHandler getCachedWriteHandler() { + return writeHandler; + } + + protected BKLogWriteHandler getWriteHandler() throws IOException { + BKLogWriteHandler writeHandler = createAndCacheWriteHandler(); + writeHandler.checkMetadataException(); + return writeHandler; + } + + protected BKLogWriteHandler createAndCacheWriteHandler() + throws 
IOException { + synchronized (this) { + if (writeHandler != null) { + return writeHandler; + } + } + // This code path will be executed when the handler is not set or has been closed + // due to forceRecovery during testing + BKLogWriteHandler newHandler = + FutureUtils.result(bkDistributedLogManager.asyncCreateWriteHandler(false)); + boolean success = false; + try { + synchronized (this) { + if (writeHandler == null) { + writeHandler = newHandler; + success = true; + } + return writeHandler; + } + } finally { + if (!success) { + newHandler.asyncAbort(); + } + } + } + + // manage log segment writers + + protected synchronized BKLogSegmentWriter getCachedLogWriter() { + return segmentWriter; + } + + protected synchronized Future<BKLogSegmentWriter> getCachedLogWriterFuture() { + return segmentWriterFuture; + } + + protected synchronized void cacheLogWriter(BKLogSegmentWriter logWriter) { + this.segmentWriter = logWriter; + this.segmentWriterFuture = Future.value(logWriter); + } + + protected synchronized BKLogSegmentWriter removeCachedLogWriter() { + try { + return segmentWriter; + } finally { + segmentWriter = null; + segmentWriterFuture = null; + } + } + + protected synchronized BKLogSegmentWriter getAllocatedLogWriter() { + return allocatedSegmentWriter; + } + + protected synchronized void cacheAllocatedLogWriter(BKLogSegmentWriter logWriter) { + this.allocatedSegmentWriter = logWriter; + } + + protected synchronized BKLogSegmentWriter removeAllocatedLogWriter() { + try { + return allocatedSegmentWriter; + } finally { + allocatedSegmentWriter = null; + } + } + + private Future<Void> asyncCloseAndComplete(boolean shouldThrow) { + BKLogSegmentWriter segmentWriter = getCachedLogWriter(); + BKLogWriteHandler writeHandler = getCachedWriteHandler(); + if (null != segmentWriter && null != writeHandler) { + cancelTruncation(); + Promise<Void> completePromise = new Promise<Void>(); + asyncCloseAndComplete(segmentWriter, writeHandler, completePromise, shouldThrow); + return completePromise; + } else { + return closeNoThrow(); + } + } + + private void asyncCloseAndComplete(final BKLogSegmentWriter segmentWriter, + final BKLogWriteHandler writeHandler, + final Promise<Void> completePromise, + final boolean shouldThrow) { + writeHandler.completeAndCloseLogSegment(segmentWriter) + .addEventListener(new FutureEventListener<LogSegmentMetadata>() { + @Override + public void onSuccess(LogSegmentMetadata segment) { + removeCachedLogWriter(); + complete(null); + } + + @Override + public void onFailure(Throwable cause) { + LOG.error("Completing Log segments encountered exception", cause); + complete(cause); + } + + private void complete(final Throwable cause) { + closeNoThrow().ensure(new AbstractFunction0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + if (null != cause && shouldThrow) { + FutureUtils.setException(completePromise, cause); + } else { + FutureUtils.setValue(completePromise, null); + } + return BoxedUnit.UNIT; + } + }); + } + }); + } + + @VisibleForTesting + void closeAndComplete() throws IOException { + FutureUtils.result(asyncCloseAndComplete(true)); + } + + protected Future<Void> asyncCloseAndComplete() { + return asyncCloseAndComplete(true); + } + + @Override + public void close() throws IOException { + FutureUtils.result(asyncClose()); + } + + @Override + public Future<Void> asyncClose() { + return asyncCloseAndComplete(false); + } + + /** + * Close the writer and release all the underlying resources + */ + protected Future<Void> closeNoThrow() { + Promise<Void> closeFuture; + 
synchronized (this) { + if (null != closePromise) { + return closePromise; + } + closeFuture = closePromise = new Promise<Void>(); + } + cancelTruncation(); + Utils.closeSequence(bkDistributedLogManager.getScheduler(), + true, /** ignore close errors **/ + getCachedLogWriter(), + getAllocatedLogWriter(), + getCachedWriteHandler() + ).proxyTo(closeFuture); + return closeFuture; + } + + @Override + public void abort() throws IOException { + FutureUtils.result(asyncAbort()); + } + + @Override + public Future<Void> asyncAbort() { + Promise<Void> closeFuture; + synchronized (this) { + if (null != closePromise) { + return closePromise; + } + closeFuture = closePromise = new Promise<Void>(); + } + cancelTruncation(); + Abortables.abortSequence(bkDistributedLogManager.getScheduler(), + getCachedLogWriter(), + getAllocatedLogWriter(), + getCachedWriteHandler()).proxyTo(closeFuture); + return closeFuture; + } + + // used by sync writer + protected BKLogSegmentWriter getLedgerWriter(final long startTxId, + final boolean allowMaxTxID) + throws IOException { + Future<BKLogSegmentWriter> logSegmentWriterFuture = asyncGetLedgerWriter(true); + BKLogSegmentWriter logSegmentWriter = null; + if (null != logSegmentWriterFuture) { + logSegmentWriter = FutureUtils.result(logSegmentWriterFuture); + } + if (null == logSegmentWriter || (shouldStartNewSegment(logSegmentWriter) || forceRolling)) { + logSegmentWriter = FutureUtils.result(rollLogSegmentIfNecessary( + logSegmentWriter, startTxId, true /* bestEffort */, allowMaxTxID)); + } + return logSegmentWriter; + } + + // used by async writer + synchronized protected Future<BKLogSegmentWriter> asyncGetLedgerWriter(boolean resetOnError) { + final BKLogSegmentWriter ledgerWriter = getCachedLogWriter(); + Future<BKLogSegmentWriter> ledgerWriterFuture = getCachedLogWriterFuture(); + if (null == ledgerWriterFuture || null == ledgerWriter) { + return null; + } + + // Handle the case where the last call to write actually caused an error in the log + if ((ledgerWriter.isLogSegmentInError() || forceRecovery) && resetOnError) { + // Close the ledger writer so that we will recover and start a new log segment + Future<Void> closeFuture; + if (ledgerWriter.isLogSegmentInError()) { + closeFuture = ledgerWriter.asyncAbort(); + } else { + closeFuture = ledgerWriter.asyncClose(); + } + return closeFuture.flatMap( + new AbstractFunction1<Void, Future<BKLogSegmentWriter>>() { + @Override + public Future<BKLogSegmentWriter> apply(Void result) { + removeCachedLogWriter(); + + if (ledgerWriter.isLogSegmentInError()) { + return Future.value(null); + } + + BKLogWriteHandler writeHandler; + try { + writeHandler = getWriteHandler(); + } catch (IOException e) { + return Future.exception(e); + } + if (null != writeHandler && forceRecovery) { + return writeHandler.completeAndCloseLogSegment(ledgerWriter) + .map(new AbstractFunction1<LogSegmentMetadata, BKLogSegmentWriter>() { + @Override + public BKLogSegmentWriter apply(LogSegmentMetadata completedLogSegment) { + return null; + } + }); + } else { + return Future.value(null); + } + } + }); + } else { + return ledgerWriterFuture; + } + } + + boolean shouldStartNewSegment(BKLogSegmentWriter ledgerWriter) throws IOException { + BKLogWriteHandler writeHandler = getWriteHandler(); + return null == ledgerWriter || writeHandler.shouldStartNewSegment(ledgerWriter) || forceRolling; + } + + private void truncateLogSegmentsIfNecessary(BKLogWriteHandler writeHandler) { + boolean truncationEnabled = false; + + long minTimestampToKeep = 0; + + long 
retentionPeriodInMillis = TimeUnit.MILLISECONDS.convert(dynConf.getRetentionPeriodHours(), TimeUnit.HOURS); + if (retentionPeriodInMillis > 0) { + minTimestampToKeep = Utils.nowInMillis() - retentionPeriodInMillis; + truncationEnabled = true; + } + + if (null != minTimestampToKeepOverride) { + minTimestampToKeep = minTimestampToKeepOverride; + truncationEnabled = true; + } + + // skip scheduling if there is task that's already running + // + synchronized (this) { + if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) { + lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep); + } + } + } + + private Future<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler, + final long startTxId, + final boolean allowMaxTxID) { + return writeHandler.recoverIncompleteLogSegments() + .flatMap(new AbstractFunction1<Long, Future<BKLogSegmentWriter>>() { + @Override + public Future<BKLogSegmentWriter> apply(Long lastTxId) { + return writeHandler.asyncStartLogSegment(startTxId, false, allowMaxTxID) + .onSuccess(new AbstractFunction1<BKLogSegmentWriter, BoxedUnit>() { + @Override + public BoxedUnit apply(BKLogSegmentWriter newSegmentWriter) { + cacheLogWriter(newSegmentWriter); + return BoxedUnit.UNIT; + } + }); + } + }); + } + + private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit( + final BKLogSegmentWriter oldSegmentWriter, + final BKLogWriteHandler writeHandler, + final long startTxId, + final boolean bestEffort, + final boolean allowMaxTxID) { + final PermitManager.Permit switchPermit = bkDistributedLogManager.getLogSegmentRollingPermitManager().acquirePermit(); + if (switchPermit.isAllowed()) { + return closeOldLogSegmentAndStartNewOne( + oldSegmentWriter, + writeHandler, + startTxId, + bestEffort, + allowMaxTxID + ).rescue(new Function<Throwable, Future<BKLogSegmentWriter>>() { + @Override + public Future<BKLogSegmentWriter> apply(Throwable cause) { + if (cause instanceof LockingException) { + LOG.warn("We lost lock during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : ", + writeHandler.getFullyQualifiedName(), cause); + bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit); + return Future.value(oldSegmentWriter); + } else if (cause instanceof ZKException) { + ZKException zke = (ZKException) cause; + if (ZKException.isRetryableZKException(zke)) { + LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}." 
+ + " Disable ledger rolling until it is recovered : {}", writeHandler.getFullyQualifiedName(), + zke.getKeeperExceptionCode()); + bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit); + return Future.value(oldSegmentWriter); + } + } + return Future.exception(cause); + } + }).ensure(new AbstractFunction0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + bkDistributedLogManager.getLogSegmentRollingPermitManager() + .releasePermit(switchPermit); + return BoxedUnit.UNIT; + } + }); + } else { + bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(switchPermit); + return Future.value(oldSegmentWriter); + } + } + + private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne( + final BKLogSegmentWriter oldSegmentWriter, + final BKLogWriteHandler writeHandler, + final long startTxId, + final boolean bestEffort, + final boolean allowMaxTxID) { + // we switch only when we could allocate a new log segment. + BKLogSegmentWriter newSegmentWriter = getAllocatedLogWriter(); + if (null == newSegmentWriter) { + if (LOG.isDebugEnabled()) { + LOG.debug("Allocating a new log segment from {} for {}.", startTxId, + writeHandler.getFullyQualifiedName()); + } + return writeHandler.asyncStartLogSegment(startTxId, bestEffort, allowMaxTxID) + .flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() { + @Override + public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter newSegmentWriter) { + if (null == newSegmentWriter) { + if (bestEffort) { + return Future.value(oldSegmentWriter); + } else { + return Future.exception( + new UnexpectedException("StartLogSegment returns null for bestEffort rolling")); + } + } + cacheAllocatedLogWriter(newSegmentWriter); + if (LOG.isDebugEnabled()) { + LOG.debug("Allocated a new log segment from {} for {}.", startTxId, + writeHandler.getFullyQualifiedName()); + } + return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, newSegmentWriter); + } + }); + } else { + return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, newSegmentWriter); + } + } + + private Future<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter( + BKLogSegmentWriter oldSegmentWriter, + final BKLogSegmentWriter newSegmentWriter) { + final Promise<BKLogSegmentWriter> completePromise = new Promise<BKLogSegmentWriter>(); + // complete the old log segment + writeHandler.completeAndCloseLogSegment(oldSegmentWriter) + .addEventListener(new FutureEventListener<LogSegmentMetadata>() { + + @Override + public void onSuccess(LogSegmentMetadata value) { + cacheLogWriter(newSegmentWriter); + removeAllocatedLogWriter(); + FutureUtils.setValue(completePromise, newSegmentWriter); + } + + @Override + public void onFailure(Throwable cause) { + FutureUtils.setException(completePromise, cause); + } + }); + return completePromise; + } + + synchronized protected Future<BKLogSegmentWriter> rollLogSegmentIfNecessary( + final BKLogSegmentWriter segmentWriter, + long startTxId, + boolean bestEffort, + boolean allowMaxTxID) { + final BKLogWriteHandler writeHandler; + try { + writeHandler = getWriteHandler(); + } catch (IOException e) { + return Future.exception(e); + } + Future<BKLogSegmentWriter> rollPromise; + if (null != segmentWriter && (writeHandler.shouldStartNewSegment(segmentWriter) || forceRolling)) { + rollPromise = closeOldLogSegmentAndStartNewOneWithPermit( + segmentWriter, writeHandler, startTxId, bestEffort, allowMaxTxID); + } else if (null == segmentWriter) { + rollPromise = 
asyncStartNewLogSegment(writeHandler, startTxId, allowMaxTxID); + } else { + rollPromise = Future.value(segmentWriter); + } + return rollPromise.map(new AbstractFunction1<BKLogSegmentWriter, BKLogSegmentWriter>() { + @Override + public BKLogSegmentWriter apply(BKLogSegmentWriter newSegmentWriter) { + if (segmentWriter == newSegmentWriter) { + return newSegmentWriter; + } + truncateLogSegmentsIfNecessary(writeHandler); + return newSegmentWriter; + } + }); + } + + protected synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException { + if (null != closePromise) { + LOG.error("Executing " + operation + " on already closed Log Writer"); + throw new AlreadyClosedException("Executing " + operation + " on already closed Log Writer"); + } + } + + @VisibleForTesting + public void setForceRolling(boolean forceRolling) { + this.forceRolling = forceRolling; + } + + @VisibleForTesting + public synchronized void overRideMinTimeStampToKeep(Long minTimestampToKeepOverride) { + this.minTimestampToKeepOverride = minTimestampToKeepOverride; + } + + protected synchronized void cancelTruncation() { + if (null != lastTruncationAttempt) { + FutureUtils.cancel(lastTruncationAttempt); + lastTruncationAttempt = null; + } + } + + @VisibleForTesting + public synchronized void setForceRecovery(boolean forceRecovery) { + this.forceRecovery = forceRecovery; + } + +}
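The AppendOnlyStreamWriter and AppendOnlyStreamReader added earlier in this commit give a byte-stream view of a log: the writer uses its running byte count as each record's transaction id, so the reader can treat transaction ids as byte offsets for position() and skipTo(). A short usage sketch, assuming DistributedLogManager exposes getAppendOnlyStreamWriter() and getAppendOnlyStreamReader() accessors (neither appears in this diff), with error handling omitted:

    // Sketch only, not part of this commit.
    static void appendThenRead(DistributedLogManager dlm) throws Exception {
        AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();   // assumed accessor
        writer.write("hello append-only".getBytes("UTF-8"));
        writer.force(false);                 // blocks until the bytes are persisted
        long persisted = writer.position();  // durable byte offset (last synced txid)
        writer.close();

        AppendOnlyStreamReader reader = dlm.getAppendOnlyStreamReader();   // assumed accessor
        byte[] buf = new byte[(int) persisted];
        int read = reader.read(buf, 0, buf.length);
        // reader.position() now equals read; skipTo(offset) repositions by byte offset
        reader.close();
    }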