http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
new file mode 100644
index 0000000..f1da33c
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.speculative;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of {@link SpeculativeRequestExecutionPolicy}.
+ *
+ * <p>A speculative request is issued as soon as the policy is initiated. Each
+ * time the executor's future resolves to {@code true}, the next request is
+ * scheduled after the current timeout (in milliseconds) and the timeout is then
+ * multiplied by {@code backoffMultiplier}, truncated to an int and capped at
+ * {@code maxSpeculativeRequestTimeout}. The first delay used is
+ * {@code firstSpeculativeRequestTimeout}.
+ *
+ * <p>The loop stops when the future resolves to {@code false} (logged at trace
+ * level), when the future fails (logged as a warning), or when the scheduler
+ * rejects the task (logged as a warning unless the scheduler is shut down).
+ *
+ * <p>The constructor throws {@link IllegalArgumentException} when
+ * {@code backoffMultiplier} is not positive, or when
+ * {@code maxSpeculativeRequestTimeout * backoffMultiplier} rounds to a value
+ * above {@link Integer#MAX_VALUE}.
+ *
+ * <p>NOTE(review): {@code nextSpeculativeRequestTimeout} is a plain mutable
+ * field written from the future's success callback and no synchronization is
+ * visible in this class; presumably one policy instance serves one request
+ * sequence at a time - confirm against callers.
+ */
+public class DefaultSpeculativeRequestExecutionPolicy implements 
SpeculativeRequestExecutionPolicy {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultSpeculativeRequestExecutionPolicy.class);
+    final int firstSpeculativeRequestTimeout;
+    final int maxSpeculativeRequestTimeout;
+    final float backoffMultiplier;
+    int nextSpeculativeRequestTimeout;
+
+    public DefaultSpeculativeRequestExecutionPolicy(int 
firstSpeculativeRequestTimeout,
+                                                    int 
maxSpeculativeRequestTimeout,
+                                                    float backoffMultiplier) {
+        this.firstSpeculativeRequestTimeout = firstSpeculativeRequestTimeout;
+        this.maxSpeculativeRequestTimeout = maxSpeculativeRequestTimeout;
+        this.backoffMultiplier = backoffMultiplier;
+        this.nextSpeculativeRequestTimeout = firstSpeculativeRequestTimeout;
+
+        if (backoffMultiplier <= 0) {
+            throw new IllegalArgumentException("Invalid value provided for 
backoffMultiplier");
+        }
+
+        // Prevent potential overflow: the backed-off timeout is narrowed to int when it is updated
+        if (Math.round((double) maxSpeculativeRequestTimeout * (double) 
backoffMultiplier) > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException("Invalid values for 
maxSpeculativeRequestTimeout and backoffMultiplier");
+        }
+    }
+
+    @VisibleForTesting
+    int getNextSpeculativeRequestTimeout() {
+        return nextSpeculativeRequestTimeout;
+    }
+
+    /**
+     * Initialize the speculative request execution policy.
+     *
+     * <p>The first speculative request is issued synchronously from this call;
+     * follow-up requests are scheduled on {@code scheduler} for as long as the
+     * executor's future keeps resolving to {@code true}.
+     *
+     * @param scheduler The scheduler service to issue the speculative request
+     * @param requestExecutor The executor is used to issue the actual 
speculative requests
+     */
+    @Override
+    public void initiateSpeculativeRequest(final ScheduledExecutorService 
scheduler,
+                                           final SpeculativeRequestExecutor 
requestExecutor) {
+        issueSpeculativeRequest(scheduler, requestExecutor);
+    }
+
+    private void issueSpeculativeRequest(final ScheduledExecutorService 
scheduler,
+                                         final SpeculativeRequestExecutor 
requestExecutor) {
+        Future<Boolean> issueNextRequest = 
requestExecutor.issueSpeculativeRequest();
+        issueNextRequest.addEventListener(new FutureEventListener<Boolean>() {
+            // we want this handler to run immediately after we push the big 
red button!
+            @Override
+            public void onSuccess(Boolean issueNextRequest) {
+                if (issueNextRequest) {
+                    scheduleSpeculativeRequest(scheduler, requestExecutor, 
nextSpeculativeRequestTimeout);
+                    nextSpeculativeRequestTimeout = 
Math.min(maxSpeculativeRequestTimeout,
+                            (int) (nextSpeculativeRequestTimeout * 
backoffMultiplier));
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Stopped issuing speculative requests for 
{}, "
+                            + "speculativeReadTimeout = {}", requestExecutor, 
nextSpeculativeRequestTimeout);
+                    }
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable thrown) {
+                LOG.warn("Failed to issue speculative request for {}, 
speculativeReadTimeout = {} : ",
+                        new Object[] { requestExecutor, 
nextSpeculativeRequestTimeout, thrown });
+            }
+        });
+    }
+
+    private void scheduleSpeculativeRequest(final ScheduledExecutorService 
scheduler,
+                                            final SpeculativeRequestExecutor 
requestExecutor,
+                                            final int 
speculativeRequestTimeout) {
+        try {
+            scheduler.schedule(new Runnable() {
+                @Override
+                public void run() {
+                    issueSpeculativeRequest(scheduler, requestExecutor);
+                }
+            }, speculativeRequestTimeout, TimeUnit.MILLISECONDS);
+        } catch (RejectedExecutionException re) {
+            if (!scheduler.isShutdown()) {
+                LOG.warn("Failed to schedule speculative request for {}, 
speculativeReadTimeout = {} : ",
+                        new Object[]{requestExecutor, 
speculativeRequestTimeout, re});
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
new file mode 100644
index 0000000..faf45c2
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.speculative;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Speculative request execution policy.
+ *
+ * <p>An implementation decides when speculative requests are issued: it is
+ * handed a scheduler to time them with and a
+ * {@link SpeculativeRequestExecutor} that performs each individual request.
+ */
+public interface SpeculativeRequestExecutionPolicy {
+    /**
+     * Initialize the speculative request execution policy and initiate 
requests.
+     *
+     * @param scheduler The scheduler service to issue the speculative request
+     * @param requestExecutor The executor is used to issue the actual 
speculative requests
+     */
+    void initiateSpeculativeRequest(ScheduledExecutorService scheduler,
+                                    SpeculativeRequestExecutor 
requestExecutor);
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
new file mode 100644
index 0000000..68fe8b0
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.speculative;
+
+import com.twitter.util.Future;
+
+/**
+ * Executor to execute speculative requests.
+ *
+ * <p>Each call issues one speculative request; the value of the returned
+ * future tells the caller whether it should go on issuing further ones.
+ */
+public interface SpeculativeRequestExecutor {
+
+    /**
+     * Issues a speculative request and indicates if more speculative requests 
should be issued.
+     *
+     * @return whether more speculative requests should be issued.
+     */
+    Future<Boolean> issueSpeculativeRequest();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
new file mode 100644
index 0000000..4bdd4b1
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Speculative Mechanism.
+ */
+package org.apache.distributedlog.client.speculative;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
new file mode 100644
index 0000000..c2dcddd
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.stats;
+
+import org.apache.distributedlog.client.resolver.RegionResolver;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Client Stats.
+ *
+ * <p>Records proxy request outcomes into a client-wide
+ * {@link ClientStatsLogger} and, when region stats are enabled and the remote
+ * address is non-null, also into a per-region logger. The region name is
+ * obtained from the {@link RegionResolver} and used as a scope on the stats
+ * receiver; per-region loggers are created on first use and cached.
+ *
+ * <p>{@code getOpStats} returns the {@link OpStats} for an operation name,
+ * creating it under a scope named after the operation on first use.
+ * {@code getFinagleStatsReceiver} returns the per-region receiver when region
+ * stats are enabled and the address is non-null, otherwise the client-wide one.
+ *
+ * <p>Both caches are filled with {@code putIfAbsent}: if two callers race to
+ * create the same entry, both get the instance that was stored first.
+ */
+public class ClientStats {
+
+    // Region Resolver
+    private final RegionResolver regionResolver;
+
+    // Stats
+    private final StatsReceiver statsReceiver;
+    private final ClientStatsLogger clientStatsLogger;
+    private final boolean enableRegionStats;
+    private final ConcurrentMap<String, ClientStatsLogger> 
regionClientStatsLoggers;
+    private final ConcurrentMap<String, OpStats> opStatsMap;
+
+    public ClientStats(StatsReceiver statsReceiver,
+                       boolean enableRegionStats,
+                       RegionResolver regionResolver) {
+        this.statsReceiver = statsReceiver;
+        this.clientStatsLogger = new ClientStatsLogger(statsReceiver);
+        this.enableRegionStats = enableRegionStats;
+        this.regionClientStatsLoggers = new ConcurrentHashMap<String, 
ClientStatsLogger>();
+        this.regionResolver = regionResolver;
+        this.opStatsMap = new ConcurrentHashMap<String, OpStats>();
+    }
+
+    public OpStats getOpStats(String op) {
+        OpStats opStats = opStatsMap.get(op);
+        if (null != opStats) {
+            return opStats;
+        }
+        OpStats newStats = new OpStats(statsReceiver.scope(op),
+                enableRegionStats, regionResolver);
+        OpStats oldStats = opStatsMap.putIfAbsent(op, newStats);
+        if (null == oldStats) {
+            return newStats;
+        } else {
+            return oldStats;
+        }
+    }
+
+    private ClientStatsLogger getRegionClientStatsLogger(SocketAddress 
address) {
+        String region = regionResolver.resolveRegion(address);
+        return getRegionClientStatsLogger(region);
+    }
+
+    private ClientStatsLogger getRegionClientStatsLogger(String region) {
+        ClientStatsLogger statsLogger = regionClientStatsLoggers.get(region);
+        if (null == statsLogger) {
+            ClientStatsLogger newStatsLogger = new 
ClientStatsLogger(statsReceiver.scope(region));
+            ClientStatsLogger oldStatsLogger = 
regionClientStatsLoggers.putIfAbsent(region, newStatsLogger);
+            if (null == oldStatsLogger) {
+                statsLogger = newStatsLogger;
+            } else {
+                statsLogger = oldStatsLogger;
+            }
+        }
+        return statsLogger;
+    }
+
+    public StatsReceiver getFinagleStatsReceiver(SocketAddress addr) {
+        if (enableRegionStats && null != addr) {
+            return getRegionClientStatsLogger(addr).getStatsReceiver();
+        } else {
+            return clientStatsLogger.getStatsReceiver();
+        }
+    }
+
+    public void completeProxyRequest(SocketAddress addr, StatusCode code, long 
startTimeNanos) {
+        clientStatsLogger.completeProxyRequest(code, startTimeNanos);
+        if (enableRegionStats && null != addr) {
+            getRegionClientStatsLogger(addr).completeProxyRequest(code, 
startTimeNanos);
+        }
+    }
+
+    public void failProxyRequest(SocketAddress addr, Throwable cause, long 
startTimeNanos) {
+        clientStatsLogger.failProxyRequest(cause, startTimeNanos);
+        if (enableRegionStats && null != addr) {
+            getRegionClientStatsLogger(addr).failProxyRequest(cause, 
startTimeNanos);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
new file mode 100644
index 0000000..530c632
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.stats;
+
+import org.apache.distributedlog.thrift.service.StatusCode;
+import com.twitter.finagle.stats.Counter;
+import com.twitter.finagle.stats.Stat;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Stats Logger to collect client stats.
+ *
+ * <p>Under the given stats receiver it maintains:
+ * a {@code responses} scope with one counter per {@link StatusCode}, named
+ * after the code's {@code name()};
+ * an {@code exceptions} scope with one counter per exception class, named
+ * after the class's {@code getName()};
+ * a {@code proxy_request_latency} scope with {@code success} and
+ * {@code failure} stats.
+ *
+ * <p>Counters are created on first use and cached with {@code putIfAbsent}.
+ * Latencies are recorded in microseconds, computed as the difference between
+ * {@link System#nanoTime()} at completion and the supplied start time, so the
+ * start time is expected to be a {@code System.nanoTime()} reading.
+ */
+public class ClientStatsLogger {
+
+    // Stats
+    private final StatsReceiver statsReceiver;
+    private final StatsReceiver responseStatsReceiver;
+    private final ConcurrentMap<StatusCode, Counter> responseStats =
+            new ConcurrentHashMap<StatusCode, Counter>();
+    private final StatsReceiver exceptionStatsReceiver;
+    private final ConcurrentMap<Class<?>, Counter> exceptionStats =
+            new ConcurrentHashMap<Class<?>, Counter>();
+
+    private final Stat proxySuccessLatencyStat;
+    private final Stat proxyFailureLatencyStat;
+
+    public ClientStatsLogger(StatsReceiver statsReceiver) {
+        this.statsReceiver = statsReceiver;
+        responseStatsReceiver = statsReceiver.scope("responses");
+        exceptionStatsReceiver = statsReceiver.scope("exceptions");
+        StatsReceiver proxyLatencyStatReceiver = 
statsReceiver.scope("proxy_request_latency");
+        proxySuccessLatencyStat = proxyLatencyStatReceiver.stat0("success");
+        proxyFailureLatencyStat = proxyLatencyStatReceiver.stat0("failure");
+    }
+
+    public StatsReceiver getStatsReceiver() {
+        return statsReceiver;
+    }
+
+    private Counter getResponseCounter(StatusCode code) {
+        Counter counter = responseStats.get(code);
+        if (null == counter) {
+            Counter newCounter = responseStatsReceiver.counter0(code.name());
+            Counter oldCounter = responseStats.putIfAbsent(code, newCounter);
+            counter = null != oldCounter ? oldCounter : newCounter;
+        }
+        return counter;
+    }
+
+    private Counter getExceptionCounter(Class<?> cls) {
+        Counter counter = exceptionStats.get(cls);
+        if (null == counter) {
+            Counter newCounter = 
exceptionStatsReceiver.counter0(cls.getName());
+            Counter oldCounter = exceptionStats.putIfAbsent(cls, newCounter);
+            counter = null != oldCounter ? oldCounter : newCounter;
+        }
+        return counter;
+    }
+
+    public void completeProxyRequest(StatusCode code, long startTimeNanos) {
+        getResponseCounter(code).incr();
+        proxySuccessLatencyStat.add(elapsedMicroSec(startTimeNanos));
+    }
+
+    public void failProxyRequest(Throwable cause, long startTimeNanos) {
+        getExceptionCounter(cause.getClass()).incr();
+        proxyFailureLatencyStat.add(elapsedMicroSec(startTimeNanos));
+    }
+
+    static long elapsedMicroSec(long startNanoTime) {
+        return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - 
startNanoTime);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
new file mode 100644
index 0000000..7a49faa
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.stats;
+
+import org.apache.distributedlog.client.resolver.RegionResolver;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Op Stats.
+ *
+ * <p>Records the outcome of requests for one operation into an overall
+ * {@link OpStatsLogger} and, when region stats are enabled and the remote
+ * address is non-null, also into a per-region logger. The region name comes
+ * from the {@link RegionResolver} and is used as a scope on the stats
+ * receiver; per-region loggers are created on first use and cached with
+ * {@code putIfAbsent}.
+ *
+ * <p>{@code completeRequest} and {@code failRequest} forward the given
+ * {@code micros} and {@code numTries} values unchanged to the logger(s).
+ */
+public class OpStats {
+
+    // Region Resolver
+    private final RegionResolver regionResolver;
+
+    // Stats
+    private final StatsReceiver statsReceiver;
+    private final OpStatsLogger opStatsLogger;
+    private final boolean enableRegionStats;
+    private final ConcurrentMap<String, OpStatsLogger> regionOpStatsLoggers;
+
+    public OpStats(StatsReceiver statsReceiver,
+                   boolean enableRegionStats,
+                   RegionResolver regionResolver) {
+        this.statsReceiver = statsReceiver;
+        this.opStatsLogger = new OpStatsLogger(statsReceiver);
+        this.enableRegionStats = enableRegionStats;
+        this.regionOpStatsLoggers = new ConcurrentHashMap<String, 
OpStatsLogger>();
+        this.regionResolver = regionResolver;
+    }
+
+    private OpStatsLogger getRegionOpStatsLogger(SocketAddress address) {
+        String region = regionResolver.resolveRegion(address);
+        return getRegionOpStatsLogger(region);
+    }
+
+    private OpStatsLogger getRegionOpStatsLogger(String region) {
+        OpStatsLogger statsLogger = regionOpStatsLoggers.get(region);
+        if (null == statsLogger) {
+            OpStatsLogger newStatsLogger = new 
OpStatsLogger(statsReceiver.scope(region));
+            OpStatsLogger oldStatsLogger = 
regionOpStatsLoggers.putIfAbsent(region, newStatsLogger);
+            if (null == oldStatsLogger) {
+                statsLogger = newStatsLogger;
+            } else {
+                statsLogger = oldStatsLogger;
+            }
+        }
+        return statsLogger;
+    }
+
+    public void completeRequest(SocketAddress addr, long micros, int numTries) 
{
+        opStatsLogger.completeRequest(micros, numTries);
+        if (enableRegionStats && null != addr) {
+            getRegionOpStatsLogger(addr).completeRequest(micros, numTries);
+        }
+    }
+
+    public void failRequest(SocketAddress addr, long micros, int numTries) {
+        opStatsLogger.failRequest(micros, numTries);
+        if (enableRegionStats && null != addr) {
+            getRegionOpStatsLogger(addr).failRequest(micros, numTries);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
new file mode 100644
index 0000000..b94b4be
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.stats;
+
+import com.twitter.finagle.stats.Stat;
+import com.twitter.finagle.stats.StatsReceiver;
+
+/**
+ * Stats Logger per operation type.
+ *
+ * <p>Under the given stats receiver it registers a {@code latency} scope with
+ * {@code success} and {@code failure} stats, and a {@code redirects} scope
+ * with a {@code times} stat. {@code completeRequest} adds {@code micros} to
+ * the success latency stat and {@code failRequest} adds it to the failure
+ * latency stat; both add {@code numTries} to the {@code redirects/times} stat.
+ */
+public class OpStatsLogger {
+
+    private final Stat successLatencyStat;
+    private final Stat failureLatencyStat;
+    private final Stat redirectStat;
+
+    public OpStatsLogger(StatsReceiver statsReceiver) {
+        StatsReceiver latencyStatReceiver = statsReceiver.scope("latency");
+        successLatencyStat = latencyStatReceiver.stat0("success");
+        failureLatencyStat = latencyStatReceiver.stat0("failure");
+        StatsReceiver redirectStatReceiver = statsReceiver.scope("redirects");
+        redirectStat = redirectStatReceiver.stat0("times");
+    }
+
+    public void completeRequest(long micros, int numTries) {
+        successLatencyStat.add(micros);
+        redirectStat.add(numTries);
+    }
+
+    public void failRequest(long micros, int numTries) {
+        failureLatencyStat.add(micros);
+        redirectStat.add(numTries);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
new file mode 100644
index 0000000..110e99a
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.stats;
+
+import com.twitter.finagle.stats.Counter;
+import com.twitter.finagle.stats.StatsReceiver;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Stats Logger for ownerships.
+ *
+ * <p>Keeps one aggregate {@link OwnershipStat} under the {@code ownership}
+ * scope of the first stats receiver, plus one {@link OwnershipStat} per stream
+ * under {@code perstream_ownership/<stream>} of the stream stats receiver.
+ * Per-stream stats are created on first use and cached with
+ * {@code putIfAbsent}. Every {@code onXxx(stream)} call increments the
+ * matching counter in both the aggregate and the per-stream stat.
+ */
+public class OwnershipStatsLogger {
+
+    /**
+     * Ownership related stats.
+     *
+     * <p>Holds five counters registered on the given stats receiver:
+     * {@code hits}, {@code misses}, {@code adds}, {@code removes} and
+     * {@code redirects}. Each {@code onXxx()} method increments the
+     * corresponding counter by one.
+     */
+    public static class OwnershipStat {
+        private final Counter hits;
+        private final Counter misses;
+        private final Counter removes;
+        private final Counter redirects;
+        private final Counter adds;
+
+        OwnershipStat(StatsReceiver ownershipStats) {
+            hits = ownershipStats.counter0("hits");
+            misses = ownershipStats.counter0("misses");
+            adds = ownershipStats.counter0("adds");
+            removes = ownershipStats.counter0("removes");
+            redirects = ownershipStats.counter0("redirects");
+        }
+
+        public void onHit() {
+            hits.incr();
+        }
+
+        public void onMiss() {
+            misses.incr();
+        }
+
+        public void onAdd() {
+            adds.incr();
+        }
+
+        public void onRemove() {
+            removes.incr();
+        }
+
+        public void onRedirect() {
+            redirects.incr();
+        }
+
+    }
+
+    private final OwnershipStat ownershipStat;
+    private final StatsReceiver ownershipStatsReceiver;
+    private final ConcurrentMap<String, OwnershipStat> ownershipStats =
+            new ConcurrentHashMap<String, OwnershipStat>();
+
+    public OwnershipStatsLogger(StatsReceiver statsReceiver,
+                                StatsReceiver streamStatsReceiver) {
+        this.ownershipStat = new 
OwnershipStat(statsReceiver.scope("ownership"));
+        this.ownershipStatsReceiver = 
streamStatsReceiver.scope("perstream_ownership");
+    }
+
+    private OwnershipStat getOwnershipStat(String stream) {
+        OwnershipStat stat = ownershipStats.get(stream);
+        if (null == stat) {
+            OwnershipStat newStat = new 
OwnershipStat(ownershipStatsReceiver.scope(stream));
+            OwnershipStat oldStat = ownershipStats.putIfAbsent(stream, 
newStat);
+            stat = null != oldStat ? oldStat : newStat;
+        }
+        return stat;
+    }
+
+    public void onMiss(String stream) {
+        ownershipStat.onMiss();
+        getOwnershipStat(stream).onMiss();
+    }
+
+    public void onHit(String stream) {
+        ownershipStat.onHit();
+        getOwnershipStat(stream).onHit();
+    }
+
+    public void onRedirect(String stream) {
+        ownershipStat.onRedirect();
+        getOwnershipStat(stream).onRedirect();
+    }
+
+    public void onRemove(String stream) {
+        ownershipStat.onRemove();
+        getOwnershipStat(stream).onRemove();
+    }
+
+    public void onAdd(String stream) {
+        ownershipStat.onAdd();
+        getOwnershipStat(stream).onAdd();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
new file mode 100644
index 0000000..106d3fc
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Client side stats utils.
+ */
+package org.apache.distributedlog.client.stats;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java
new file mode 100644
index 0000000..68e6825
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Socket Address identifier for a DL proxy.
+ */
+public class DLSocketAddress {
+
+    private static final int VERSION = 1;
+
+    private static final String COLON = ":";
+    private static final String SEP = ";";
+
+    private final int shard;
+    private final InetSocketAddress socketAddress;
+
+    /**
+     * Creates the identifier of a DL proxy from its shard id and socket address.
+     *
+     * @param shard
+     *          shard id of the proxy.
+     * @param socketAddress
+     *          socket address of the proxy; stored as given, no null check is done here.
+     */
+    public DLSocketAddress(int shard, InetSocketAddress socketAddress) {
+        this.shard = shard;
+        this.socketAddress = socketAddress;
+    }
+
+    /**
+     * Shard id for dl write proxy.
+     *
+     * @return shard id for dl write proxy.
+     */
+    public int getShard() {
+        return shard;
+    }
+
+    /**
+     * Socket address for dl write proxy.
+     *
+     * @return socket address for dl write proxy
+     */
+    public InetSocketAddress getSocketAddress() {
+        return socketAddress;
+    }
+
+    /**
+     * Serialize the write proxy identifier to string.
+     *
+     * @return serialized write proxy identifier.
+     */
+    public String serialize() {
+        return toLockId(socketAddress, shard);
+    }
+
+    /**
+     * Hash over the socket address and the shard id, i.e. the same two fields that
+     * {@link #equals(Object)} compares, so that different shards on one address
+     * do not all land in the same hash bucket.
+     */
+    @Override
+    public int hashCode() {
+        return 31 * socketAddress.hashCode() + shard;
+    }
+
+    /**
+     * Two proxy addresses are equal when both the shard id and the socket address match.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof DLSocketAddress) {
+            DLSocketAddress that = (DLSocketAddress) obj;
+            // The shard is compared first; the address comparison only runs when shards match.
+            if (shard != that.shard) {
+                return false;
+            }
+            return socketAddress.equals(that.socketAddress);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return toLockId(socketAddress, shard);
+    }
+
+    /**
+     * Deserialize proxy address from a string representation.
+     *
+     * @param lockId
+     *          string representation of the proxy address.
+     * @return proxy address.
+     * @throws IOException
+     */
+    public static DLSocketAddress deserialize(String lockId) throws 
IOException {
+        String parts[] = lockId.split(SEP);
+        if (3 != parts.length) {
+            throw new IOException("Invalid dl socket address " + lockId);
+        }
+        int version;
+        try {
+            version = Integer.parseInt(parts[0]);
+        } catch (NumberFormatException nfe) {
+            throw new IOException("Invalid version found in " + lockId, nfe);
+        }
+        if (VERSION != version) {
+            throw new IOException("Invalid version " + version + " found in " 
+ lockId + ", expected " + VERSION);
+        }
+        int shardId;
+        try {
+            shardId = Integer.parseInt(parts[1]);
+        } catch (NumberFormatException nfe) {
+            throw new IOException("Invalid shard id found in " + lockId, nfe);
+        }
+        InetSocketAddress address = parseSocketAddress(parts[2]);
+        return new DLSocketAddress(shardId, address);
+    }
+
+    /**
+     * Parse the inet socket address from its {@code host:port} string representation.
+     *
+     * @param addr
+     *          string representation
+     * @return inet socket address
+     * @throws IllegalArgumentException
+     *          if {@code addr} does not split into exactly two {@code :}-separated parts,
+     *          or the port part is not a valid port number.
+     */
+    public static InetSocketAddress parseSocketAddress(String addr) {
+        String[] parts = addr.split(COLON);
+        // Name the offending input; a bare checkArgument(boolean) fails without any message.
+        checkArgument(parts.length == 2, "Invalid socket address '%s', expected <host>:<port>", addr);
+        String hostname = parts[0];
+        int port = Integer.parseInt(parts[1]);
+        return new InetSocketAddress(hostname, port);
+    }
+
+    public static InetSocketAddress getSocketAddress(int port) throws 
UnknownHostException {
+        return new 
InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), port);
+    }
+
+    /**
+     * Convert inet socket address to the string representation.
+     *
+     * @param address
+     *          inet socket address.
+     * @return string representation of inet socket address.
+     */
+    public static String toString(InetSocketAddress address) {
+        // Rendered as <hostname>:<port>.
+        return address.getHostName() + COLON + address.getPort();
+    }
+
+    /**
+     * Convert a socket address and shard id to the serialized proxy identifier.
+     *
+     * @param address
+     *          inet socket address.
+     * @param shard
+     *          shard id.
+     * @return identifier of the form {@code <version>;<shard>;<host>:<port>}.
+     */
+    public static String toLockId(InetSocketAddress address, int shard) {
+        String prefix = String.valueOf(VERSION) + SEP + shard + SEP;
+        return prefix + toString(address);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java
new file mode 100644
index 0000000..9f30815
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecordSetBuffer;
+import com.twitter.util.Future;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Interface for distributedlog client.
+ */
+public interface DistributedLogClient {
+    /**
+     * Write <i>data</i> to a given <i>stream</i>.
+     *
+     * @param stream
+     *          Stream Name.
+     * @param data
+     *          Data to write.
+     * @return a future representing a sequence id returned for this write.
+     */
+    Future<DLSN> write(String stream, ByteBuffer data);
+
+    /**
+     * Write record set to a given <i>stream</i>.
+     *
+     * <p>The record set is built from {@link org.apache.distributedlog.LogRecordSet.Writer}.
+     *
+     * @param stream
+     *          stream to write to.
+     * @param recordSet
+     *          record set.
+     * @return a future representing the {@link DLSN} returned for this write.
+     */
+    Future<DLSN> writeRecordSet(String stream, LogRecordSetBuffer recordSet);
+
+    /**
+     * Write <i>data</i> in bulk to a given <i>stream</i>.
+     *
+     * <p>Return a list of Future dlsns, one for each submitted buffer. In the event of a partial
+     * failure (e.g. some specific buffer write fails), all subsequent writes
+     * will also fail.
+     *
+     * @param stream
+     *          Stream Name.
+     * @param data
+     *          Data to write.
+     * @return a list of futures, one for each submitted buffer.
+     */
+    List<Future<DLSN>> writeBulk(String stream, List<ByteBuffer> data);
+
+    /**
+     * Truncate the stream to a given <i>dlsn</i>.
+     *
+     * @param stream
+     *          Stream Name.
+     * @param dlsn
+     *          DLSN to truncate until.
+     * @return a future representing the truncation.
+     */
+    Future<Boolean> truncate(String stream, DLSN dlsn);
+
+    /**
+     * Release the ownership of a stream <i>stream</i>.
+     *
+     * @param stream
+     *          Stream Name to release.
+     * @return a future representing the release operation.
+     */
+    Future<Void> release(String stream);
+
+    /**
+     * Delete a given stream <i>stream</i>.
+     *
+     * @param stream
+     *          Stream Name to delete.
+     * @return a future representing the delete operation.
+     */
+    Future<Void> delete(String stream);
+
+    /**
+     * Create a stream with name <i>stream</i>.
+     *
+     * @param stream
+     *          Stream Name to create.
+     * @return a future representing the create operation.
+     */
+    Future<Void> create(String stream);
+
+    /**
+     * Close the client.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
new file mode 100644
index 0000000..0e2a152
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
@@ -0,0 +1,608 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.twitter.common.zookeeper.ServerSet;
+import org.apache.distributedlog.client.ClientConfig;
+import org.apache.distributedlog.client.DistributedLogClientImpl;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.client.proxy.ClusterClient;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.client.resolver.RegionResolver;
+import org.apache.distributedlog.client.routing.RegionsRoutingService;
+import org.apache.distributedlog.client.routing.RoutingService;
+import org.apache.distributedlog.client.routing.RoutingUtils;
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import com.twitter.finagle.Name;
+import com.twitter.finagle.Resolver$;
+import com.twitter.finagle.Service;
+import com.twitter.finagle.ThriftMux;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.finagle.thrift.ClientId;
+import com.twitter.finagle.thrift.ThriftClientFramedCodec;
+import com.twitter.finagle.thrift.ThriftClientRequest;
+import com.twitter.util.Duration;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.Random;
+import org.apache.commons.lang.StringUtils;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+/**
+ * Builder to build {@link DistributedLogClient}.
+ */
+public final class DistributedLogClientBuilder {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(DistributedLogClientBuilder.class);
+
+    private static final Random random = new 
Random(System.currentTimeMillis());
+
+    private String name = null;
+    private ClientId clientId = null;
+    private RoutingService.Builder routingServiceBuilder = null;
+    private ClientBuilder clientBuilder = null;
+    private String serverRoutingServiceFinagleName = null;
+    private StatsReceiver statsReceiver = new NullStatsReceiver();
+    private StatsReceiver streamStatsReceiver = new NullStatsReceiver();
+    private ClientConfig clientConfig = new ClientConfig();
+    private boolean enableRegionStats = false;
+    private final RegionResolver regionResolver = new DefaultRegionResolver();
+
+    /**
+     * Create a client builder.
+     *
+     * @return client builder
+     */
+    public static DistributedLogClientBuilder newBuilder() {
+        return new DistributedLogClientBuilder();
+    }
+
+    /**
+     * Create a new client builder from an existing {@code builder}.
+     *
+     * @param builder the existing builder.
+     * @return a new client builder.
+     */
+    public static DistributedLogClientBuilder 
newBuilder(DistributedLogClientBuilder builder) {
+        DistributedLogClientBuilder newBuilder = new 
DistributedLogClientBuilder();
+        // These fields are copied by reference, so both builders share the same objects.
+        newBuilder.name = builder.name;
+        newBuilder.clientId = builder.clientId;
+        newBuilder.clientBuilder = builder.clientBuilder;
+        newBuilder.routingServiceBuilder = builder.routingServiceBuilder;
+        newBuilder.statsReceiver = builder.statsReceiver;
+        newBuilder.streamStatsReceiver = builder.streamStatsReceiver;
+        newBuilder.enableRegionStats = builder.enableRegionStats;
+        newBuilder.serverRoutingServiceFinagleName = 
builder.serverRoutingServiceFinagleName;
+        // The client config is copied via ClientConfig.newConfig, because the setters of
+        // this builder modify the config of the copy in place.
+        newBuilder.clientConfig = ClientConfig.newConfig(builder.clientConfig);
+        // regionResolver is not copied: it is a final field that every builder
+        // initializes with its own DefaultRegionResolver.
+        return newBuilder;
+    }
+
+    // private constructor
+    private DistributedLogClientBuilder() {}
+
+    /**
+     * Client Name.
+     *
+     * @param name
+     *          client name
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder name(String name) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.name = name;
+        return newBuilder;
+    }
+
+    /**
+     * Client ID.
+     *
+     * @param clientId
+     *          client id
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder clientId(ClientId clientId) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientId = clientId;
+        return newBuilder;
+    }
+
+    /**
+     * Serverset to access proxy services.
+     *
+     * @param serverSet
+     *          server set.
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder serverSet(ServerSet serverSet) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.routingServiceBuilder = 
RoutingUtils.buildRoutingService(serverSet);
+        newBuilder.enableRegionStats = false;
+        return newBuilder;
+    }
+
+    /**
+     * Server Sets to access proxy services.
+     *
+     * <p>The <i>local</i> server set will be tried first then <i>remotes</i>.
+     *
+     * @param local local server set.
+     * @param remotes remote server sets.
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder serverSets(ServerSet local, ServerSet...remotes) {
+        DistributedLogClientBuilder copy = newBuilder(this);
+        // Slot 0 holds the local server set; the remotes follow in the order they were given.
+        RoutingService.Builder[] regionBuilders = new RoutingService.Builder[remotes.length + 1];
+        regionBuilders[0] = RoutingUtils.buildRoutingService(local);
+        int slot = 1;
+        for (ServerSet remote : remotes) {
+            regionBuilders[slot++] = RoutingUtils.buildRoutingService(remote);
+        }
+        copy.routingServiceBuilder = RegionsRoutingService.newBuilder()
+                .resolver(regionResolver)
+                .routingServiceBuilders(regionBuilders);
+        // Region stats are only enabled when at least one remote server set is configured.
+        copy.enableRegionStats = remotes.length > 0;
+        return copy;
+    }
+
+    /**
+     * Name to access proxy services.
+     *
+     * @param finagleNameStr
+     *          finagle name string.
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder finagleNameStr(String finagleNameStr) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.routingServiceBuilder = 
RoutingUtils.buildRoutingService(finagleNameStr);
+        newBuilder.enableRegionStats = false;
+        return newBuilder;
+    }
+
+    /**
+     * Finagle name strs to access proxy services.
+     *
+     * <p>The <i>local</i> finagle name str will be tried first, then <i>remotes</i>.
+     *
+     * @param local local finagle name str.
+     * @param remotes remote finagle name strs.
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder finagleNameStrs(String local, String...remotes) {
+        DistributedLogClientBuilder copy = newBuilder(this);
+        // Slot 0 holds the local name; the remotes follow in the order they were given.
+        RoutingService.Builder[] regionBuilders = new RoutingService.Builder[remotes.length + 1];
+        regionBuilders[0] = RoutingUtils.buildRoutingService(local);
+        int slot = 1;
+        for (String remote : remotes) {
+            regionBuilders[slot++] = RoutingUtils.buildRoutingService(remote);
+        }
+        copy.routingServiceBuilder = RegionsRoutingService.newBuilder()
+                .routingServiceBuilders(regionBuilders)
+                .resolver(regionResolver);
+        // Region stats are only enabled when at least one remote name is configured.
+        copy.enableRegionStats = remotes.length > 0;
+        return copy;
+    }
+
+    /**
+     * URI to access proxy services.
+     *
+     * <p>Assuming the write proxies are announced under `.write_proxy` of the provided namespace uri.
+     * The builder will convert the dl uri (e.g. distributedlog://{zkserver}/path/to/namespace) to
+     * zookeeper serverset based finagle name str (`zk!{zkserver}!/path/to/namespace/.write_proxy`).
+     * When the authority lists several zookeeper servers (separated by `;` or `,`), one of them
+     * is picked at random.
+     *
+     * @param uri namespace uri to access the serverset of write proxies
+     * @return distributedlog builder
+     * @throws NullPointerException if the uri has no authority part
+     * @throws IllegalArgumentException if the authority contains no zookeeper server
+     */
+    public DistributedLogClientBuilder uri(URI uri) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        // getAuthority() returns null when the uri has no authority; fail with a message that
+        // names the uri instead of a bare NullPointerException.
+        String authority = checkNotNull(uri.getAuthority(), "No zookeeper servers found in uri %s", uri);
+        String zkServers = authority.replace(";", ",");
+        String[] zkServerList = StringUtils.split(zkServers, ',');
+        if (0 == zkServerList.length) {
+            // Otherwise random.nextInt(0) below would fail with an unrelated "bound" message.
+            throw new IllegalArgumentException("No zookeeper servers found in uri " + uri);
+        }
+        String finagleNameStr = String.format(
+                "zk!%s!%s/.write_proxy",
+                zkServerList[random.nextInt(zkServerList.length)], // zk server
+                uri.getPath());
+        newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr);
+        newBuilder.enableRegionStats = false;
+        return newBuilder;
+    }
+
+    /**
+     * Address of write proxy to connect.
+     *
+     * @param address
+     *          write proxy address.
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder host(SocketAddress address) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.routingServiceBuilder = 
RoutingUtils.buildRoutingService(address);
+        newBuilder.enableRegionStats = false;
+        return newBuilder;
+    }
+
+    private DistributedLogClientBuilder 
routingServiceBuilder(RoutingService.Builder builder) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.routingServiceBuilder = builder;
+        newBuilder.enableRegionStats = false;
+        return newBuilder;
+    }
+
+    /**
+     * Routing Service to access proxy services.
+     *
+     * @param routingService
+     *          routing service
+     * @return client builder.
+     */
+    @VisibleForTesting
+    public DistributedLogClientBuilder routingService(RoutingService 
routingService) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.routingServiceBuilder = 
RoutingUtils.buildRoutingService(routingService);
+        newBuilder.enableRegionStats = false;
+        return newBuilder;
+    }
+
+    /**
+     * Stats receiver to expose client stats.
+     *
+     * @param statsReceiver
+     *          stats receiver.
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder statsReceiver(StatsReceiver 
statsReceiver) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.statsReceiver = statsReceiver;
+        return newBuilder;
+    }
+
+    /**
+     * Stream Stats Receiver to expose per stream stats.
+     *
+     * @param streamStatsReceiver
+     *          stream stats receiver
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder streamStatsReceiver(StatsReceiver 
streamStatsReceiver) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.streamStatsReceiver = streamStatsReceiver;
+        return newBuilder;
+    }
+
+    /**
+     * Set underlying finagle client builder.
+     *
+     * @param builder
+     *          finagle client builder.
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder clientBuilder(ClientBuilder builder) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientBuilder = builder;
+        return newBuilder;
+    }
+
+    /**
+     * Backoff time when redirecting to an already retried host.
+     *
+     * @param ms
+     *          backoff time.
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder redirectBackoffStartMs(int ms) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig.setRedirectBackoffStartMs(ms);
+        return newBuilder;
+    }
+
+    /**
+     * Max backoff time when redirecting to an already retried host.
+     *
+     * @param ms
+     *          backoff time.
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder redirectBackoffMaxMs(int ms) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig.setRedirectBackoffMaxMs(ms);
+        return newBuilder;
+    }
+
+    /**
+     * Max redirects that is allowed per request.
+     *
+     * <p>If <i>redirects</i> are exhausted, fail the request immediately.
+     *
+     * @param redirects
+     *          max redirects allowed before failing a request.
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder maxRedirects(int redirects) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig.setMaxRedirects(redirects);
+        return newBuilder;
+    }
+
+    /**
+     * Timeout per request in millis.
+     *
+     * @param timeoutMs
+     *          timeout per request in millis.
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder requestTimeoutMs(int timeoutMs) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig.setRequestTimeoutMs(timeoutMs);
+        return newBuilder;
+    }
+
+    /**
+     * Set thriftmux enabled.
+     *
+     * @param enabled
+     *          is thriftmux enabled
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder thriftmux(boolean enabled) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig.setThriftMux(enabled);
+        return newBuilder;
+    }
+
+    /**
+     * Set failfast stream exception handling enabled.
+     *
+     * @param enabled
+     *          is failfast exception handling enabled
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder streamFailfast(boolean enabled) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig.setStreamFailfast(enabled);
+        return newBuilder;
+    }
+
+    /**
+     * Set the regex to match stream names that the client cares about.
+     *
+     * @param nameRegex
+     *          stream name regex
+     * @return client builder
+     */
+    public DistributedLogClientBuilder streamNameRegex(String nameRegex) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig.setStreamNameRegex(nameRegex);
+        return newBuilder;
+    }
+
+    /**
+     * Whether to use the new handshake endpoint to exchange ownership cache.
+     *
+     * <p>Enable this when the servers are updated to support handshaking with 
client info.
+     *
+     * @param enabled
+     *          new handshake endpoint is enabled.
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder handshakeWithClientInfo(boolean 
enabled) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig.setHandshakeWithClientInfo(enabled);
+        return newBuilder;
+    }
+
+    /**
+     * Set the periodic handshake interval in milliseconds.
+     *
+     * <p>Every <code>intervalMs</code>, the DL client will handshake with 
existing proxies again.
+     * If the interval is less than ownership sync interval, the handshake 
won't sync ownerships. Otherwise, it will.
+     *
+     * @see #periodicOwnershipSyncIntervalMs(long)
+     * @param intervalMs
+     *          handshake interval
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder periodicHandshakeIntervalMs(long 
intervalMs) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig.setPeriodicHandshakeIntervalMs(intervalMs);
+        return newBuilder;
+    }
+
+    /**
+     * Set the periodic ownership sync interval in milliseconds.
+     *
+     * <p>If periodic handshake is enabled, the handshake will sync ownership 
if the elapsed time is larger than
+     * sync interval.
+     *
+     * @see #periodicHandshakeIntervalMs(long)
+     * @param intervalMs
+     *          interval that handshake should sync ownerships.
+     * @return client builder
+     */
+    public DistributedLogClientBuilder periodicOwnershipSyncIntervalMs(long 
intervalMs) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig.setPeriodicOwnershipSyncIntervalMs(intervalMs);
+        return newBuilder;
+    }
+
+    /**
+     * Enable/Disable periodic dumping ownership cache.
+     *
+     * @param enabled
+     *          flag to enable/disable periodic dumping ownership cache
+     * @return client builder.
+     */
+    public DistributedLogClientBuilder periodicDumpOwnershipCache(boolean 
enabled) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig.setPeriodicDumpOwnershipCacheEnabled(enabled);
+        return newBuilder;
+    }
+
+    /**
+     * Set periodic dumping ownership cache interval.
+     *
+     * @param intervalMs
+     *          interval on dumping ownership cache, in millis.
+     * @return client builder
+     */
+    public DistributedLogClientBuilder 
periodicDumpOwnershipCacheIntervalMs(long intervalMs) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        
newBuilder.clientConfig.setPeriodicDumpOwnershipCacheIntervalMs(intervalMs);
+        return newBuilder;
+    }
+
+    /**
+     * Enable handshake tracing.
+     *
+     * @param enabled
+     *          flag to enable/disable handshake tracing
+     * @return client builder
+     */
+    public DistributedLogClientBuilder handshakeTracing(boolean enabled) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig.setHandshakeTracingEnabled(enabled);
+        return newBuilder;
+    }
+
+    /**
+     * Enable checksum on requests to the proxy.
+     *
+     * @param enabled
+     *          flag to enable/disable checksum
+     * @return client builder
+     */
+    public DistributedLogClientBuilder checksum(boolean enabled) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig.setChecksumEnabled(enabled);
+        return newBuilder;
+    }
+
+    /**
+     * Configure the finagle name string for the server-side routing service.
+     *
+     * @param nameStr name string of the server-side routing service
+     * @return client builder
+     */
+    public DistributedLogClientBuilder 
serverRoutingServiceFinagleNameStr(String nameStr) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.serverRoutingServiceFinagleName = nameStr;
+        return newBuilder;
+    }
+
+    DistributedLogClientBuilder clientConfig(ClientConfig clientConfig) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        newBuilder.clientConfig = ClientConfig.newConfig(clientConfig);
+        return newBuilder;
+    }
+
+    /**
+     * Build distributedlog client.
+     *
+     * @return distributedlog client.
+     */
+    public DistributedLogClient build() {
+        return buildClient();
+    }
+
+    /**
+     * Build monitor service client.
+     *
+     * @return monitor service client.
+     */
+    public MonitorServiceClient buildMonitorClient() {
+
+        return buildClient();
+    }
+
+    /**
+     * Build the finagle client used to talk to the server-side routing service.
+     *
+     * <p>The parameter shadows the builder field of the same name; only the parameter is used here.
+     *
+     * @param serverRoutingServiceFinagleName
+     *          finagle name string of the server-side routing service.
+     * @return cluster client wrapping the finagle service and its thrift interface.
+     * @throws RuntimeException
+     *          if the finagle name cannot be evaluated by the resolver.
+     */
+    @SuppressWarnings("unchecked")
+    ClusterClient buildServerRoutingServiceClient(String 
serverRoutingServiceFinagleName) {
+        ClientBuilder builder = this.clientBuilder;
+        if (null == builder) {
+            // No finagle client builder was supplied: fall back to built-in defaults
+            // (200ms connect timeouts, 1s request timeout, 20 retries).
+            builder = ClientBuilder.get()
+                    .tcpConnectTimeout(Duration.fromMilliseconds(200))
+                    .connectTimeout(Duration.fromMilliseconds(200))
+                    .requestTimeout(Duration.fromSeconds(1))
+                    .retries(20);
+            if (!clientConfig.getThriftMux()) {
+                // The connection limit is only set on the default builder, and only without thriftmux.
+                builder = builder.hostConnectionLimit(1);
+            }
+        }
+        // Pick the transport: the thriftmux stack, or the framed thrift codec; both carry the client id.
+        if (clientConfig.getThriftMux()) {
+            builder = builder.stack(ThriftMux.client().withClientId(clientId));
+        } else {
+            builder = 
builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId)));
+        }
+
+        Name name;
+        try {
+            name = Resolver$.MODULE$.eval(serverRoutingServiceFinagleName);
+        } catch (Exception exc) {
+            // Log the failing name, then rethrow unchecked with the cause preserved.
+            logger.error("Exception in Resolver.eval for name {}", 
serverRoutingServiceFinagleName, exc);
+            throw new RuntimeException(exc);
+        }
+
+        // build the client; its stats are reported under the "routing" scope
+        Service<ThriftClientRequest, byte[]> client =
+                ClientBuilder.safeBuildFactory(
+                        
builder.dest(name).reportTo(statsReceiver.scope("routing"))
+                ).toService();
+        DistributedLogService.ServiceIface service =
+                new DistributedLogService.ServiceToClient(client, new 
TBinaryProtocol.Factory());
+        return new ClusterClient(client, service);
+    }
+
+    /**
+     * Validate the builder, then create, start and handshake the client implementation.
+     *
+     * <p>The builder itself is left unmodified, like in every other method of this class.
+     *
+     * @return the client implementation, after the routing service was started and the
+     *         handshake was issued.
+     * @throws NullPointerException
+     *          if name, client id, routing service builder or stats receiver is missing.
+     */
+    DistributedLogClientImpl buildClient() {
+        checkNotNull(name, "No name provided.");
+        checkNotNull(clientId, "No client id provided.");
+        checkNotNull(routingServiceBuilder, "No routing service builder provided.");
+        checkNotNull(statsReceiver, "No stats receiver provided.");
+        // Fall back to a no-op receiver through a local, so that building a client does not
+        // write to the builder's own field.
+        StatsReceiver perStreamStatsReceiver = streamStatsReceiver;
+        if (null == perStreamStatsReceiver) {
+            perStreamStatsReceiver = new NullStatsReceiver();
+        }
+
+        // The server-side routing service client is optional; it is only built when a name was set.
+        Optional<ClusterClient> serverRoutingServiceClient = Optional.absent();
+        if (null != serverRoutingServiceFinagleName) {
+            serverRoutingServiceClient = Optional.of(
+                    buildServerRoutingServiceClient(serverRoutingServiceFinagleName));
+        }
+
+        RoutingService routingService = routingServiceBuilder
+                .statsReceiver(statsReceiver.scope("routing"))
+                .build();
+        DistributedLogClientImpl clientImpl =
+                new DistributedLogClientImpl(
+                        name,
+                        clientId,
+                        routingService,
+                        clientBuilder,
+                        clientConfig,
+                        serverRoutingServiceClient,
+                        statsReceiver,
+                        perStreamStatsReceiver,
+                        regionResolver,
+                        enableRegionStats);
+        // The routing service is started before the handshake is issued.
+        routingService.startService();
+        clientImpl.handshake();
+        return clientImpl;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/package-info.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/package-info.java
 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/package-info.java
new file mode 100644
index 0000000..033882f
--- /dev/null
+++ 
b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/service/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Service Client.
+ */
+package org.apache.distributedlog.service;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/main/resources/findbugsExclude.xml b/distributedlog-proxy-client/src/main/resources/findbugsExclude.xml
new file mode 100644
index 0000000..05ee085
--- /dev/null
+++ b/distributedlog-proxy-client/src/main/resources/findbugsExclude.xml
@@ -0,0 +1,23 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+//-->
+<FindBugsFilter>
+  <Match>
+    <!-- generated code, we can't be held responsible for findbugs in it //-->
+    <Class name="~org\.apache\.distributedlog\.thrift.*" />
+  </Match>
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java
new file mode 100644
index 0000000..d7494de
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.LogRecordSet;
+import org.apache.distributedlog.LogRecordSetBuffer;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.service.DistributedLogClient;
+import com.twitter.finagle.IndividualRequestTimeoutException;
+import com.twitter.util.Await;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test {@link DistributedLogMultiStreamWriter}.
+ *
+ * <p>Exercises builder argument validation, flush triggering and speculative writes
+ * against a mocked {@link DistributedLogClient}.
+ */
+public class TestDistributedLogMultiStreamWriter {
+
+    /** Expects build() to fail when no streams were configured. */
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testBuildWithNullStreams() throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .build();
+    }
+
+    /** Expects build() to fail when the stream list is empty. */
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testBuildWithEmptyStreamList() throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.<String>newArrayList())
+                .build();
+    }
+
+    /** Expects build() to fail when streams are set but no client was provided. */
+    @Test(timeout = 20000, expected = NullPointerException.class)
+    public void testBuildWithNullClient() throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .build();
+    }
+
+    /** Expects build() to fail when the compression codec is explicitly set to null. */
+    @Test(timeout = 20000, expected = NullPointerException.class)
+    public void testBuildWithNullCodec() throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(null)
+                .build();
+    }
+
+    /** Expects build() to fail when the first speculative timeout is negative. */
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testBuildWithInvalidSpeculativeSettings1()
+            throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(-1)
+                .build();
+    }
+
+    /** Expects build() to fail when the max speculative timeout (5) is below the first (10). */
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testBuildWithInvalidSpeculativeSettings2()
+            throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(5)
+                .build();
+    }
+
+    /** Expects build() to fail when the speculative backoff multiplier is negative. */
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testBuildWithInvalidSpeculativeSettings3()
+            throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(-1)
+                .build();
+    }
+
+    /**
+     * Expects build() to fail for a 10ms request timeout with 10ms/20ms speculative timeouts.
+     * NOTE(review): presumably the request timeout must exceed the speculative timeouts;
+     * confirm against the builder's validation.
+     */
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testBuildWithInvalidSpeculativeSettings4()
+            throws Exception {
+        DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(10)
+                .build();
+    }
+
+    /** Valid speculative settings with a 50ms request timeout build successfully. */
+    @Test(timeout = 20000)
+    public void testBuildMultiStreamWriter()
+            throws Exception {
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(50)
+                .build();
+        // Assert on the built writer rather than on a constant.
+        assertTrue(null != writer);
+        // Close the writer like the other tests in this class do.
+        writer.close();
+    }
+
+    /** A positive flush interval schedules the writer itself as a fixed-rate task. */
+    @Test(timeout = 20000)
+    public void testBuildWithPeriodicalFlushEnabled() throws Exception {
+        ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(50)
+                .flushIntervalMs(1000)
+                .scheduler(executorService)
+                .build();
+        // flushIntervalMs(1000) is expected to arrive as 1,000,000 microseconds.
+        verify(executorService, times(1))
+                .scheduleAtFixedRate(writer, 1000000, 1000000, TimeUnit.MICROSECONDS);
+    }
+
+    /** A zero flush interval must not schedule any fixed-rate flush task. */
+    @Test(timeout = 20000)
+    public void testBuildWithPeriodicalFlushDisabled() throws Exception {
+        ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(mock(DistributedLogClient.class))
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(50)
+                .flushIntervalMs(0)
+                .scheduler(executorService)
+                .build();
+        // Match any arguments: pinning one concrete argument tuple would pass vacuously
+        // even if a flush task had been scheduled with different values or another unit.
+        verify(executorService, times(0)).scheduleAtFixedRate(
+                (Runnable) any(), Mockito.anyLong(), Mockito.anyLong(), (TimeUnit) any());
+        writer.close();
+    }
+
+    /** With bufferSize(0), a single write is handed to the client exactly once. */
+    @Test(timeout = 20000)
+    public void testFlushWhenBufferIsFull() throws Exception {
+        DistributedLogClient client = mock(DistributedLogClient.class);
+        when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
+                .thenReturn(Future.value(new DLSN(1L, 1L, 999L)));
+
+        ScheduledExecutorService executorService =
+                Executors.newSingleThreadScheduledExecutor();
+        try {
+            DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                    .streams(Lists.newArrayList("stream1", "stream2"))
+                    .client(client)
+                    .compressionCodec(CompressionCodec.Type.LZ4)
+                    .firstSpeculativeTimeoutMs(100000)
+                    .maxSpeculativeTimeoutMs(200000)
+                    .speculativeBackoffMultiplier(2)
+                    .requestTimeoutMs(500000)
+                    .flushIntervalMs(0)
+                    .bufferSize(0)
+                    .scheduler(executorService)
+                    .build();
+
+            ByteBuffer buffer = ByteBuffer.wrap("test".getBytes(UTF_8));
+            writer.write(buffer);
+
+            verify(client, times(1))
+                    .writeRecordSet((String) any(), (LogRecordSetBuffer) any());
+
+            writer.close();
+        } finally {
+            // This test created the executor, so it also releases its worker thread;
+            // shutdown() has no additional effect if the executor is already shut down.
+            executorService.shutdown();
+        }
+    }
+
+    /**
+     * Two records that do not fit into one record set: the first write is only buffered,
+     * the second write flushes once and leaves a fresh record set holding one record.
+     */
+    @Test(timeout = 20000)
+    public void testFlushWhenExceedMaxLogRecordSetSize()
+            throws Exception {
+        DistributedLogClient client = mock(DistributedLogClient.class);
+        when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
+                .thenReturn(Future.value(new DLSN(1L, 1L, 999L)));
+        ScheduledExecutorService executorService =
+                Executors.newSingleThreadScheduledExecutor();
+        try {
+            DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                    .streams(Lists.newArrayList("stream1", "stream2"))
+                    .client(client)
+                    .compressionCodec(CompressionCodec.Type.LZ4)
+                    .firstSpeculativeTimeoutMs(100000)
+                    .maxSpeculativeTimeoutMs(200000)
+                    .speculativeBackoffMultiplier(2)
+                    .requestTimeoutMs(500000)
+                    .flushIntervalMs(0)
+                    .bufferSize(Integer.MAX_VALUE)
+                    .scheduler(executorService)
+                    .build();
+
+            byte[] data = new byte[LogRecord.MAX_LOGRECORD_SIZE - 3 * 100];
+            ByteBuffer buffer1 = ByteBuffer.wrap(data);
+            writer.write(buffer1);
+            verify(client, times(0))
+                    .writeRecordSet((String) any(), (LogRecordSetBuffer) any());
+            LogRecordSet.Writer recordSetWriter1 = writer.getLogRecordSetWriter();
+            assertEquals(1, recordSetWriter1.getNumRecords());
+            assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length,
+                    recordSetWriter1.getNumBytes());
+
+            ByteBuffer buffer2 = ByteBuffer.wrap(data);
+            writer.write(buffer2);
+            verify(client, times(1))
+                    .writeRecordSet((String) any(), (LogRecordSetBuffer) any());
+            LogRecordSet.Writer recordSetWriter2 = writer.getLogRecordSetWriter();
+            assertEquals(1, recordSetWriter2.getNumRecords());
+            assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length,
+                    recordSetWriter2.getNumBytes());
+            assertTrue(recordSetWriter1 != recordSetWriter2);
+
+            writer.close();
+        } finally {
+            // This test created the executor, so it also releases its worker thread;
+            // shutdown() has no additional effect if the executor is already shut down.
+            executorService.shutdown();
+        }
+    }
+
+    /** A record above {@code LogRecord.MAX_LOGRECORD_SIZE} fails its future right away. */
+    @Test(timeout = 20000)
+    public void testWriteTooLargeRecord() throws Exception {
+        DistributedLogClient client = mock(DistributedLogClient.class);
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(client)
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(100000)
+                .maxSpeculativeTimeoutMs(200000)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(5000000)
+                .flushIntervalMs(0)
+                .bufferSize(0)
+                .build();
+
+        byte[] data = new byte[LogRecord.MAX_LOGRECORD_SIZE + 10];
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        Future<DLSN> writeFuture = writer.write(buffer);
+        assertTrue(writeFuture.isDefined());
+        try {
+            Await.result(writeFuture);
+            fail("Should fail on writing too long record");
+        } catch (LogRecordTooLongException lrtle) {
+            // expected
+        }
+        writer.close();
+    }
+
+    /**
+     * Only the stream at index 1 answers; writes to any other stream never complete.
+     * The write is expected to complete with the DLSN returned for that stream.
+     */
+    @Test(timeout = 20000)
+    public void testSpeculativeWrite() throws Exception {
+        DistributedLogClient client = mock(DistributedLogClient.class);
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(client)
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(5000000)
+                .flushIntervalMs(0)
+                .bufferSize(0)
+                .build();
+
+        final String secondStream = writer.getStream(1);
+
+        final DLSN dlsn = new DLSN(99L, 88L, 0L);
+
+        Mockito.doAnswer(new Answer<Future<DLSN>>() {
+            @Override
+            public Future<DLSN> answer(InvocationOnMock invocation) throws Throwable {
+                Object[] arguments = invocation.getArguments();
+                String stream = (String) arguments[0];
+                if (stream.equals(secondStream)) {
+                    return Future.value(dlsn);
+                } else {
+                    // A promise that is never satisfied stands in for a stream that never answers.
+                    return new Promise<DLSN>();
+                }
+            }
+        }).when(client).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
+
+        byte[] data = "test-test".getBytes(UTF_8);
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        Future<DLSN> writeFuture = writer.write(buffer);
+        DLSN writeDLSN = Await.result(writeFuture);
+        assertEquals(dlsn, writeDLSN);
+        writer.close();
+    }
+
+    /**
+     * With a 10ms flush interval and bufferSize(Integer.MAX_VALUE), a small write is
+     * still expected to complete with the DLSN returned by the client.
+     */
+    @Test(timeout = 20000)
+    public void testPeriodicalFlush() throws Exception {
+        DistributedLogClient client = mock(DistributedLogClient.class);
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(client)
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(5000000)
+                .flushIntervalMs(10)
+                .bufferSize(Integer.MAX_VALUE)
+                .build();
+
+        final DLSN dlsn = new DLSN(99L, 88L, 0L);
+
+        Mockito.doAnswer(new Answer<Future<DLSN>>() {
+            @Override
+            public Future<DLSN> answer(InvocationOnMock invocation) throws Throwable {
+                return Future.value(dlsn);
+            }
+        }).when(client).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
+
+        byte[] data = "test-test".getBytes(UTF_8);
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        Future<DLSN> writeFuture = writer.write(buffer);
+        DLSN writeDLSN = Await.result(writeFuture);
+        assertEquals(dlsn, writeDLSN);
+        writer.close();
+    }
+
+    /**
+     * When the client never completes any write, the request is expected to fail with an
+     * {@link IndividualRequestTimeoutException} whose timeout lies in [30ms, 5000000ms).
+     */
+    @Test(timeout = 20000)
+    public void testFailRequestAfterRetriedAllStreams() throws Exception {
+        DistributedLogClient client = mock(DistributedLogClient.class);
+        when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
+                .thenReturn(new Promise<DLSN>());
+        DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
+                .streams(Lists.newArrayList("stream1", "stream2"))
+                .client(client)
+                .compressionCodec(CompressionCodec.Type.LZ4)
+                .firstSpeculativeTimeoutMs(10)
+                .maxSpeculativeTimeoutMs(20)
+                .speculativeBackoffMultiplier(2)
+                .requestTimeoutMs(5000000)
+                .flushIntervalMs(10)
+                .bufferSize(Integer.MAX_VALUE)
+                .build();
+
+        byte[] data = "test-test".getBytes(UTF_8);
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        Future<DLSN> writeFuture = writer.write(buffer);
+        try {
+            Await.result(writeFuture);
+            fail("Should fail the request after retries all streams");
+        } catch (IndividualRequestTimeoutException e) {
+            long timeoutMs = e.timeout().inMilliseconds();
+            assertTrue(timeoutMs >= (10 + 20) && timeoutMs < 5000000);
+        }
+        writer.close();
+    }
+}


Reply via email to