http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java new file mode 100644 index 0000000..f1da33c --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.client.speculative; + +import com.google.common.annotations.VisibleForTesting; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Default implementation of {@link SpeculativeRequestExecutionPolicy}. + */ +public class DefaultSpeculativeRequestExecutionPolicy implements SpeculativeRequestExecutionPolicy { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultSpeculativeRequestExecutionPolicy.class); + final int firstSpeculativeRequestTimeout; + final int maxSpeculativeRequestTimeout; + final float backoffMultiplier; + int nextSpeculativeRequestTimeout; + + public DefaultSpeculativeRequestExecutionPolicy(int firstSpeculativeRequestTimeout, + int maxSpeculativeRequestTimeout, + float backoffMultiplier) { + this.firstSpeculativeRequestTimeout = firstSpeculativeRequestTimeout; + this.maxSpeculativeRequestTimeout = maxSpeculativeRequestTimeout; + this.backoffMultiplier = backoffMultiplier; + this.nextSpeculativeRequestTimeout = firstSpeculativeRequestTimeout; + + if (backoffMultiplier <= 0) { + throw new IllegalArgumentException("Invalid value provided for backoffMultiplier"); + } + + // Prevent potential over flow + if (Math.round((double) maxSpeculativeRequestTimeout * (double) backoffMultiplier) > Integer.MAX_VALUE) { + throw new IllegalArgumentException("Invalid values for maxSpeculativeRequestTimeout and backoffMultiplier"); + } + } + + @VisibleForTesting + int getNextSpeculativeRequestTimeout() { + return nextSpeculativeRequestTimeout; + } + + /** + * Initialize the speculative request execution policy. 
+ * + * @param scheduler The scheduler service to issue the speculative request + * @param requestExecutor The executor is used to issue the actual speculative requests + */ + @Override + public void initiateSpeculativeRequest(final ScheduledExecutorService scheduler, + final SpeculativeRequestExecutor requestExecutor) { + issueSpeculativeRequest(scheduler, requestExecutor); + } + + private void issueSpeculativeRequest(final ScheduledExecutorService scheduler, + final SpeculativeRequestExecutor requestExecutor) { + Future<Boolean> issueNextRequest = requestExecutor.issueSpeculativeRequest(); + issueNextRequest.addEventListener(new FutureEventListener<Boolean>() { + // we want this handler to run immediately after we push the big red button! + @Override + public void onSuccess(Boolean issueNextRequest) { + if (issueNextRequest) { + scheduleSpeculativeRequest(scheduler, requestExecutor, nextSpeculativeRequestTimeout); + nextSpeculativeRequestTimeout = Math.min(maxSpeculativeRequestTimeout, + (int) (nextSpeculativeRequestTimeout * backoffMultiplier)); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Stopped issuing speculative requests for {}, " + + "speculativeReadTimeout = {}", requestExecutor, nextSpeculativeRequestTimeout); + } + } + } + + @Override + public void onFailure(Throwable thrown) { + LOG.warn("Failed to issue speculative request for {}, speculativeReadTimeout = {} : ", + new Object[] { requestExecutor, nextSpeculativeRequestTimeout, thrown }); + } + }); + } + + private void scheduleSpeculativeRequest(final ScheduledExecutorService scheduler, + final SpeculativeRequestExecutor requestExecutor, + final int speculativeRequestTimeout) { + try { + scheduler.schedule(new Runnable() { + @Override + public void run() { + issueSpeculativeRequest(scheduler, requestExecutor); + } + }, speculativeRequestTimeout, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException re) { + if (!scheduler.isShutdown()) { + LOG.warn("Failed to schedule speculative 
request for {}, speculativeReadTimeout = {} : ", + new Object[]{requestExecutor, speculativeRequestTimeout, re}); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java new file mode 100644 index 0000000..faf45c2 --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.speculative; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * Speculative request execution policy. + */ +public interface SpeculativeRequestExecutionPolicy { + /** + * Initialize the speculative request execution policy and initiate requests. 
+ * + * @param scheduler The scheduler service to issue the speculative request + * @param requestExecutor The executor is used to issue the actual speculative requests + */ + void initiateSpeculativeRequest(ScheduledExecutorService scheduler, + SpeculativeRequestExecutor requestExecutor); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java new file mode 100644 index 0000000..68fe8b0 --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.speculative; + +import com.twitter.util.Future; + +/** + * Executor to execute speculative requests. 
+ */ +public interface SpeculativeRequestExecutor { + + /** + * Issues a speculative request and indicates if more speculative requests should be issued. + * + * @return whether more speculative requests should be issued. + */ + Future<Boolean> issueSpeculativeRequest(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java new file mode 100644 index 0000000..4bdd4b1 --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Speculative Mechanism. 
+ */ +package org.apache.distributedlog.client.speculative; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java new file mode 100644 index 0000000..c2dcddd --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.stats; + +import org.apache.distributedlog.client.resolver.RegionResolver; +import org.apache.distributedlog.thrift.service.StatusCode; +import com.twitter.finagle.stats.StatsReceiver; +import java.net.SocketAddress; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Client Stats. 
+ */ +public class ClientStats { + + // Region Resolver + private final RegionResolver regionResolver; + + // Stats + private final StatsReceiver statsReceiver; + private final ClientStatsLogger clientStatsLogger; + private final boolean enableRegionStats; + private final ConcurrentMap<String, ClientStatsLogger> regionClientStatsLoggers; + private final ConcurrentMap<String, OpStats> opStatsMap; + + public ClientStats(StatsReceiver statsReceiver, + boolean enableRegionStats, + RegionResolver regionResolver) { + this.statsReceiver = statsReceiver; + this.clientStatsLogger = new ClientStatsLogger(statsReceiver); + this.enableRegionStats = enableRegionStats; + this.regionClientStatsLoggers = new ConcurrentHashMap<String, ClientStatsLogger>(); + this.regionResolver = regionResolver; + this.opStatsMap = new ConcurrentHashMap<String, OpStats>(); + } + + public OpStats getOpStats(String op) { + OpStats opStats = opStatsMap.get(op); + if (null != opStats) { + return opStats; + } + OpStats newStats = new OpStats(statsReceiver.scope(op), + enableRegionStats, regionResolver); + OpStats oldStats = opStatsMap.putIfAbsent(op, newStats); + if (null == oldStats) { + return newStats; + } else { + return oldStats; + } + } + + private ClientStatsLogger getRegionClientStatsLogger(SocketAddress address) { + String region = regionResolver.resolveRegion(address); + return getRegionClientStatsLogger(region); + } + + private ClientStatsLogger getRegionClientStatsLogger(String region) { + ClientStatsLogger statsLogger = regionClientStatsLoggers.get(region); + if (null == statsLogger) { + ClientStatsLogger newStatsLogger = new ClientStatsLogger(statsReceiver.scope(region)); + ClientStatsLogger oldStatsLogger = regionClientStatsLoggers.putIfAbsent(region, newStatsLogger); + if (null == oldStatsLogger) { + statsLogger = newStatsLogger; + } else { + statsLogger = oldStatsLogger; + } + } + return statsLogger; + } + + public StatsReceiver getFinagleStatsReceiver(SocketAddress addr) { + if 
(enableRegionStats && null != addr) { + return getRegionClientStatsLogger(addr).getStatsReceiver(); + } else { + return clientStatsLogger.getStatsReceiver(); + } + } + + public void completeProxyRequest(SocketAddress addr, StatusCode code, long startTimeNanos) { + clientStatsLogger.completeProxyRequest(code, startTimeNanos); + if (enableRegionStats && null != addr) { + getRegionClientStatsLogger(addr).completeProxyRequest(code, startTimeNanos); + } + } + + public void failProxyRequest(SocketAddress addr, Throwable cause, long startTimeNanos) { + clientStatsLogger.failProxyRequest(cause, startTimeNanos); + if (enableRegionStats && null != addr) { + getRegionClientStatsLogger(addr).failProxyRequest(cause, startTimeNanos); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java new file mode 100644 index 0000000..530c632 --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.stats; + +import org.apache.distributedlog.thrift.service.StatusCode; +import com.twitter.finagle.stats.Counter; +import com.twitter.finagle.stats.Stat; +import com.twitter.finagle.stats.StatsReceiver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +/** + * Stats Logger to collect client stats. + */ +public class ClientStatsLogger { + + // Stats + private final StatsReceiver statsReceiver; + private final StatsReceiver responseStatsReceiver; + private final ConcurrentMap<StatusCode, Counter> responseStats = + new ConcurrentHashMap<StatusCode, Counter>(); + private final StatsReceiver exceptionStatsReceiver; + private final ConcurrentMap<Class<?>, Counter> exceptionStats = + new ConcurrentHashMap<Class<?>, Counter>(); + + private final Stat proxySuccessLatencyStat; + private final Stat proxyFailureLatencyStat; + + public ClientStatsLogger(StatsReceiver statsReceiver) { + this.statsReceiver = statsReceiver; + responseStatsReceiver = statsReceiver.scope("responses"); + exceptionStatsReceiver = statsReceiver.scope("exceptions"); + StatsReceiver proxyLatencyStatReceiver = statsReceiver.scope("proxy_request_latency"); + proxySuccessLatencyStat = proxyLatencyStatReceiver.stat0("success"); + proxyFailureLatencyStat = proxyLatencyStatReceiver.stat0("failure"); + } + + public StatsReceiver getStatsReceiver() { + return statsReceiver; + } + + private Counter getResponseCounter(StatusCode code) { + Counter counter = 
responseStats.get(code); + if (null == counter) { + Counter newCounter = responseStatsReceiver.counter0(code.name()); + Counter oldCounter = responseStats.putIfAbsent(code, newCounter); + counter = null != oldCounter ? oldCounter : newCounter; + } + return counter; + } + + private Counter getExceptionCounter(Class<?> cls) { + Counter counter = exceptionStats.get(cls); + if (null == counter) { + Counter newCounter = exceptionStatsReceiver.counter0(cls.getName()); + Counter oldCounter = exceptionStats.putIfAbsent(cls, newCounter); + counter = null != oldCounter ? oldCounter : newCounter; + } + return counter; + } + + public void completeProxyRequest(StatusCode code, long startTimeNanos) { + getResponseCounter(code).incr(); + proxySuccessLatencyStat.add(elapsedMicroSec(startTimeNanos)); + } + + public void failProxyRequest(Throwable cause, long startTimeNanos) { + getExceptionCounter(cause.getClass()).incr(); + proxyFailureLatencyStat.add(elapsedMicroSec(startTimeNanos)); + } + + static long elapsedMicroSec(long startNanoTime) { + return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanoTime); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java new file mode 100644 index 0000000..7a49faa --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.stats; + +import org.apache.distributedlog.client.resolver.RegionResolver; +import com.twitter.finagle.stats.StatsReceiver; +import java.net.SocketAddress; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Op Stats. + */ +public class OpStats { + + // Region Resolver + private final RegionResolver regionResolver; + + // Stats + private final StatsReceiver statsReceiver; + private final OpStatsLogger opStatsLogger; + private final boolean enableRegionStats; + private final ConcurrentMap<String, OpStatsLogger> regionOpStatsLoggers; + + public OpStats(StatsReceiver statsReceiver, + boolean enableRegionStats, + RegionResolver regionResolver) { + this.statsReceiver = statsReceiver; + this.opStatsLogger = new OpStatsLogger(statsReceiver); + this.enableRegionStats = enableRegionStats; + this.regionOpStatsLoggers = new ConcurrentHashMap<String, OpStatsLogger>(); + this.regionResolver = regionResolver; + } + + private OpStatsLogger getRegionOpStatsLogger(SocketAddress address) { + String region = regionResolver.resolveRegion(address); + return getRegionOpStatsLogger(region); + } + + private OpStatsLogger getRegionOpStatsLogger(String region) { + OpStatsLogger statsLogger = regionOpStatsLoggers.get(region); + if (null == statsLogger) { + OpStatsLogger newStatsLogger = new 
OpStatsLogger(statsReceiver.scope(region)); + OpStatsLogger oldStatsLogger = regionOpStatsLoggers.putIfAbsent(region, newStatsLogger); + if (null == oldStatsLogger) { + statsLogger = newStatsLogger; + } else { + statsLogger = oldStatsLogger; + } + } + return statsLogger; + } + + public void completeRequest(SocketAddress addr, long micros, int numTries) { + opStatsLogger.completeRequest(micros, numTries); + if (enableRegionStats && null != addr) { + getRegionOpStatsLogger(addr).completeRequest(micros, numTries); + } + } + + public void failRequest(SocketAddress addr, long micros, int numTries) { + opStatsLogger.failRequest(micros, numTries); + if (enableRegionStats && null != addr) { + getRegionOpStatsLogger(addr).failRequest(micros, numTries); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java new file mode 100644 index 0000000..b94b4be --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.stats; + +import com.twitter.finagle.stats.Stat; +import com.twitter.finagle.stats.StatsReceiver; + +/** + * Stats Logger per operation type. + */ +public class OpStatsLogger { + + private final Stat successLatencyStat; + private final Stat failureLatencyStat; + private final Stat redirectStat; + + public OpStatsLogger(StatsReceiver statsReceiver) { + StatsReceiver latencyStatReceiver = statsReceiver.scope("latency"); + successLatencyStat = latencyStatReceiver.stat0("success"); + failureLatencyStat = latencyStatReceiver.stat0("failure"); + StatsReceiver redirectStatReceiver = statsReceiver.scope("redirects"); + redirectStat = redirectStatReceiver.stat0("times"); + } + + public void completeRequest(long micros, int numTries) { + successLatencyStat.add(micros); + redirectStat.add(numTries); + } + + public void failRequest(long micros, int numTries) { + failureLatencyStat.add(micros); + redirectStat.add(numTries); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java new file mode 100644 index 0000000..110e99a --- /dev/null +++ 
b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.client.stats; + +import com.twitter.finagle.stats.Counter; +import com.twitter.finagle.stats.StatsReceiver; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Stats Logger for ownerships. + */ +public class OwnershipStatsLogger { + + /** + * Ownership related stats. 
+ */ + public static class OwnershipStat { + private final Counter hits; + private final Counter misses; + private final Counter removes; + private final Counter redirects; + private final Counter adds; + + OwnershipStat(StatsReceiver ownershipStats) { + hits = ownershipStats.counter0("hits"); + misses = ownershipStats.counter0("misses"); + adds = ownershipStats.counter0("adds"); + removes = ownershipStats.counter0("removes"); + redirects = ownershipStats.counter0("redirects"); + } + + public void onHit() { + hits.incr(); + } + + public void onMiss() { + misses.incr(); + } + + public void onAdd() { + adds.incr(); + } + + public void onRemove() { + removes.incr(); + } + + public void onRedirect() { + redirects.incr(); + } + + } + + private final OwnershipStat ownershipStat; + private final StatsReceiver ownershipStatsReceiver; + private final ConcurrentMap<String, OwnershipStat> ownershipStats = + new ConcurrentHashMap<String, OwnershipStat>(); + + public OwnershipStatsLogger(StatsReceiver statsReceiver, + StatsReceiver streamStatsReceiver) { + this.ownershipStat = new OwnershipStat(statsReceiver.scope("ownership")); + this.ownershipStatsReceiver = streamStatsReceiver.scope("perstream_ownership"); + } + + private OwnershipStat getOwnershipStat(String stream) { + OwnershipStat stat = ownershipStats.get(stream); + if (null == stat) { + OwnershipStat newStat = new OwnershipStat(ownershipStatsReceiver.scope(stream)); + OwnershipStat oldStat = ownershipStats.putIfAbsent(stream, newStat); + stat = null != oldStat ? 
oldStat : newStat; + } + return stat; + } + + public void onMiss(String stream) { + ownershipStat.onMiss(); + getOwnershipStat(stream).onMiss(); + } + + public void onHit(String stream) { + ownershipStat.onHit(); + getOwnershipStat(stream).onHit(); + } + + public void onRedirect(String stream) { + ownershipStat.onRedirect(); + getOwnershipStat(stream).onRedirect(); + } + + public void onRemove(String stream) { + ownershipStat.onRemove(); + getOwnershipStat(stream).onRemove(); + } + + public void onAdd(String stream) { + ownershipStat.onAdd(); + getOwnershipStat(stream).onAdd(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java new file mode 100644 index 0000000..106d3fc --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Client side stats utils. + */ +package org.apache.distributedlog.client.stats; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java new file mode 100644 index 0000000..68e6825 --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; + +/** + * Socket Address identifier for a DL proxy. 
+ */ +public class DLSocketAddress { + + private static final int VERSION = 1; + + private static final String COLON = ":"; + private static final String SEP = ";"; + + private final int shard; + private final InetSocketAddress socketAddress; + + public DLSocketAddress(int shard, InetSocketAddress socketAddress) { + this.shard = shard; + this.socketAddress = socketAddress; + } + + /** + * Shard id for dl write proxy. + * + * @return shard id for dl write proxy. + */ + public int getShard() { + return shard; + } + + /** + * Socket address for dl write proxy. + * + * @return socket address for dl write proxy + */ + public InetSocketAddress getSocketAddress() { + return socketAddress; + } + + /** + * Serialize the write proxy identifier to string. + * + * @return serialized write proxy identifier. + */ + public String serialize() { + return toLockId(socketAddress, shard); + } + + @Override + public int hashCode() { + return socketAddress.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof DLSocketAddress)) { + return false; + } + DLSocketAddress other = (DLSocketAddress) obj; + return shard == other.shard && socketAddress.equals(other.socketAddress); + } + + @Override + public String toString() { + return toLockId(socketAddress, shard); + } + + /** + * Deserialize proxy address from a string representation. + * + * @param lockId + * string representation of the proxy address. + * @return proxy address. 
+ * @throws IOException + */ + public static DLSocketAddress deserialize(String lockId) throws IOException { + String parts[] = lockId.split(SEP); + if (3 != parts.length) { + throw new IOException("Invalid dl socket address " + lockId); + } + int version; + try { + version = Integer.parseInt(parts[0]); + } catch (NumberFormatException nfe) { + throw new IOException("Invalid version found in " + lockId, nfe); + } + if (VERSION != version) { + throw new IOException("Invalid version " + version + " found in " + lockId + ", expected " + VERSION); + } + int shardId; + try { + shardId = Integer.parseInt(parts[1]); + } catch (NumberFormatException nfe) { + throw new IOException("Invalid shard id found in " + lockId, nfe); + } + /* NOTE(review): a malformed address part surfaces from parseSocketAddress as IllegalArgumentException, not IOException; confirm callers expect that. */ + InetSocketAddress address = parseSocketAddress(parts[2]); + return new DLSocketAddress(shardId, address); + } + + /** + * Parse the inet socket address from the string representation. + * + * @param addr + * string representation + * @return inet socket address + * @throws IllegalArgumentException + * if <i>addr</i> does not split on ':' into exactly two parts, or the second part is not an integer + */ + public static InetSocketAddress parseSocketAddress(String addr) { + String[] parts = addr.split(COLON); + checkArgument(parts.length == 2); + String hostname = parts[0]; + int port = Integer.parseInt(parts[1]); + return new InetSocketAddress(hostname, port); + } + + /** + * Build a socket address from the local host address and the given <i>port</i>. + * + * @param port + * port of the socket address + * @return socket address made of the local host address and <i>port</i> + * @throws UnknownHostException + * if the local host name could not be resolved into an address + */ + public static InetSocketAddress getSocketAddress(int port) throws UnknownHostException { + return new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), port); + } + + /** + * Convert inet socket address to the string representation. + * + * @param address + * inet socket address. + * @return string representation of inet socket address.
+ */ + public static String toString(InetSocketAddress address) { + StringBuilder sb = new StringBuilder(); + sb.append(address.getHostName()).append(COLON).append(address.getPort()); + return sb.toString(); + } + + public static String toLockId(InetSocketAddress address, int shard) { + StringBuilder sb = new StringBuilder(); + sb.append(VERSION).append(SEP).append(shard).append(SEP).append(toString(address)); + return sb.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java new file mode 100644 index 0000000..9f30815 --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service; + +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.LogRecordSetBuffer; +import com.twitter.util.Future; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * Interface for distributedlog client. + */ +public interface DistributedLogClient { + /** + * Write <i>data</i> to a given <i>stream</i>. + * + * @param stream + * Stream Name. + * @param data + * Data to write. + * @return a future representing a sequence id returned for this write. + */ + Future<DLSN> write(String stream, ByteBuffer data); + + /** + * Write record set to a given <i>stream</i>. + * + * <p>The record set is built from {@link org.apache.distributedlog.LogRecordSet.Writer} + * + * @param stream stream to write to + * @param recordSet record set + */ + Future<DLSN> writeRecordSet(String stream, LogRecordSetBuffer recordSet); + + /** + * Write <i>data</i> in bulk to a given <i>stream</i>. + * + * <p>Return a list of Future dlsns, one for each submitted buffer. In the event of a partial + * failure--ex. some specific buffer write fails, all subsequent writes + * will also fail. + * + * @param stream + * Stream Name. + * @param data + * Data to write. + * @return a list of futures, one for each submitted buffer. + */ + List<Future<DLSN>> writeBulk(String stream, List<ByteBuffer> data); + + /** + * Truncate the stream to a given <i>dlsn</i>. + * + * @param stream + * Stream Name. + * @param dlsn + * DLSN to truncate until. + * @return a future representing the truncation. + */ + Future<Boolean> truncate(String stream, DLSN dlsn); + + /** + * Release the ownership of a stream <i>stream</i>. + * + * @param stream + * Stream Name to release. + * @return a future representing the release operation. + */ + Future<Void> release(String stream); + + /** + * Delete a given stream <i>stream</i>. + * + * @param stream + * Stream Name to delete. + * @return a future representing the delete operation. 
+ */ + Future<Void> delete(String stream); + + /** + * Create a stream with name <i>stream</i>. + * + * @param stream + * Stream Name to create. + * @return a future representing the create operation. + */ + Future<Void> create(String stream); + + /** + * Close the client. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java new file mode 100644 index 0000000..0e2a152 --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java @@ -0,0 +1,608 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.twitter.common.zookeeper.ServerSet; +import org.apache.distributedlog.client.ClientConfig; +import org.apache.distributedlog.client.DistributedLogClientImpl; +import org.apache.distributedlog.client.monitor.MonitorServiceClient; +import org.apache.distributedlog.client.proxy.ClusterClient; +import org.apache.distributedlog.client.resolver.DefaultRegionResolver; +import org.apache.distributedlog.client.resolver.RegionResolver; +import org.apache.distributedlog.client.routing.RegionsRoutingService; +import org.apache.distributedlog.client.routing.RoutingService; +import org.apache.distributedlog.client.routing.RoutingUtils; +import org.apache.distributedlog.thrift.service.DistributedLogService; +import com.twitter.finagle.Name; +import com.twitter.finagle.Resolver$; +import com.twitter.finagle.Service; +import com.twitter.finagle.ThriftMux; +import com.twitter.finagle.builder.ClientBuilder; +import com.twitter.finagle.stats.NullStatsReceiver; +import com.twitter.finagle.stats.StatsReceiver; +import com.twitter.finagle.thrift.ClientId; +import com.twitter.finagle.thrift.ThriftClientFramedCodec; +import com.twitter.finagle.thrift.ThriftClientRequest; +import com.twitter.util.Duration; +import java.net.SocketAddress; +import java.net.URI; +import java.util.Random; +import org.apache.commons.lang.StringUtils; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +/** + * Builder to build {@link DistributedLogClient}. 
+ */ +public final class DistributedLogClientBuilder { + + private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientBuilder.class); + + private static final Random random = new Random(System.currentTimeMillis()); + + private String name = null; + private ClientId clientId = null; + private RoutingService.Builder routingServiceBuilder = null; + private ClientBuilder clientBuilder = null; + private String serverRoutingServiceFinagleName = null; + private StatsReceiver statsReceiver = new NullStatsReceiver(); + private StatsReceiver streamStatsReceiver = new NullStatsReceiver(); + private ClientConfig clientConfig = new ClientConfig(); + private boolean enableRegionStats = false; + private final RegionResolver regionResolver = new DefaultRegionResolver(); + + /** + * Create a client builder. + * + * @return client builder + */ + public static DistributedLogClientBuilder newBuilder() { + return new DistributedLogClientBuilder(); + } + + /** + * Create a new client builder from an existing {@code builder}. + * + * @param builder the existing builder. + * @return a new client builder. + */ + public static DistributedLogClientBuilder newBuilder(DistributedLogClientBuilder builder) { + DistributedLogClientBuilder newBuilder = new DistributedLogClientBuilder(); + newBuilder.name = builder.name; + newBuilder.clientId = builder.clientId; + newBuilder.clientBuilder = builder.clientBuilder; + newBuilder.routingServiceBuilder = builder.routingServiceBuilder; + newBuilder.statsReceiver = builder.statsReceiver; + newBuilder.streamStatsReceiver = builder.streamStatsReceiver; + newBuilder.enableRegionStats = builder.enableRegionStats; + newBuilder.serverRoutingServiceFinagleName = builder.serverRoutingServiceFinagleName; + newBuilder.clientConfig = ClientConfig.newConfig(builder.clientConfig); + return newBuilder; + } + + // private constructor + private DistributedLogClientBuilder() {} + + /** + * Client Name. 
+ * + * @param name + * client name + * @return client builder. + */ + public DistributedLogClientBuilder name(String name) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.name = name; + return newBuilder; + } + + /** + * Client ID. + * + * @param clientId + * client id + * @return client builder. + */ + public DistributedLogClientBuilder clientId(ClientId clientId) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientId = clientId; + return newBuilder; + } + + /** + * Serverset to access proxy services. + * + * @param serverSet + * server set. + * @return client builder. + */ + public DistributedLogClientBuilder serverSet(ServerSet serverSet) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(serverSet); + newBuilder.enableRegionStats = false; + return newBuilder; + } + + /** + * Server Sets to access proxy services. + * + * <p>The <i>local</i> server set will be tried first then <i>remotes</i>. + * + * @param local local server set. + * @param remotes remote server sets. + * @return client builder. + */ + public DistributedLogClientBuilder serverSets(ServerSet local, ServerSet...remotes) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1]; + builders[0] = RoutingUtils.buildRoutingService(local); + for (int i = 1; i < builders.length; i++) { + builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]); + } + newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder() + .resolver(regionResolver) + .routingServiceBuilders(builders); + newBuilder.enableRegionStats = remotes.length > 0; + return newBuilder; + } + + /** + * Name to access proxy services. + * + * @param finagleNameStr + * finagle name string. + * @return client builder. 
+ */ + public DistributedLogClientBuilder finagleNameStr(String finagleNameStr) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr); + newBuilder.enableRegionStats = false; + return newBuilder; + } + + /** + * Finagle name strs to access proxy services. + * + * <p>The <i>local</i> finagle name str will be tried first, then <i>remotes</i>. + * + * @param local local finagle name str. + * @param remotes remote finagle name strs. + * @return client builder. + */ + public DistributedLogClientBuilder finagleNameStrs(String local, String...remotes) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1]; + builders[0] = RoutingUtils.buildRoutingService(local); + for (int i = 1; i < builders.length; i++) { + builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]); + } + newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder() + .routingServiceBuilders(builders) + .resolver(regionResolver); + newBuilder.enableRegionStats = remotes.length > 0; + return newBuilder; + } + + /** + * URI to access proxy services. + * + * <p>Assuming the write proxies are announced under `.write_proxy` of the provided namespace uri. + * The builder will convert the dl uri (e.g.
distributedlog://{zkserver}/path/to/namespace) to + * zookeeper serverset based finagle name str (`zk!{zkserver}!/path/to/namespace/.write_proxy`) + * + * @param uri namespace uri to access the serverset of write proxies + * @return distributedlog builder + */ + public DistributedLogClientBuilder uri(URI uri) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + String zkServers = uri.getAuthority().replace(";", ","); + String[] zkServerList = StringUtils.split(zkServers, ','); + String finagleNameStr = String.format( + "zk!%s!%s/.write_proxy", + zkServerList[random.nextInt(zkServerList.length)], // zk server + uri.getPath()); + newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr); + newBuilder.enableRegionStats = false; + return newBuilder; + } + + /** + * Address of write proxy to connect. + * + * @param address + * write proxy address. + * @return client builder. + */ + public DistributedLogClientBuilder host(SocketAddress address) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(address); + newBuilder.enableRegionStats = false; + return newBuilder; + } + + private DistributedLogClientBuilder routingServiceBuilder(RoutingService.Builder builder) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.routingServiceBuilder = builder; + newBuilder.enableRegionStats = false; + return newBuilder; + } + + /** + * Routing Service to access proxy services. + * + * @param routingService + * routing service + * @return client builder. + */ + @VisibleForTesting + public DistributedLogClientBuilder routingService(RoutingService routingService) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(routingService); + newBuilder.enableRegionStats = false; + return newBuilder; + } + + /** + * Stats receiver to expose client stats. 
+ * + * @param statsReceiver + * stats receiver. + * @return client builder. + */ + public DistributedLogClientBuilder statsReceiver(StatsReceiver statsReceiver) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.statsReceiver = statsReceiver; + return newBuilder; + } + + /** + * Stream Stats Receiver to expose per stream stats. + * + * @param streamStatsReceiver + * stream stats receiver + * @return client builder. + */ + public DistributedLogClientBuilder streamStatsReceiver(StatsReceiver streamStatsReceiver) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.streamStatsReceiver = streamStatsReceiver; + return newBuilder; + } + + /** + * Set underlying finagle client builder. + * + * @param builder + * finagle client builder. + * @return client builder. + */ + public DistributedLogClientBuilder clientBuilder(ClientBuilder builder) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientBuilder = builder; + return newBuilder; + } + + /** + * Backoff time when redirecting to an already retried host. + * + * @param ms + * backoff time. + * @return client builder. + */ + public DistributedLogClientBuilder redirectBackoffStartMs(int ms) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setRedirectBackoffStartMs(ms); + return newBuilder; + } + + /** + * Max backoff time when redirecting to an already retried host. + * + * @param ms + * backoff time. + * @return client builder. + */ + public DistributedLogClientBuilder redirectBackoffMaxMs(int ms) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setRedirectBackoffMaxMs(ms); + return newBuilder; + } + + /** + * Max redirects that is allowed per request. + * + * <p>If <i>redirects</i> are exhausted, fail the request immediately. + * + * @param redirects + * max redirects allowed before failing a request. + * @return client builder. 
+ */ + public DistributedLogClientBuilder maxRedirects(int redirects) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setMaxRedirects(redirects); + return newBuilder; + } + + /** + * Timeout per request in millis. + * + * @param timeoutMs + * timeout per request in millis. + * @return client builder. + */ + public DistributedLogClientBuilder requestTimeoutMs(int timeoutMs) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setRequestTimeoutMs(timeoutMs); + return newBuilder; + } + + /** + * Set thriftmux enabled. + * + * @param enabled + * is thriftmux enabled + * @return client builder. + */ + public DistributedLogClientBuilder thriftmux(boolean enabled) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setThriftMux(enabled); + return newBuilder; + } + + /** + * Set failfast stream exception handling enabled. + * + * @param enabled + * is failfast exception handling enabled + * @return client builder. + */ + public DistributedLogClientBuilder streamFailfast(boolean enabled) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setStreamFailfast(enabled); + return newBuilder; + } + + /** + * Set the regex to match stream names that the client cares about. + * + * @param nameRegex + * stream name regex + * @return client builder + */ + public DistributedLogClientBuilder streamNameRegex(String nameRegex) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setStreamNameRegex(nameRegex); + return newBuilder; + } + + /** + * Whether to use the new handshake endpoint to exchange ownership cache. + * + * <p>Enable this when the servers are updated to support handshaking with client info. + * + * @param enabled + * new handshake endpoint is enabled. + * @return client builder. 
+ */ + public DistributedLogClientBuilder handshakeWithClientInfo(boolean enabled) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setHandshakeWithClientInfo(enabled); + return newBuilder; + } + + /** + * Set the periodic handshake interval in milliseconds. + * + * <p>Every <code>intervalMs</code>, the DL client will handshake with existing proxies again. + * If the interval is less than ownership sync interval, the handshake won't sync ownerships. Otherwise, it will. + * + * @see #periodicOwnershipSyncIntervalMs(long) + * @param intervalMs + * handshake interval + * @return client builder. + */ + public DistributedLogClientBuilder periodicHandshakeIntervalMs(long intervalMs) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setPeriodicHandshakeIntervalMs(intervalMs); + return newBuilder; + } + + /** + * Set the periodic ownership sync interval in milliseconds. + * + * <p>If periodic handshake is enabled, the handshake will sync ownership if the elapsed time is larger than + * sync interval. + * + * @see #periodicHandshakeIntervalMs(long) + * @param intervalMs + * interval that handshake should sync ownerships. + * @return client builder + */ + public DistributedLogClientBuilder periodicOwnershipSyncIntervalMs(long intervalMs) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setPeriodicOwnershipSyncIntervalMs(intervalMs); + return newBuilder; + } + + /** + * Enable/Disable periodic dumping ownership cache. + * + * @param enabled + * flag to enable/disable periodic dumping ownership cache + * @return client builder. + */ + public DistributedLogClientBuilder periodicDumpOwnershipCache(boolean enabled) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setPeriodicDumpOwnershipCacheEnabled(enabled); + return newBuilder; + } + + /** + * Set periodic dumping ownership cache interval. 
+ * + * @param intervalMs + * interval on dumping ownership cache, in millis. + * @return client builder + */ + public DistributedLogClientBuilder periodicDumpOwnershipCacheIntervalMs(long intervalMs) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setPeriodicDumpOwnershipCacheIntervalMs(intervalMs); + return newBuilder; + } + + /** + * Enable handshake tracing. + * + * @param enabled + * flag to enable/disable handshake tracing + * @return client builder + */ + public DistributedLogClientBuilder handshakeTracing(boolean enabled) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setHandshakeTracingEnabled(enabled); + return newBuilder; + } + + /** + * Enable checksum on requests to the proxy. + * + * @param enabled + * flag to enable/disable checksum + * @return client builder + */ + public DistributedLogClientBuilder checksum(boolean enabled) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig.setChecksumEnabled(enabled); + return newBuilder; + } + + /** + * Configure the finagle name string for the server-side routing service. + * + * @param nameStr name string of the server-side routing service + * @return client builder + */ + public DistributedLogClientBuilder serverRoutingServiceFinagleNameStr(String nameStr) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.serverRoutingServiceFinagleName = nameStr; + return newBuilder; + } + + DistributedLogClientBuilder clientConfig(ClientConfig clientConfig) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.clientConfig = ClientConfig.newConfig(clientConfig); + return newBuilder; + } + + /** + * Build distributedlog client. + * + * @return distributedlog client. + */ + public DistributedLogClient build() { + return buildClient(); + } + + /** + * Build monitor service client. + * + * @return monitor service client. 
+ */ + public MonitorServiceClient buildMonitorClient() { + + return buildClient(); + } + + @SuppressWarnings("unchecked") + ClusterClient buildServerRoutingServiceClient(String serverRoutingServiceFinagleName) { + ClientBuilder builder = this.clientBuilder; + if (null == builder) { + builder = ClientBuilder.get() + .tcpConnectTimeout(Duration.fromMilliseconds(200)) + .connectTimeout(Duration.fromMilliseconds(200)) + .requestTimeout(Duration.fromSeconds(1)) + .retries(20); + if (!clientConfig.getThriftMux()) { + builder = builder.hostConnectionLimit(1); + } + } + if (clientConfig.getThriftMux()) { + builder = builder.stack(ThriftMux.client().withClientId(clientId)); + } else { + builder = builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId))); + } + + Name name; + try { + name = Resolver$.MODULE$.eval(serverRoutingServiceFinagleName); + } catch (Exception exc) { + logger.error("Exception in Resolver.eval for name {}", serverRoutingServiceFinagleName, exc); + throw new RuntimeException(exc); + } + + // build the client + Service<ThriftClientRequest, byte[]> client = + ClientBuilder.safeBuildFactory( + builder.dest(name).reportTo(statsReceiver.scope("routing")) + ).toService(); + DistributedLogService.ServiceIface service = + new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory()); + return new ClusterClient(client, service); + } + + DistributedLogClientImpl buildClient() { + checkNotNull(name, "No name provided."); + checkNotNull(clientId, "No client id provided."); + checkNotNull(routingServiceBuilder, "No routing service builder provided."); + checkNotNull(statsReceiver, "No stats receiver provided."); + if (null == streamStatsReceiver) { + streamStatsReceiver = new NullStatsReceiver(); + } + + Optional<ClusterClient> serverRoutingServiceClient = Optional.absent(); + if (null != serverRoutingServiceFinagleName) { + serverRoutingServiceClient = Optional.of( + buildServerRoutingServiceClient(serverRoutingServiceFinagleName));
+ } + + RoutingService routingService = routingServiceBuilder + .statsReceiver(statsReceiver.scope("routing")) + .build(); + DistributedLogClientImpl clientImpl = + new DistributedLogClientImpl( + name, + clientId, + routingService, + clientBuilder, + clientConfig, + serverRoutingServiceClient, + statsReceiver, + streamStatsReceiver, + regionResolver, + enableRegionStats); + routingService.startService(); + clientImpl.handshake(); + return clientImpl; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java new file mode 100644 index 0000000..033882f --- /dev/null +++ b/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * DistributedLog Service Client. 
+ */ +package org.apache.distributedlog.service; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/resources/findbugsExclude.xml b/distributedlog-client/src/main/resources/findbugsExclude.xml index 29e1a16..05ee085 100644 --- a/distributedlog-client/src/main/resources/findbugsExclude.xml +++ b/distributedlog-client/src/main/resources/findbugsExclude.xml @@ -18,6 +18,6 @@ <FindBugsFilter> <Match> <!-- generated code, we can't be held responsible for findbugs in it //--> - <Class name="~com\.twitter\.distributedlog\.thrift.*" /> + <Class name="~org\.apache\.distributedlog\.thrift.*" /> </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java deleted file mode 100644 index b302439..0000000 --- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java +++ /dev/null @@ -1,383 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client; - -import static com.google.common.base.Charsets.UTF_8; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.common.collect.Lists; -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.LogRecord; -import com.twitter.distributedlog.LogRecordSet; -import com.twitter.distributedlog.LogRecordSetBuffer; -import com.twitter.distributedlog.exceptions.LogRecordTooLongException; -import com.twitter.distributedlog.io.CompressionCodec; -import com.twitter.distributedlog.service.DistributedLogClient; -import com.twitter.finagle.IndividualRequestTimeoutException; -import com.twitter.util.Await; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import java.nio.ByteBuffer; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * Test {@link DistributedLogMultiStreamWriter}. 
- */ -public class TestDistributedLogMultiStreamWriter { - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testBuildWithNullStreams() throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .build(); - } - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testBuildWithEmptyStreamList() throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.<String>newArrayList()) - .build(); - } - - @Test(timeout = 20000, expected = NullPointerException.class) - public void testBuildWithNullClient() throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .build(); - } - - @Test(timeout = 20000, expected = NullPointerException.class) - public void testBuildWithNullCodec() throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(null) - .build(); - } - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testBuildWithInvalidSpeculativeSettings1() - throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(-1) - .build(); - } - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testBuildWithInvalidSpeculativeSettings2() - throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(5) - .build(); - } - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testBuildWithInvalidSpeculativeSettings3() - throws Exception { - 
DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(-1) - .build(); - } - - @Test(timeout = 20000, expected = IllegalArgumentException.class) - public void testBuildWithInvalidSpeculativeSettings4() - throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(10) - .build(); - } - - @Test(timeout = 20000) - public void testBuildMultiStreamWriter() - throws Exception { - DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(50) - .build(); - assertTrue(true); - } - - @Test(timeout = 20000) - public void testBuildWithPeriodicalFlushEnabled() throws Exception { - ScheduledExecutorService executorService = mock(ScheduledExecutorService.class); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(50) - .flushIntervalMs(1000) - .scheduler(executorService) - .build(); - verify(executorService, times(1)).scheduleAtFixedRate(writer, 1000000, 1000000, TimeUnit.MICROSECONDS); - } - - @Test(timeout = 20000) - public void 
testBuildWithPeriodicalFlushDisabled() throws Exception { - ScheduledExecutorService executorService = mock(ScheduledExecutorService.class); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(mock(DistributedLogClient.class)) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(50) - .flushIntervalMs(0) - .scheduler(executorService) - .build(); - verify(executorService, times(0)).scheduleAtFixedRate(writer, 1000, 1000, TimeUnit.MILLISECONDS); - writer.close(); - } - - @Test(timeout = 20000) - public void testFlushWhenBufferIsFull() throws Exception { - DistributedLogClient client = mock(DistributedLogClient.class); - when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any())) - .thenReturn(Future.value(new DLSN(1L, 1L, 999L))); - - ScheduledExecutorService executorService = - Executors.newSingleThreadScheduledExecutor(); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(client) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(100000) - .maxSpeculativeTimeoutMs(200000) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(500000) - .flushIntervalMs(0) - .bufferSize(0) - .scheduler(executorService) - .build(); - - ByteBuffer buffer = ByteBuffer.wrap("test".getBytes(UTF_8)); - writer.write(buffer); - - verify(client, times(1)).writeRecordSet((String) any(), (LogRecordSetBuffer) any()); - - writer.close(); - } - - @Test(timeout = 20000) - public void testFlushWhenExceedMaxLogRecordSetSize() - throws Exception { - DistributedLogClient client = mock(DistributedLogClient.class); - when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any())) - .thenReturn(Future.value(new DLSN(1L, 1L, 999L))); - 
ScheduledExecutorService executorService = - Executors.newSingleThreadScheduledExecutor(); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(client) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(100000) - .maxSpeculativeTimeoutMs(200000) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(500000) - .flushIntervalMs(0) - .bufferSize(Integer.MAX_VALUE) - .scheduler(executorService) - .build(); - - byte[] data = new byte[LogRecord.MAX_LOGRECORD_SIZE - 3 * 100]; - ByteBuffer buffer1 = ByteBuffer.wrap(data); - writer.write(buffer1); - verify(client, times(0)).writeRecordSet((String) any(), (LogRecordSetBuffer) any()); - LogRecordSet.Writer recordSetWriter1 = writer.getLogRecordSetWriter(); - assertEquals(1, recordSetWriter1.getNumRecords()); - assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length, recordSetWriter1.getNumBytes()); - - ByteBuffer buffer2 = ByteBuffer.wrap(data); - writer.write(buffer2); - verify(client, times(1)).writeRecordSet((String) any(), (LogRecordSetBuffer) any()); - LogRecordSet.Writer recordSetWriter2 = writer.getLogRecordSetWriter(); - assertEquals(1, recordSetWriter2.getNumRecords()); - assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length, recordSetWriter2.getNumBytes()); - assertTrue(recordSetWriter1 != recordSetWriter2); - - writer.close(); - } - - @Test(timeout = 20000) - public void testWriteTooLargeRecord() throws Exception { - DistributedLogClient client = mock(DistributedLogClient.class); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(client) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(100000) - .maxSpeculativeTimeoutMs(200000) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(5000000) - .flushIntervalMs(0) - .bufferSize(0) - .build(); - - byte[] data = new 
byte[LogRecord.MAX_LOGRECORD_SIZE + 10]; - ByteBuffer buffer = ByteBuffer.wrap(data); - Future<DLSN> writeFuture = writer.write(buffer); - assertTrue(writeFuture.isDefined()); - try { - Await.result(writeFuture); - fail("Should fail on writing too long record"); - } catch (LogRecordTooLongException lrtle) { - // expected - } - writer.close(); - } - - @Test(timeout = 20000) - public void testSpeculativeWrite() throws Exception { - DistributedLogClient client = mock(DistributedLogClient.class); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(client) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(5000000) - .flushIntervalMs(0) - .bufferSize(0) - .build(); - - final String secondStream = writer.getStream(1); - - final DLSN dlsn = new DLSN(99L, 88L, 0L); - - Mockito.doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - Object[] arguments = invocation.getArguments(); - String stream = (String) arguments[0]; - if (stream.equals(secondStream)) { - return Future.value(dlsn); - } else { - return new Promise<DLSN>(); - } - } - }).when(client).writeRecordSet((String) any(), (LogRecordSetBuffer) any()); - - byte[] data = "test-test".getBytes(UTF_8); - ByteBuffer buffer = ByteBuffer.wrap(data); - Future<DLSN> writeFuture = writer.write(buffer); - DLSN writeDLSN = Await.result(writeFuture); - assertEquals(dlsn, writeDLSN); - writer.close(); - } - - @Test(timeout = 20000) - public void testPeriodicalFlush() throws Exception { - DistributedLogClient client = mock(DistributedLogClient.class); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(client) - .compressionCodec(CompressionCodec.Type.LZ4) - 
.firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(5000000) - .flushIntervalMs(10) - .bufferSize(Integer.MAX_VALUE) - .build(); - - final DLSN dlsn = new DLSN(99L, 88L, 0L); - - Mockito.doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - return Future.value(dlsn); - } - }).when(client).writeRecordSet((String) any(), (LogRecordSetBuffer) any()); - - byte[] data = "test-test".getBytes(UTF_8); - ByteBuffer buffer = ByteBuffer.wrap(data); - Future<DLSN> writeFuture = writer.write(buffer); - DLSN writeDLSN = Await.result(writeFuture); - assertEquals(dlsn, writeDLSN); - writer.close(); - } - - @Test(timeout = 20000) - public void testFailRequestAfterRetriedAllStreams() throws Exception { - DistributedLogClient client = mock(DistributedLogClient.class); - when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any())) - .thenReturn(new Promise<DLSN>()); - DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder() - .streams(Lists.newArrayList("stream1", "stream2")) - .client(client) - .compressionCodec(CompressionCodec.Type.LZ4) - .firstSpeculativeTimeoutMs(10) - .maxSpeculativeTimeoutMs(20) - .speculativeBackoffMultiplier(2) - .requestTimeoutMs(5000000) - .flushIntervalMs(10) - .bufferSize(Integer.MAX_VALUE) - .build(); - - byte[] data = "test-test".getBytes(UTF_8); - ByteBuffer buffer = ByteBuffer.wrap(data); - Future<DLSN> writeFuture = writer.write(buffer); - try { - Await.result(writeFuture); - fail("Should fail the request after retries all streams"); - } catch (IndividualRequestTimeoutException e) { - long timeoutMs = e.timeout().inMilliseconds(); - assertTrue(timeoutMs >= (10 + 20) && timeoutMs < 5000000); - } - writer.close(); - } -}