Repository: incubator-distributedlog Updated Branches: refs/heads/master 0f4ea2816 -> 53fca4ac3
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/AtomicWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/AtomicWriter.java b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/AtomicWriter.java index 8ef5c46..006f832 100644 --- a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/AtomicWriter.java +++ b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/AtomicWriter.java @@ -17,22 +17,21 @@ */ package org.apache.distributedlog.basic; +import static com.google.common.base.Charsets.UTF_8; + import com.google.common.collect.Lists; +import com.twitter.finagle.thrift.ClientId$; +import com.twitter.util.Await; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.LogRecordSet; import org.apache.distributedlog.io.CompressionCodec.Type; import org.apache.distributedlog.service.DistributedLogClient; import org.apache.distributedlog.service.DistributedLogClientBuilder; -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.finagle.thrift.ClientId$; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; - -import java.nio.ByteBuffer; -import java.util.List; - -import static com.google.common.base.Charsets.UTF_8; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; /** * Write multiple record atomically @@ -60,12 +59,12 @@ public class AtomicWriter { .build(); final LogRecordSet.Writer recordSetWriter = LogRecordSet.newWriter(16 * 1024, Type.NONE); - List<Future<DLSN>> writeFutures = Lists.newArrayListWithExpectedSize(messages.length); + List<CompletableFuture<DLSN>> writeFutures = Lists.newArrayListWithExpectedSize(messages.length); for (String msg : messages) { final String message = msg; ByteBuffer msgBuf = ByteBuffer.wrap(msg.getBytes(UTF_8)); - Promise<DLSN> writeFuture = new Promise<DLSN>(); - writeFuture.addEventListener(new FutureEventListener<DLSN>() { + CompletableFuture<DLSN> writeFuture = FutureUtils.createFuture(); + writeFuture.whenComplete(new FutureEventListener<DLSN>() { @Override public void onFailure(Throwable cause) { System.out.println("Encountered error on writing data"); @@ -81,9 +80,9 @@ public class AtomicWriter { recordSetWriter.writeRecord(msgBuf, writeFuture); writeFutures.add(writeFuture); } - FutureUtils.result( + Await.result( client.writeRecordSet(streamName, recordSetWriter) - .addEventListener(new FutureEventListener<DLSN>() { + .addEventListener(new com.twitter.util.FutureEventListener<DLSN>() { @Override public void onFailure(Throwable cause) { recordSetWriter.abortTransmit(cause); @@ -101,7 +100,7 @@ public class AtomicWriter { } }) ); - FutureUtils.result(Future.collect(writeFutures)); + FutureUtils.result(FutureUtils.collect(writeFutures)); client.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/ConsoleWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/ConsoleWriter.java b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/ConsoleWriter.java index 4322224..833c0ce 100644 --- a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/ConsoleWriter.java +++ b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/ConsoleWriter.java @@ -17,18 +17,21 @@ */ package org.apache.distributedlog.basic; -import org.apache.distributedlog.*; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.util.Duration; -import com.twitter.util.FutureEventListener; -import jline.ConsoleReader; +import static com.google.common.base.Charsets.UTF_8; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.DistributedLogConstants; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.LogRecord; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import java.net.URI; import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Charsets.UTF_8; +import jline.ConsoleReader; /** * Writer write records from console @@ -53,7 +56,7 @@ public class ConsoleWriter { conf.setOutputBufferSize(0); conf.setPeriodicFlushFrequencyMilliSeconds(0); conf.setLockTimeout(DistributedLogConstants.LOCK_IMMEDIATE); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf) .uri(uri) .regionId(DistributedLogConstants.LOCAL_REGION_ID) @@ -73,7 +76,7 @@ public class ConsoleWriter { String line; while ((line = reader.readLine(PROMPT_MESSAGE)) != null) { writer.write(new LogRecord(System.currentTimeMillis(), line.getBytes(UTF_8))) - .addEventListener(new FutureEventListener<DLSN>() { + .whenComplete(new FutureEventListener<DLSN>() { @Override public void onFailure(Throwable cause) { System.out.println("Encountered error on writing data"); @@ -89,7 +92,7 @@ public class ConsoleWriter { } } finally { if (null != writer) { - FutureUtils.result(writer.asyncClose(), Duration.apply(5, TimeUnit.SECONDS)); + FutureUtils.result(writer.asyncClose(), 5, TimeUnit.SECONDS); } } } finally { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/MultiReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/MultiReader.java b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/MultiReader.java index 9fe2013..29370de 100644 --- a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/MultiReader.java +++ b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/MultiReader.java @@ -18,15 +18,17 @@ package org.apache.distributedlog.basic; import org.apache.distributedlog.*; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.exceptions.LogEmptyException; import org.apache.distributedlog.exceptions.LogNotFoundException; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; -import com.twitter.util.FutureEventListener; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.commons.lang.StringUtils; import java.net.URI; import java.util.concurrent.CountDownLatch; +import org.apache.distributedlog.common.concurrent.FutureEventListener; import static com.google.common.base.Charsets.UTF_8; @@ -48,7 +50,7 @@ public class MultiReader { URI uri = URI.create(dlUriStr); DistributedLogConfiguration conf = new DistributedLogConfiguration(); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf) .uri(uri) .build(); @@ -67,7 +69,7 @@ public class MultiReader { for (DistributedLogManager dlm : managers) { final DistributedLogManager manager = dlm; - dlm.getLastLogRecordAsync().addEventListener(new FutureEventListener<LogRecordWithDLSN>() { + dlm.getLastLogRecordAsync().whenComplete(new FutureEventListener<LogRecordWithDLSN>() { @Override public void onFailure(Throwable cause) { if (cause instanceof LogNotFoundException) { @@ -99,7 +101,7 @@ public class MultiReader { final DLSN dlsn, final CountDownLatch keepAliveLatch) { System.out.println("Wait for records from " + dlm.getStreamName() + " starting from " + dlsn); - dlm.openAsyncLogReader(dlsn).addEventListener(new FutureEventListener<AsyncLogReader>() { + dlm.openAsyncLogReader(dlsn).whenComplete(new FutureEventListener<AsyncLogReader>() { @Override public void onFailure(Throwable cause) { System.err.println("Encountered error on reading records from stream " + dlm.getStreamName()); @@ -131,10 +133,10 @@ public class MultiReader { System.out.println("\"\"\""); System.out.println(new String(record.getPayload(), UTF_8)); System.out.println("\"\"\""); - reader.readNext().addEventListener(this); + reader.readNext().whenComplete(this); } }; - reader.readNext().addEventListener(readListener); + reader.readNext().whenComplete(readListener); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/StreamRewinder.java ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/StreamRewinder.java b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/StreamRewinder.java index 50a456d..b43b90c 100644 --- a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/StreamRewinder.java +++ b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/StreamRewinder.java @@ -17,19 +17,20 @@ */ package org.apache.distributedlog.basic; -import org.apache.distributedlog.*; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.util.CountDownLatch; -import com.twitter.util.Duration; -import com.twitter.util.FutureEventListener; +import static com.google.common.base.Charsets.UTF_8; import java.net.URI; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - -import static com.google.common.base.Charsets.UTF_8; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.LogRecordWithDLSN; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; /** * Rewind a stream to read data back in a while @@ -50,7 +51,7 @@ public class StreamRewinder { URI uri = URI.create(dlUriStr); DistributedLogConfiguration conf = new DistributedLogConfiguration(); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf) .uri(uri) .build(); @@ -101,13 +102,13 @@ public class StreamRewinder { caughtup.set(true); } - reader.readNext().addEventListener(this); + reader.readNext().whenComplete(this); } }; - reader.readNext().addEventListener(readListener); + reader.readNext().whenComplete(readListener); keepAliveLatch.await(); - FutureUtils.result(reader.asyncClose(), Duration.apply(5, TimeUnit.SECONDS)); + FutureUtils.result(reader.asyncClose(), 5, TimeUnit.SECONDS); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/TailReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/TailReader.java b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/TailReader.java index 8b43b45..6a3acf6 100644 --- a/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/TailReader.java +++ b/distributedlog-tutorials/distributedlog-basic/src/main/java/org/apache/distributedlog/basic/TailReader.java @@ -17,20 +17,22 @@ */ package org.apache.distributedlog.basic; -import org.apache.distributedlog.*; -import org.apache.distributedlog.exceptions.LogEmptyException; -import org.apache.distributedlog.exceptions.LogNotFoundException; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.util.Duration; -import com.twitter.util.FutureEventListener; +import static com.google.common.base.Charsets.UTF_8; import java.net.URI; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Charsets.UTF_8; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.LogRecordWithDLSN; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.exceptions.LogEmptyException; +import org.apache.distributedlog.exceptions.LogNotFoundException; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; /** * A reader is tailing a log @@ -50,7 +52,7 @@ public class TailReader { URI uri = URI.create(dlUriStr); DistributedLogConfiguration conf = new DistributedLogConfiguration(); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf) .uri(uri) .build(); @@ -100,13 +102,13 @@ public class TailReader { System.out.println("\"\"\""); System.out.println(new String(record.getPayload(), UTF_8)); System.out.println("\"\"\""); - reader.readNext().addEventListener(this); + reader.readNext().whenComplete(this); } }; - reader.readNext().addEventListener(readListener); + reader.readNext().whenComplete(readListener); keepAliveLatch.await(); - FutureUtils.result(reader.asyncClose(), Duration.apply(5, TimeUnit.SECONDS)); + FutureUtils.result(reader.asyncClose(), 5, TimeUnit.SECONDS); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-kafka/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-kafka/conf/log4j.properties b/distributedlog-tutorials/distributedlog-kafka/conf/log4j.properties index 56a6417..7aa93f6 100644 --- a/distributedlog-tutorials/distributedlog-kafka/conf/log4j.properties +++ b/distributedlog-tutorials/distributedlog-kafka/conf/log4j.properties @@ -30,11 +30,7 @@ log4j.logger.org.apache.zookeeper=INFO log4j.logger.org.apache.bookkeeper=INFO # redirect executor output to executors.log since slow op warnings can be quite verbose -log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors -log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors -log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false -log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false log4j.appender.Executors=org.apache.log4j.RollingFileAppender http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-kafka/src/main/java/org/apache/distributedlog/kafka/DLFutureRecordMetadata.java ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-kafka/src/main/java/org/apache/distributedlog/kafka/DLFutureRecordMetadata.java b/distributedlog-tutorials/distributedlog-kafka/src/main/java/org/apache/distributedlog/kafka/DLFutureRecordMetadata.java index f1490d4..9cf1cf9 100644 --- a/distributedlog-tutorials/distributedlog-kafka/src/main/java/org/apache/distributedlog/kafka/DLFutureRecordMetadata.java +++ b/distributedlog-tutorials/distributedlog-kafka/src/main/java/org/apache/distributedlog/kafka/DLFutureRecordMetadata.java @@ -17,20 +17,18 @@ */ package org.apache.distributedlog.kafka; -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.exceptions.DLInterruptedException; -import org.apache.distributedlog.util.FutureUtils; +import com.twitter.util.Await; import com.twitter.util.Duration; import com.twitter.util.FutureEventListener; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.TopicPartition; - import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.distributedlog.DLSN; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; class DLFutureRecordMetadata implements Future<RecordMetadata> { @@ -79,25 +77,26 @@ class DLFutureRecordMetadata implements Future<RecordMetadata> { @Override public RecordMetadata get() throws InterruptedException, ExecutionException { try { - FutureUtils.result(dlsnFuture); + Await.result(dlsnFuture); // TODO: align the DLSN concepts with kafka concepts return new RecordMetadata(new TopicPartition(topic, 0), -1L, -1L); - } catch (DLInterruptedException e) { + } catch (InterruptedException e) { throw new InterruptedException("Interrupted on waiting for response"); - } catch (IOException e) { + } catch (Exception e) { throw new ExecutionException("Error on waiting for response", e); } } @Override - public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + public RecordMetadata get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { try { - FutureUtils.result(dlsnFuture, Duration.apply(timeout, unit)); + Await.result(dlsnFuture, Duration.apply(timeout, unit)); // TODO: align the DLSN concepts with kafka concepts return new RecordMetadata(new TopicPartition(topic, 0), -1L, -1L); - } catch (DLInterruptedException e) { + } catch (InterruptedException e) { throw new InterruptedException("Interrupted on waiting for response"); - } catch (IOException e) { + } catch (Exception e) { throw new ExecutionException("Error on waiting for response", e); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java index 94a53d4..6fd017c 100644 --- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java +++ b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java @@ -19,13 +19,13 @@ package org.apache.distributedlog.mapreduce; import com.google.common.collect.Lists; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.LogRecordWithDLSN; import org.apache.distributedlog.LogSegmentMetadata; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.impl.BKNamespaceDriver; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeperAccessor; @@ -59,7 +59,7 @@ public class DistributedLogInputFormat protected Configuration conf; protected DistributedLogConfiguration dlConf; protected URI dlUri; - protected DistributedLogNamespace namespace; + protected Namespace namespace; protected String streamName; protected DistributedLogManager dlm; @@ -71,7 +71,7 @@ public class DistributedLogInputFormat dlUri = URI.create(configuration.get(DL_URI, "")); streamName = configuration.get(DL_STREAM, ""); try { - namespace = DistributedLogNamespaceBuilder.newBuilder() + namespace = NamespaceBuilder.newBuilder() .conf(dlConf) .uri(dlUri) .build(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-messaging/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-messaging/conf/log4j.properties b/distributedlog-tutorials/distributedlog-messaging/conf/log4j.properties index 56a6417..7aa93f6 100644 --- a/distributedlog-tutorials/distributedlog-messaging/conf/log4j.properties +++ b/distributedlog-tutorials/distributedlog-messaging/conf/log4j.properties @@ -30,11 +30,7 @@ log4j.logger.org.apache.zookeeper=INFO log4j.logger.org.apache.bookkeeper=INFO # redirect executor output to executors.log since slow op warnings can be quite verbose -log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors -log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors -log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false -log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false log4j.appender.Executors=org.apache.log4j.RollingFileAppender http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets.java ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets.java b/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets.java index ecf18fc..c59cc72 100644 --- a/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets.java +++ b/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/ReaderWithOffsets.java @@ -17,22 +17,27 @@ */ package org.apache.distributedlog.messaging; -import org.apache.distributedlog.*; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.util.Duration; -import com.twitter.util.FutureEventListener; -import org.iq80.leveldb.DB; -import org.iq80.leveldb.Options; +import static com.google.common.base.Charsets.UTF_8; +import static org.iq80.leveldb.impl.Iq80DBFactory.*; import java.io.File; import java.net.URI; -import java.util.concurrent.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - -import static com.google.common.base.Charsets.UTF_8; -import static org.iq80.leveldb.impl.Iq80DBFactory.*; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.LogRecordWithDLSN; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.Options; /** * Reader with offsets @@ -54,7 +59,7 @@ public class ReaderWithOffsets { URI uri = URI.create(dlUriStr); DistributedLogConfiguration conf = new DistributedLogConfiguration(); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf) .uri(uri) .build(); @@ -120,13 +125,13 @@ public class ReaderWithOffsets { System.out.println(new String(record.getPayload(), UTF_8)); System.out.println("\"\"\""); lastDLSN.set(record.getDlsn()); - reader.readNext().addEventListener(this); + reader.readNext().whenComplete(this); } }; - reader.readNext().addEventListener(readListener); + reader.readNext().whenComplete(readListener); keepAliveLatch.await(); - FutureUtils.result(reader.asyncClose(), Duration.apply(5, TimeUnit.SECONDS)); + FutureUtils.result(reader.asyncClose(), 5, TimeUnit.SECONDS); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/StreamTransformer.java ---------------------------------------------------------------------- diff --git a/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/StreamTransformer.java b/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/StreamTransformer.java index 2cf202f..3f874c0 100644 --- a/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/StreamTransformer.java +++ b/distributedlog-tutorials/distributedlog-messaging/src/main/java/org/apache/distributedlog/messaging/StreamTransformer.java @@ -17,19 +17,7 @@ */ package org.apache.distributedlog.messaging; -import org.apache.distributedlog.*; -import org.apache.distributedlog.exceptions.LogEmptyException; -import org.apache.distributedlog.exceptions.LogNotFoundException; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; -import org.apache.distributedlog.thrift.messaging.TransformedRecord; -import org.apache.distributedlog.util.FutureUtils; -import com.twitter.util.Duration; -import com.twitter.util.FutureEventListener; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.transport.TIOStreamTransport; +import static com.google.common.base.Charsets.UTF_8; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -37,8 +25,24 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Charsets.UTF_8; +import org.apache.distributedlog.api.AsyncLogReader; +import org.apache.distributedlog.api.AsyncLogWriter; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.api.DistributedLogManager; +import org.apache.distributedlog.LogRecord; +import org.apache.distributedlog.LogRecordWithDLSN; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.exceptions.LogEmptyException; +import org.apache.distributedlog.exceptions.LogNotFoundException; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.distributedlog.thrift.messaging.TransformedRecord; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TIOStreamTransport; /** * Transform one stream to another stream. And apply transformation @@ -63,7 +67,7 @@ public class StreamTransformer { DistributedLogConfiguration conf = new DistributedLogConfiguration(); conf.setOutputBufferSize(16*1024); // 16KB conf.setPeriodicFlushFrequencyMilliSeconds(5); // 5ms - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf) .uri(uri) .build(); @@ -102,7 +106,7 @@ public class StreamTransformer { try { readLoop(srcDlm, srcDlsn, targetWriter, replicationTransformer); } finally { - FutureUtils.result(targetWriter.asyncClose(), Duration.apply(5, TimeUnit.SECONDS)); + FutureUtils.result(targetWriter.asyncClose(), 5, TimeUnit.SECONDS); targetDlm.close(); srcDlm.close(); namespace.close(); @@ -131,7 +135,7 @@ public class StreamTransformer { @Override public void onSuccess(LogRecordWithDLSN record) { if (record.getDlsn().compareTo(fromDLSN) <= 0) { - reader.readNext().addEventListener(this); + reader.readNext().whenComplete(this); return; } System.out.println("Received record " + record.getDlsn()); @@ -146,13 +150,13 @@ public class StreamTransformer { e.printStackTrace(System.err); keepAliveLatch.countDown(); } - reader.readNext().addEventListener(this); + reader.readNext().whenComplete(this); } }; - reader.readNext().addEventListener(readListener); + reader.readNext().whenComplete(readListener); keepAliveLatch.await(); - FutureUtils.result(reader.asyncClose(), Duration.apply(5, TimeUnit.SECONDS)); + FutureUtils.result(reader.asyncClose(), 5, TimeUnit.SECONDS); } private static void transform(final AsyncLogWriter writer, @@ -170,7 +174,7 @@ public class StreamTransformer { transformedRecord.write(protocolFactory.getProtocol(new TIOStreamTransport(baos))); byte[] data = baos.toByteArray(); writer.write(new LogRecord(record.getSequenceId(), data)) - .addEventListener(new FutureEventListener<DLSN>() { + .whenComplete(new FutureEventListener<DLSN>() { @Override public void onFailure(Throwable cause) { System.err.println("Encountered error on writing records to stream " + writer.getStreamName()); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a24577c..d8c74ac 100644 --- a/pom.xml +++ b/pom.xml @@ -83,6 +83,7 @@ </developers> <modules> <module>distributedlog-build-tools</module> + <module>distributedlog-common</module> <module>distributedlog-protocol</module> <module>distributedlog-core</module> <module>distributedlog-proxy-protocol</module> @@ -90,6 +91,7 @@ <module>distributedlog-proxy-server</module> <module>distributedlog-benchmark</module> <module>distributedlog-tutorials</module> + <module>distributedlog-core-twitter</module> </modules> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> @@ -103,11 +105,13 @@ <commons-lang3.version>3.3.2</commons-lang3.version> <curator.version>3.2.1</curator.version> <finagle.version>6.34.0</finagle.version> + <freebuilder.version>1.12.3</freebuilder.version> <guava.version>20.0</guava.version> <jetty.version>8.1.19.v20160209</jetty.version> <jmock.version>2.8.2</jmock.version> <junit.version>4.8.1</junit.version> <libthrift.version>0.5.0-1</libthrift.version> + <lombok.version>1.16.16</lombok.version> <lz4.version>1.2.0</lz4.version> <mockito.version>1.9.5</mockito.version> <scrooge.version>4.6.0</scrooge.version> @@ -142,7 +146,7 @@ <groups> <group> <title>Core Library</title> - <packages>org.apache.distributedlog:org.apache.distributedlog.annotations:org.apache.distributedlog.callback:org.apache.distributedlog.exceptions:org.apache.distributedlog.feature:org.apache.distributedlog.io:org.apache.distributedlog.lock:org.apache.distributedlog.logsegment:org.apache.distributedlog.metadata:org.apache.distributedlog.namespace:org.apache.distributedlog.net:org.apache.distributedlog.stats:org.apache.distributedlog.subscription</packages> + <packages>org.apache.distributedlog:org.apache.distributedlog.annotations:org.apache.distributedlog.callback:org.apache.distributedlog.exceptions:org.apache.distributedlog.feature:org.apache.distributedlog.io:org.apache.distributedlog.lock:org.apache.distributedlog.logsegment:org.apache.distributedlog.metadata:org.apache.distributedlog.namespace:org.apache.distributedlog.net:org.apache.distributedlog.stats:org.apache.distributedlog.api.subscription</packages> </group> <group> <title>Proxy Client</title> @@ -150,7 +154,7 @@ </group> </groups> <excludePackageNames> - org.apache.distributedlog.acl:org.apache.distributedlog.admin:org.apache.distributedlog.auditor:org.apache.distributedlog.basic:org.apache.distributedlog.benchmark*:org.apache.distributedlog.bk:org.apache.distributedlog.ownership:org.apache.distributedlog.proxy:org.apache.distributedlog.resolver:org.apache.distributedlog.service.*:org.apache.distributedlog.config:org.apache.distributedlog.function:org.apache.distributedlog.impl*:org.apache.distributedlog.injector:org.apache.distributedlog.kafka:org.apache.distributedlog.limiter:org.apache.distributedlog.mapreduce:org.apache.distributedlog.messaging:org.apache.distributedlog.rate:org.apache.distributedlog.readahead:org.apache.distributedlog.selector:org.apache.distributedlog.stats:org.apache.distributedlog.thrift*:org.apache.distributedlog.tools:org.apache.distributedlog.util:org.apache.distributedlog.zk:org.apache.bookkeeper.client:org.apache.bookkeeper.stats + org.apache.distributedlog.acl:org.apache.distributedlog.admin:org.apache.distributedlog.auditor:org.apache.distributedlog.basic:org.apache.distributedlog.benchmark*:org.apache.distributedlog.bk:org.apache.distributedlog.ownership:org.apache.distributedlog.proxy:org.apache.distributedlog.resolver:org.apache.distributedlog.service.*:org.apache.distributedlog.config:org.apache.distributedlog.function:org.apache.distributedlog.impl*:org.apache.distributedlog.injector:org.apache.distributedlog.kafka:org.apache.distributedlog.limiter:org.apache.distributedlog.mapreduce:org.apache.distributedlog.messaging:org.apache.distributedlog.common.rate:org.apache.distributedlog.readahead:org.apache.distributedlog.selector:org.apache.distributedlog.stats:org.apache.distributedlog.thrift*:org.apache.distributedlog.tools:org.apache.distributedlog.util:org.apache.distributedlog.zk:org.apache.bookkeeper.client:org.apache.bookkeeper.stats </excludePackageNames> </configuration> <executions> @@ -182,8 +186,8 @@ <artifactId>maven-compiler-plugin</artifactId> <version>${maven-compiler-plugin.version}</version> <configuration> - <source>1.7</source> - <target>1.7</target> + <source>1.8</source> + <target>1.8</target> </configuration> </plugin> <plugin>