http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java deleted file mode 100644 index 4145040..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncFailureInjector.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.injector; - -/** - * Failure Injector that works in asynchronous way - */ -public interface AsyncFailureInjector { - - AsyncFailureInjector NULL = new AsyncFailureInjector() { - @Override - public void injectErrors(boolean enabled) { - // no-op - } - - @Override - public boolean shouldInjectErrors() { - return false; - } - - @Override - public void injectDelays(boolean enabled) { - // no-op - } - - @Override - public boolean shouldInjectDelays() { - return false; - } - - @Override - public int getInjectedDelayMs() { - return 0; - } - - @Override - public void injectStops(boolean enabled) { - // no-op - } - - @Override - public boolean shouldInjectStops() { - return false; - } - - @Override - public boolean shouldInjectCorruption(long startEntryId, long endEntryId) { - return false; - } - - @Override - public String toString() { - return "NULL"; - } - }; - - /** - * Enable or disable error injection. - * - * @param enabled - * flag to enable or disable error injection. - */ - void injectErrors(boolean enabled); - - /** - * Return the flag indicating if should inject errors. - * - * @return true to inject errors otherwise false. - */ - boolean shouldInjectErrors(); - - /** - * Enable or disable delay injection. - * - * @param enabled - * flag to enable or disable delay injection. - */ - void injectDelays(boolean enabled); - - /** - * Return the flag indicating if should inject delays. - * - * @return true to inject delays otherwise false. - */ - boolean shouldInjectDelays(); - - /** - * Return the injected delay in milliseconds. - * - * @return the injected delay in milliseconds. - */ - int getInjectedDelayMs(); - - /** - * Enable or disable injecting stops. This could be used - * for simulating stopping an action. - */ - void injectStops(boolean enabled); - - /** - * Return the flag indicating if should inject stops. - * - * @return true to inject stops otherwise false. - */ - boolean shouldInjectStops(); - - /** - * Return the flag indicating if should inject corruption. - * - * @param startEntryId the start entry id - * @param endEntryId the end entry id - * @return true to inject corruption otherwise false. - */ - boolean shouldInjectCorruption(long startEntryId, long endEntryId); - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java deleted file mode 100644 index f3bfea9..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/AsyncRandomFailureInjector.java +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.injector; - -import com.twitter.distributedlog.util.Utils; - -import java.util.Random; - -/** - * Failure injector based on {@link java.util.Random} - */ -public class AsyncRandomFailureInjector implements AsyncFailureInjector { - - private static final Random random = new Random(System.currentTimeMillis()); - - public static Builder newBuilder() { - return new Builder(); - } - - public static class Builder { - - private boolean _simulateDelays = false; - private boolean _simulateErrors = false; - private boolean _simulateStops = false; - private boolean _simulateCorruption = false; - private int _injectedDelayPercent = 0; - private int _injectedErrorPercent = 0; - private int _injectedStopPercent = 0; - private int _maxInjectedDelayMs = Integer.MAX_VALUE; - - private Builder() {} - - public Builder injectDelays(boolean simulateDelays, - int injectedDelayPercent, - int maxInjectedDelayMs) { - this._simulateDelays = simulateDelays; - this._injectedDelayPercent = injectedDelayPercent; - this._maxInjectedDelayMs = maxInjectedDelayMs; - return this; - } - - public Builder injectErrors(boolean simulateErrors, - int injectedErrorPercent) { - this._simulateErrors = simulateErrors; - this._injectedErrorPercent = injectedErrorPercent; - return this; - } - - public Builder injectCorruption(boolean simulateCorruption) { - this._simulateCorruption = simulateCorruption; - return this; - } - - public Builder injectStops(boolean simulateStops, - int injectedStopPercent) { - this._simulateStops = simulateStops; - this._injectedStopPercent = injectedStopPercent; - return this; - } - - public AsyncFailureInjector build() { - return new AsyncRandomFailureInjector( - _simulateDelays, - _injectedDelayPercent, - _maxInjectedDelayMs, - _simulateErrors, - _injectedErrorPercent, - _simulateStops, - _injectedStopPercent, - _simulateCorruption); - } - - } - - private boolean simulateDelays; - private boolean simulateErrors; - private boolean simulateStops; - private boolean simulateCorruption; - private final int injectedDelayPercent; - private final int injectedErrorPercent; - private final int injectedStopPercent; - private final int maxInjectedDelayMs; - - private AsyncRandomFailureInjector(boolean simulateDelays, - int injectedDelayPercent, - int maxInjectedDelayMs, - boolean simulateErrors, - int injectedErrorPercent, - boolean simulateStops, - int injectedStopPercent, - boolean simulateCorruption) { - this.simulateDelays = simulateDelays; - this.injectedDelayPercent = injectedDelayPercent; - this.maxInjectedDelayMs = maxInjectedDelayMs; - this.simulateErrors = simulateErrors; - this.injectedErrorPercent = injectedErrorPercent; - this.simulateStops = simulateStops; - this.injectedStopPercent = injectedStopPercent; - this.simulateCorruption = simulateCorruption; - } - - @Override - public void injectErrors(boolean enabled) { - this.simulateErrors = enabled; - } - - @Override - public boolean shouldInjectErrors() { - return simulateErrors && Utils.randomPercent(injectedErrorPercent); - } - - @Override - public void injectDelays(boolean enabled) { - this.simulateDelays = enabled; - } - - @Override - public boolean shouldInjectDelays() { - return simulateDelays && Utils.randomPercent(injectedDelayPercent); - } - - @Override - public int getInjectedDelayMs() { - if (maxInjectedDelayMs > 0) { - return random.nextInt(maxInjectedDelayMs); - } - return 0; - } - - @Override - public void injectStops(boolean enabled) { - this.simulateStops = enabled; - } - - @Override - public boolean shouldInjectStops() { - return simulateStops && Utils.randomPercent(injectedStopPercent); - } - - @Override - public boolean shouldInjectCorruption(long startEntryId, long endEntryId) { - if (!simulateCorruption) { - return false; - } - if (startEntryId == endEntryId) { - return startEntryId % 10 == 0; - } - for (long i = startEntryId; i <= endEntryId; i++) { - if (i % 10 == 0) { - return true; - } - } - return false; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("FailureInjector["); - sb.append("errors=(").append(simulateErrors).append(", pct=") - .append(injectedErrorPercent).append("), "); - sb.append("delays=(").append(simulateDelays).append(", pct=") - .append(injectedDelayPercent).append(", max=") - .append(maxInjectedDelayMs).append("), "); - sb.append("stops=(").append(simulateStops).append(", pct=") - .append(injectedStopPercent).append(")"); - sb.append("corruption=(").append(simulateCorruption).append(")"); - sb.append("]"); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/FailureInjector.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/FailureInjector.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/FailureInjector.java deleted file mode 100644 index 16c8e4e..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/FailureInjector.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.injector; - -/** - * Failure injector. - */ -public interface FailureInjector { - - /** - * No-op failure injector, which does nothing. - */ - public static FailureInjector NULL = new FailureInjector() { - @Override - public void inject() { - // no-op; - } - }; - - // inject failures - void inject(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/RandomDelayFailureInjector.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/RandomDelayFailureInjector.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/RandomDelayFailureInjector.java deleted file mode 100644 index 73aad5b..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/RandomDelayFailureInjector.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.injector; - -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.util.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Injector that injects random delays - */ -public class RandomDelayFailureInjector implements FailureInjector { - - private static final Logger LOG = LoggerFactory.getLogger(RandomDelayFailureInjector.class); - - private final DynamicDistributedLogConfiguration dynConf; - - public RandomDelayFailureInjector(DynamicDistributedLogConfiguration dynConf) { - this.dynConf = dynConf; - } - - private int delayMs() { - return dynConf.getEIInjectedWriteDelayMs(); - } - - private double delayPct() { - return dynConf.getEIInjectedWriteDelayPercent(); - } - - private boolean enabled() { - return delayMs() > 0 && delayPct() > 0; - } - - @Override - public void inject() { - try { - if (enabled() && Utils.randomPercent(delayPct())) { - Thread.sleep(delayMs()); - } - } catch (InterruptedException ex) { - LOG.warn("delay was interrupted ", ex); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/package-info.java deleted file mode 100644 index ffee340..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/injector/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * DistributedLog Failure Injection - */ -package com.twitter.distributedlog.injector; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/io/Abortable.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/io/Abortable.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/io/Abortable.java deleted file mode 100644 index 0d0b389..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/io/Abortable.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.io; - -import java.io.IOException; - -/** - * An {@code Abortable} is a source or destination of data that can be aborted. - * The abort method is invoked to release resources that the object is holding - * (such as open files). The abort happens when the object is in an error state, - * which it couldn't be closed gracefully. - * - * @see java.io.Closeable - * @since 0.3.32 - */ -public interface Abortable { - - /** - * Aborts the object and releases any resources associated with it. - * If the object is already aborted then invoking this method has no - * effect. - * - * @throws IOException if an I/O error occurs. - */ - public void abort() throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/io/Abortables.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/io/Abortables.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/io/Abortables.java deleted file mode 100644 index 4599574..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/io/Abortables.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.io; - -import com.google.common.collect.Lists; -import com.twitter.distributedlog.function.VoidFunctions; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.util.Future; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ExecutorService; - -/** - * Utility methods for working with {@link Abortable} objects. - * - * @since 0.3.32 - */ -public final class Abortables { - - static final Logger logger = LoggerFactory.getLogger(Abortables.class); - - private Abortables() {} - - public static Future<Void> asyncAbort(@Nullable AsyncAbortable abortable, - boolean swallowIOException) { - if (null == abortable) { - return Future.Void(); - } else if (swallowIOException) { - return FutureUtils.ignore(abortable.asyncAbort()); - } else { - return abortable.asyncAbort(); - } - } - - /** - * Aborts a {@link Abortable}, with control over whether an {@link IOException} may be thrown. - * This is primarily useful in a finally block, where a thrown exception needs to be logged but - * not propagated (otherwise the original exception will be lost). - * - * <p>If {@code swallowIOException} is true then we never throw {@code IOException} but merely log it. - * - * <p>Example: <pre> {@code - * - * public void abortStreamNicely() throws IOException { - * SomeStream stream = new SomeStream("foo"); - * try { - * // ... code which does something with the stream ... - * } catch (IOException ioe) { - * // If an exception occurs, we might abort the stream. - * Abortables.abort(stream, true); - * } - * }}</pre> - * - * @param abortable the {@code Abortable} object to be aborted, or null, in which case this method - * does nothing. - * @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code abort} methods - * @throws IOException if {@code swallowIOException} is false and {@code abort} throws an {@code IOException} - */ - public static void abort(@Nullable Abortable abortable, - boolean swallowIOException) - throws IOException { - if (null == abortable) { - return; - } - try { - abortable.abort(); - } catch (IOException ioe) { - if (swallowIOException) { - logger.warn("IOException thrown while aborting Abortable {} : ", abortable, ioe); - } else { - throw ioe; - } - } - } - - /** - * Abort async <i>abortable</i> - * - * @param abortable the {@code AsyncAbortable} object to be aborted, or null, in which case this method - * does nothing. - * @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code abort} methods - * @throws IOException if {@code swallowIOException} is false and {@code abort} throws an {@code IOException} - * @see #abort(Abortable, boolean) - */ - public static void abort(@Nullable AsyncAbortable abortable, - boolean swallowIOException) - throws IOException { - if (null == abortable) { - return; - } - try { - FutureUtils.result(abortable.asyncAbort()); - } catch (IOException ioe) { - if (swallowIOException) { - logger.warn("IOException thrown while aborting Abortable {} : ", abortable, ioe); - } else { - throw ioe; - } - } - } - - /** - * Aborts the given {@code abortable}, logging any {@code IOException} that's thrown rather than - * propagating it. - * - * While it's not safe in the general case to ignore exceptions that are thrown when aborting an - * I/O resource, it should generally be safe in the case of a resource that's being used only for - * reading. - * - * @param abortable the {@code Abortable} to be closed, or {@code null} in which case this method - * does nothing. - */ - public static void abortQuietly(@Nullable Abortable abortable) { - try { - abort(abortable, true); - } catch (IOException e) { - logger.error("Unexpected IOException thrown while aborting Abortable {} quietly : ", abortable, e); - } - } - - /** - * Aborts the given {@code abortable}, logging any {@code IOException} that's thrown rather than - * propagating it. - * - * While it's not safe in the general case to ignore exceptions that are thrown when aborting an - * I/O resource, it should generally be safe in the case of a resource that's being used only for - * reading. - * - * @param abortable the {@code AsyncAbortable} to be closed, or {@code null} in which case this method - * does nothing. - */ - public static void abortQuietly(@Nullable AsyncAbortable abortable) { - try { - abort(abortable, true); - } catch (IOException e) { - logger.error("Unexpected IOException thrown while aborting Abortable {} quietly : ", abortable, e); - } - } - - /** - * Abort the abortables in sequence. - * - * @param executorService - * executor service to execute - * @param abortables - * abortables to abort - * @return future represents the abort future - */ - public static Future<Void> abortSequence(ExecutorService executorService, - AsyncAbortable... abortables) { - List<AsyncAbortable> abortableList = Lists.newArrayListWithExpectedSize(abortables.length); - for (AsyncAbortable abortable : abortables) { - if (null == abortable) { - abortableList.add(AsyncAbortable.NULL); - } else { - abortableList.add(abortable); - } - } - return FutureUtils.processList( - abortableList, - AsyncAbortable.ABORT_FUNC, - executorService).map(VoidFunctions.LIST_TO_VOID_FUNC); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/io/AsyncAbortable.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/io/AsyncAbortable.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/io/AsyncAbortable.java deleted file mode 100644 index ed1062a..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/io/AsyncAbortable.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.io; - -import com.twitter.util.Function; -import com.twitter.util.Future; - -/** - * An {@code Abortable} is a source or destination of data that can be aborted. - * The abort method is invoked to release resources that the object is holding - * (such as open files). The abort happens when the object is in an error state, - * which it couldn't be closed gracefully. - * - * @see AsyncCloseable - * @see Abortable - * @since 0.3.43 - */ -public interface AsyncAbortable { - - Function<AsyncAbortable, Future<Void>> ABORT_FUNC = new Function<AsyncAbortable, Future<Void>>() { - @Override - public Future<Void> apply(AsyncAbortable abortable) { - return abortable.asyncAbort(); - } - }; - - AsyncAbortable NULL = new AsyncAbortable() { - @Override - public Future<Void> asyncAbort() { - return Future.Void(); - } - }; - - /** - * Aborts the object and releases any resources associated with it. - * If the object is already aborted then invoking this method has no - * effect. - * - * @return future represents the abort result - */ - Future<Void> asyncAbort(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/io/AsyncCloseable.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/io/AsyncCloseable.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/io/AsyncCloseable.java deleted file mode 100644 index 817a8e2..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/io/AsyncCloseable.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.io; - -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.util.Function; -import com.twitter.util.Future; - -/** - * A {@code AsyncCloseable} is a source or destination of data that can be closed asynchronously. - * The close method is invoked to release resources that the object is - * holding (such as open files). - */ -public interface AsyncCloseable { - - Function<AsyncCloseable, Future<Void>> CLOSE_FUNC = new Function<AsyncCloseable, Future<Void>>() { - @Override - public Future<Void> apply(AsyncCloseable closeable) { - return closeable.asyncClose(); - } - }; - - Function<AsyncCloseable, Future<Void>> CLOSE_FUNC_IGNORE_ERRORS = new Function<AsyncCloseable, Future<Void>>() { - @Override - public Future<Void> apply(AsyncCloseable closeable) { - return FutureUtils.ignore(closeable.asyncClose()); - } - }; - - AsyncCloseable NULL = new AsyncCloseable() { - @Override - public Future<Void> asyncClose() { - return Future.Void(); - } - }; - - /** - * Closes this source and releases any system resources associated - * with it. If the source is already closed then invoking this - * method has no effect. - * - * @return future representing the close result. - */ - Future<Void> asyncClose(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/io/AsyncDeleteable.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/io/AsyncDeleteable.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/io/AsyncDeleteable.java deleted file mode 100644 index 203895e..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/io/AsyncDeleteable.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.io; - -import com.twitter.util.Future; - -/** - * A {@code AsyncDeleteable} is a source or destination of data that can be deleted asynchronously. - * This delete method is invoked to delete the source. - */ -public interface AsyncDeleteable { - /** - * Releases any system resources associated with this and delete the source. If the source is - * already deleted then invoking this method has no effect. - * - * @return future representing the deletion result. - */ - Future<Void> delete(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/io/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/io/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/io/package-info.java deleted file mode 100644 index df2e91f..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/io/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * IO Utils for distributedlog - */ -package com.twitter.distributedlog.io; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/ChainedRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/ChainedRequestLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/ChainedRequestLimiter.java deleted file mode 100644 index 60eacd5..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/ChainedRequestLimiter.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.limiter; - -import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableList; -import com.twitter.distributedlog.exceptions.OverCapacityException; - -import java.util.concurrent.TimeUnit; - -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; - -/** - * Chain request limiters for easier management of multi limiter policy. - */ -public class ChainedRequestLimiter<Request> implements RequestLimiter<Request> { - private final ImmutableList<RequestLimiter<Request>> limiters; - private final OpStatsLogger applyTime; - - public static class Builder<Request> { - private final ImmutableList.Builder<RequestLimiter<Request>> limitersBuilder; - private StatsLogger statsLogger = NullStatsLogger.INSTANCE; - - public Builder() { - this.limitersBuilder = new ImmutableList.Builder<RequestLimiter<Request>>(); - } - - public Builder<Request> addLimiter(RequestLimiter<Request> limiter) { - this.limitersBuilder.add(limiter); - return this; - } - - public Builder<Request> statsLogger(StatsLogger statsLogger) { - this.statsLogger = statsLogger; - return this; - } - - public ChainedRequestLimiter<Request> build() { - return new ChainedRequestLimiter<Request>(limitersBuilder.build(), statsLogger); - } - } - - private ChainedRequestLimiter(ImmutableList<RequestLimiter<Request>> limiters, - StatsLogger statsLogger) { - this.limiters = limiters; - this.applyTime = statsLogger.getOpStatsLogger("apply"); - } - - public void apply(Request request) throws OverCapacityException { - Stopwatch stopwatch = Stopwatch.createStarted(); - try { - for (RequestLimiter<Request> limiter : limiters) { - limiter.apply(request); - } - } finally { - applyTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/ComposableRequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/ComposableRequestLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/ComposableRequestLimiter.java deleted file mode 100644 index 55e4c8b..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/ComposableRequestLimiter.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.limiter; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import com.twitter.distributedlog.exceptions.OverCapacityException; -import com.twitter.distributedlog.limiter.GuavaRateLimiter; -import com.twitter.distributedlog.limiter.RateLimiter; - -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.StatsLogger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Collect rate limiter implementation, cost(Request), overlimit, etc. behavior. - */ -public class ComposableRequestLimiter<Request> implements RequestLimiter<Request> { - protected static final Logger LOG = LoggerFactory.getLogger(ComposableRequestLimiter.class); - - private final RateLimiter limiter; - private final OverlimitFunction<Request> overlimitFunction; - private final CostFunction<Request> costFunction; - private final Counter overlimitCounter; - - static public interface OverlimitFunction<Request> { - void apply(Request request) throws OverCapacityException; - } - static public interface CostFunction<Request> { - int apply(Request request); - } - - public ComposableRequestLimiter( - RateLimiter limiter, - OverlimitFunction<Request> overlimitFunction, - CostFunction<Request> costFunction, - StatsLogger statsLogger) { - Preconditions.checkNotNull(limiter); - Preconditions.checkNotNull(overlimitFunction); - Preconditions.checkNotNull(costFunction); - this.limiter = limiter; - this.overlimitFunction = overlimitFunction; - this.costFunction = costFunction; - this.overlimitCounter = statsLogger.getCounter("overlimit"); - } - - @Override - public void apply(Request request) throws OverCapacityException { - int permits = costFunction.apply(request); - if (!limiter.acquire(permits)) { - overlimitCounter.inc(); - overlimitFunction.apply(request); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/GuavaRateLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/GuavaRateLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/GuavaRateLimiter.java deleted file mode 100644 index 3f1909a..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/GuavaRateLimiter.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.limiter; - -import com.google.common.base.Preconditions; - -/** - * Wrap a guava limiter in a simple interface to make testing easier. - * Notes: - * 1. Negative limit translates into (virtually) unlimited. - * 2. Calling acquire with permits == 0 translates into no acquire. - */ -public class GuavaRateLimiter implements RateLimiter { - com.google.common.util.concurrent.RateLimiter limiter; - - public static RateLimiter of(int limit) { - if (limit == 0) { - return RateLimiter.REJECT; - } else if (limit < 0) { - return RateLimiter.ACCEPT; - } else { - return new GuavaRateLimiter(limit); - } - } - - public GuavaRateLimiter(int limit) { - double effectiveLimit = limit; - if (limit < 0) { - effectiveLimit = Double.POSITIVE_INFINITY; - } - this.limiter = com.google.common.util.concurrent.RateLimiter.create(effectiveLimit); - } - - @Override - public boolean acquire(int permits) { - Preconditions.checkState(permits >= 0); - if (permits > 0) { - return limiter.tryAcquire(permits); - } else { - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java deleted file mode 100644 index 0cb1ebe..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RateLimiter.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.limiter; - -/** - * Simple interface for a rate limiter used by RequestLimiter. - */ -public interface RateLimiter { - - public static final RateLimiter REJECT = new RateLimiter() { - @Override - public boolean acquire(int permits) { - return false; - } - }; - - public static final RateLimiter ACCEPT = new RateLimiter() { - @Override - public boolean acquire(int permits) { - return true; - } - }; - - public static abstract class Builder { - public abstract RateLimiter build(); - } - - /** - * Try to acquire a certain number of permits. - * - * @param permits number of permits to acquire - */ - boolean acquire(int permits); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RequestLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RequestLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RequestLimiter.java deleted file mode 100644 index 6c5ad96..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/RequestLimiter.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.limiter; - -import com.twitter.distributedlog.exceptions.OverCapacityException; - -public interface RequestLimiter<Request> { - public void apply(Request request) throws OverCapacityException; -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/package-info.java deleted file mode 100644 index d5f61a8..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/limiter/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Rate limiting for distributedlog - */ -package com.twitter.distributedlog.limiter; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java deleted file mode 100644 index fa8bdf0..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLock.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.util.Future; - -/** - * Interface for distributed locking - */ -public interface DistributedLock extends AsyncCloseable { - - /** - * Asynchronously acquire the lock. - * - * @return future represents the acquire result. - */ - Future<? extends DistributedLock> asyncAcquire(); - - /** - * Check if hold lock. If it doesn't, then re-acquire the lock. - * - * @throws LockingException if the lock attempt fails - * @see #checkOwnership() - */ - void checkOwnershipAndReacquire() throws LockingException; - - /** - * Check if the lock is held. If not, error out and do not re-acquire. - * Use this in cases where there are many waiters by default and re-acquire - * is unlikely to succeed. - * - * @throws LockingException if we lost the ownership - * @see #checkOwnershipAndReacquire() - */ - void checkOwnership() throws LockingException; - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLockContext.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLockContext.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLockContext.java deleted file mode 100644 index 1914793..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/DistributedLockContext.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -import org.apache.commons.lang3.tuple.Pair; - -import java.util.HashSet; -import java.util.Set; - -class DistributedLockContext { - private final Set<Pair<String, Long>> lockIds; - - DistributedLockContext() { - this.lockIds = new HashSet<Pair<String, Long>>(); - } - - synchronized void addLockId(Pair<String, Long> lockId) { - this.lockIds.add(lockId); - } - - synchronized void clearLockIds() { - this.lockIds.clear(); - } - - synchronized boolean hasLockId(Pair<String, Long> lockId) { - return this.lockIds.contains(lockId); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/EpochChangedException.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/EpochChangedException.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/EpochChangedException.java deleted file mode 100644 index 032a9cd..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/EpochChangedException.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -import com.twitter.distributedlog.exceptions.LockingException; - -/** - * Exception indicates that epoch already changed when executing a given - * {@link LockAction}. - */ -public class EpochChangedException extends LockingException { - - private static final long serialVersionUID = 8775257025963870331L; - - public EpochChangedException(String lockPath, int expectedEpoch, int currentEpoch) { - super(lockPath, "lock " + lockPath + " already moved to epoch " + currentEpoch + ", expected " + expectedEpoch); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockAction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockAction.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockAction.java deleted file mode 100644 index 46b420d..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockAction.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -/** - * Lock Action - */ -interface LockAction { - - /** - * Execute a lock action - */ - void execute(); - - /** - * Get lock action name. - * - * @return lock action name - */ - String getActionName(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockClosedException.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockClosedException.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockClosedException.java deleted file mode 100644 index 5b676bf..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockClosedException.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.distributedlog.lock.ZKSessionLock.State; -import org.apache.commons.lang3.tuple.Pair; - -/** - * Exception indicates that the lock was closed (unlocked) before the lock request could complete. - */ -public class LockClosedException extends LockingException { - - private static final long serialVersionUID = 8775257025963470331L; - - public LockClosedException(String lockPath, String msg) { - super(lockPath, msg); - } - - public LockClosedException(String lockPath, Pair<String, Long> lockId, State currentState) { - super(lockPath, "lock at path " + lockPath + " with id " + lockId + " closed early in state : " + currentState); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockListener.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockListener.java deleted file mode 100644 index 681c180..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockListener.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -/** - * Listener on lock state changes - */ -interface LockListener { - /** - * Triggered when a lock is changed from CLAIMED to EXPIRED. - */ - void onExpired(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockSessionExpiredException.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockSessionExpiredException.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockSessionExpiredException.java deleted file mode 100644 index dac1253..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockSessionExpiredException.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.distributedlog.lock.ZKSessionLock.State; -import org.apache.commons.lang3.tuple.Pair; - -/** - * Exception indicates that the lock's zookeeper session was expired before the lock request could complete. - */ -public class LockSessionExpiredException extends LockingException { - - private static final long serialVersionUID = 8775253025963470331L; - - public LockSessionExpiredException(String lockPath, Pair<String, Long> lockId, State currentState) { - super(lockPath, "lock at path " + lockPath + " with id " + lockId + " expired early in state : " + currentState); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockStateChangedException.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockStateChangedException.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockStateChangedException.java deleted file mode 100644 index 2b99795..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockStateChangedException.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.distributedlog.lock.ZKSessionLock.State; -import org.apache.commons.lang3.tuple.Pair; - -/** - * Exception thrown when lock state changed - */ -public class LockStateChangedException extends LockingException { - - private static final long serialVersionUID = -3770866789942102262L; - - LockStateChangedException(String lockPath, Pair<String, Long> lockId, - State expectedState, State currentState) { - super(lockPath, "Lock state of " + lockId + " for " + lockPath + " has changed : expected " - + expectedState + ", but " + currentState); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockTimeoutException.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockTimeoutException.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockTimeoutException.java deleted file mode 100644 index 3020980..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockTimeoutException.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -import com.twitter.distributedlog.exceptions.LockingException; - -import java.util.concurrent.TimeUnit; - -/** - * Exception thrown when acquiring lock timeout - */ -public class LockTimeoutException extends LockingException { - - private static final long serialVersionUID = -3837638877423323820L; - - LockTimeoutException(String lockPath, long timeout, TimeUnit unit) { - super(lockPath, "Locking " + lockPath + " timeout in " + timeout + " " + unit); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockWaiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockWaiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockWaiter.java deleted file mode 100644 index 73ffabc..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/LockWaiter.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Future; -import com.twitter.util.Timer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Lock waiter represents the attempt that application tries to lock. - */ -public class LockWaiter { - - private static final Logger logger = LoggerFactory.getLogger(LockWaiter.class); - - private final String lockId; - private final String currentOwner; - private final Future<Boolean> acquireFuture; - - public LockWaiter(String lockId, - String currentOwner, - Future<Boolean> acquireFuture) { - this.lockId = lockId; - this.currentOwner = currentOwner; - this.acquireFuture = acquireFuture; - } - - /** - * Return the lock id of the waiter. - * - * @return lock id of the waiter - */ - public String getId() { - return lockId; - } - - /** - * Return the owner that observed when waiter is waiting. - * - * @return the owner that observed when waiter is waiting - */ - public String getCurrentOwner() { - return currentOwner; - } - - /** - * Return the future representing the waiting result. - * - * <p>If the future is interrupted (e.g. {@link Future#within(Duration, Timer)}), - * the waiter will automatically clean up its waiting state. - * - * @return the future representing the acquire result. - */ - public Future<Boolean> getAcquireFuture() { - return acquireFuture; - } - - /** - * Wait for the acquire result. - * - * @return true if acquired successfully, otherwise false. - */ - public boolean waitForAcquireQuietly() { - boolean success = false; - try { - success = Await.result(acquireFuture); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } catch (LockTimeoutException lte) { - logger.debug("Timeout on lock acquiring", lte); - } catch (Exception e) { - logger.error("Caught exception waiting for lock acquired", e); - } - return success; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/NopDistributedLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/NopDistributedLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/NopDistributedLock.java deleted file mode 100644 index ef6b9ab..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/NopDistributedLock.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.util.Future; - -/** - * An implementation of {@link DistributedLock} which does nothing. - */ -public class NopDistributedLock implements DistributedLock { - - public static final DistributedLock INSTANCE = new NopDistributedLock(); - - private NopDistributedLock() {} - - @Override - public Future<? extends DistributedLock> asyncAcquire() { - return Future.value(this); - } - - @Override - public void checkOwnershipAndReacquire() throws LockingException { - // no-op - } - - @Override - public void checkOwnership() throws LockingException { - // no-op - } - - @Override - public Future<Void> asyncClose() { - return Future.Void(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java deleted file mode 100644 index 95cd593..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLock.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; -import com.twitter.util.Future; -import scala.runtime.BoxedUnit; - -import java.util.concurrent.TimeUnit; - -/** - * One time lock. - * <p>The lock is only alive during a given period. It should - * be not usable if the lock is expired. - * <p>Listener could be registered by {@link #setLockListener(LockListener)} - * to receive state changes of the lock. - */ -public interface SessionLock { - - /** - * Set lock listener for lock state changes. - * <p>Typically a listener should be set before try locking. - * - * @param lockListener - * lock listener for state changes. - */ - SessionLock setLockListener(LockListener lockListener); - - /** - * Whether the lock is held or not? - * - * @return true if the lock is held, otherwise false. - */ - boolean isLockHeld(); - - /** - * Whether the lock is expired or not? - * <p>If a lock is expired, it will not be reusable any more. Because it is an one-time lock. - * - * @return true if the lock is expired, otherwise false. - */ - boolean isLockExpired(); - - /** - * Acquire the lock if it is free within given waiting time. - * <p> - * Calling this method will attempt to acquire the lock. If the lock - * is already acquired by others, the caller will wait for <i>timeout</i> - * period. If the caller could claim the lock within <i>timeout</i> period, - * the caller acquire the lock. Otherwise, it would fail with {@link OwnershipAcquireFailedException}. - * <p> - * {@link #unlock()} should be called to unlock a claimed lock. The caller - * doesn't need to unlock to clean up resources if <i>tryLock</i> fails. - * <p> - * <i>tryLock</i> here is effectively the combination of following asynchronous calls. - * <pre> - * ZKDistributedLock lock = ...; - * Future<LockWaiter> attemptFuture = lock.asyncTryLock(...); - * - * boolean acquired = waiter.waitForAcquireQuietly(); - * if (acquired) { - * // ... - * } else { - * // ... - * } - * </pre> - * - * @param timeout - * timeout period to wait for claiming ownership - * @param unit - * unit of timeout period - * @throws OwnershipAcquireFailedException if the lock is already acquired by others - * @throws LockingException when encountered other lock related issues. - */ - void tryLock(long timeout, TimeUnit unit) - throws OwnershipAcquireFailedException, LockingException; - - /** - * Acquire the lock in asynchronous way. - * <p> - * Calling this method will attempt to place a lock waiter to acquire this lock. - * The future returned by this method represents the result of this attempt. It doesn't mean - * the caller acquired the lock or not. The application should check {@link LockWaiter#getAcquireFuture()} - * to see if it acquired the lock or not. - * - * @param timeout - * timeout period to wait for claiming ownership - * @param unit - * unit of timeout period - * @return lock waiter representing this attempt of acquiring lock. - * @see #tryLock(long, TimeUnit) - */ - Future<LockWaiter> asyncTryLock(long timeout, TimeUnit unit); - - /** - * Release a claimed lock. - * - * @see #tryLock(long, TimeUnit) - */ - void unlock(); - - /** - * Release a claimed lock in the asynchronous way. - * - * @return future representing the result of unlock operation. - * @see #unlock() - */ - Future<BoxedUnit> asyncUnlock(); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLockFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLockFactory.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLockFactory.java deleted file mode 100644 index 4334626..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/SessionLockFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.lock; - -import com.twitter.util.Future; - -/** - * Factory to create {@link SessionLock} - */ -public interface SessionLockFactory { - - /** - * Create a lock with lock path. - * - * @param lockPath - * lock path - * @param context - * lock context - * @return future represents the creation result. - */ - Future<SessionLock> createLock(String lockPath, DistributedLockContext context); - -}