http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/Tool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/Tool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/Tool.java deleted file mode 100644 index bb14066..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/Tool.java +++ /dev/null @@ -1,243 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.tools; - -import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.commons.cli.BasicParser; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; - -import java.util.Map; -import java.util.TreeMap; - -/** - * A Tool Framework - */ -public abstract class Tool { - - /** - * Interface of a command to run in a tool. - */ - protected interface Command { - String getName(); - String getDescription(); - int runCmd(String[] args) throws Exception; - void printUsage(); - } - - /** - * {@link org.apache.commons.cli.Options} based command. - */ - protected abstract static class OptsCommand implements Command { - - /** - * @return options used by this command. - */ - protected abstract Options getOptions(); - - /** - * @return usage of this command. - */ - protected String getUsage() { - return cmdName + " [options]"; - } - - /** - * Run given command line <i>commandLine</i>. - * - * @param commandLine - * command line to run. - * @return return code of this command. - * @throws Exception - */ - protected abstract int runCmd(CommandLine commandLine) throws Exception; - - protected String cmdName; - protected String description; - - protected OptsCommand(String name, String description) { - this.cmdName = name; - this.description = description; - } - - @Override - public String getName() { - return cmdName; - } - - @Override - public String getDescription() { - return description; - } - - @Override - public int runCmd(String[] args) throws Exception { - try { - BasicParser parser = new BasicParser(); - CommandLine cmdline = parser.parse(getOptions(), args); - return runCmd(cmdline); - } catch (ParseException e) { - printUsage(); - return -1; - } - } - - @Override - public void printUsage() { - HelpFormatter helpFormatter = new HelpFormatter(); - println(cmdName + ": " + getDescription()); - helpFormatter.printHelp(getUsage(), getOptions()); - } - } - - public class HelpCommand implements Command { - - @Override - public String getName() { - return "help"; - } - - @Override - public String getDescription() { - return "describe the usage of this tool or its sub-commands."; - } - - @Override - public int runCmd(String[] args) throws Exception { - if (args.length == 0) { - printToolUsage(); - return -1; - } - String cmdName = args[0]; - Command command = commands.get(cmdName); - if (null == command) { - System.err.println("Unknown command " + cmdName); - printToolUsage(); - return -1; - } - command.printUsage(); - println(""); - return 0; - } - - @Override - public void printUsage() { - println(getName() + ": " + getDescription()); - println(""); - println("usage: " + getName() + " <command>"); - } - } - - // Commands managed by a tool - protected final Map<String, Command> commands = - new TreeMap<String, Command>(); - - protected Tool() { - addCommand(new HelpCommand()); - } - - /** - * @return tool name. - */ - protected abstract String getName(); - - /** - * Add a command in this tool. - * - * @param command - * command to run in this tool. - */ - protected void addCommand(Command command) { - commands.put(command.getName(), command); - } - - /** - * Print a message in this tool. - * - * @param msg - * message to print - */ - protected static void println(String msg) { - System.out.println(msg); - } - - /** - * print tool usage. - */ - protected void printToolUsage() { - println("Usage: " + getName() + " <command>"); - println(""); - int maxKeyLength = 0; - for (String key : commands.keySet()) { - if (key.length() > maxKeyLength) { - maxKeyLength = key.length(); - } - } - maxKeyLength += 2; - for (Map.Entry<String, Command> entry : commands.entrySet()) { - StringBuilder spacesBuilder = new StringBuilder(); - int numSpaces = maxKeyLength - entry.getKey().length(); - for (int i = 0; i < numSpaces; i++) { - spacesBuilder.append(" "); - } - println("\t" + entry.getKey() + spacesBuilder.toString() + ": " + entry.getValue().getDescription()); - } - println(""); - } - - public int run(String[] args) throws Exception { - if (args.length <= 0) { - printToolUsage(); - return -1; - } - String cmdName = args[0]; - Command cmd = commands.get(cmdName); - if (null == cmd) { - System.err.println("ERROR: Unknown command " + cmdName); - printToolUsage(); - return -1; - } - // prepare new args - String[] newArgs = new String[args.length - 1]; - System.arraycopy(args, 1, newArgs, 0, newArgs.length); - return cmd.runCmd(newArgs); - } - - public static void main(String args[]) { - int rc = -1; - if (args.length <= 0) { - System.err.println("No tool to run."); - System.err.println(""); - System.err.println("Usage : Tool <tool_class_name> <options>"); - System.exit(-1); - } - String toolClass = args[0]; - try { - Tool tool = ReflectionUtils.newInstance(toolClass, Tool.class); - String[] newArgs = new String[args.length - 1]; - System.arraycopy(args, 1, newArgs, 0, newArgs.length); - rc = tool.run(newArgs); - } catch (Throwable t) { - System.err.println("Fail to run tool " + toolClass + " : "); - t.printStackTrace(); - } - System.exit(rc); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/package-info.java deleted file mode 100644 index e2125bc..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Tools for distributedlog - */ -package com.twitter.distributedlog.tools; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Allocator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Allocator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Allocator.java deleted file mode 100644 index dcc3f58..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Allocator.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.distributedlog.io.AsyncDeleteable; -import com.twitter.distributedlog.util.Transaction.OpListener; -import com.twitter.util.Future; - -import java.io.IOException; - -/** - * A common interface to allocate <i>I</i> under transaction <i>T</i>. - * - * <h3>Usage Example</h3> - * - * Here is an example on demonstrating how `Allocator` works. - * - * <pre> {@code - * Allocator<I, T, R> allocator = ...; - * - * // issue an allocate request - * try { - * allocator.allocate(); - * } catch (IOException ioe) { - * // handle the exception - * ... - * return; - * } - * - * // Start a transaction - * final Transaction<T> txn = ...; - * - * // Try obtain object I - * Future<I> tryObtainFuture = allocator.tryObtain(txn, new OpListener<I>() { - * public void onCommit(I resource) { - * // the obtain succeed, process with the resource - * } - * public void onAbort() { - * // the obtain failed. - * } - * }).addFutureEventListener(new FutureEventListener() { - * public void onSuccess(I resource) { - * // the try obtain succeed. but the obtain has not been confirmed or aborted. - * // execute the transaction to confirm if it could complete obtain - * txn.execute(); - * } - * public void onFailure(Throwable t) { - * // handle the failure of try obtain - * } - * }); - * - * }</pre> - */ -public interface Allocator<I, T> extends AsyncCloseable, AsyncDeleteable { - - /** - * Issue allocation request to allocate <i>I</i>. - * The implementation should be non-blocking call. - * - * @throws IOException - * if fail to request allocating a <i>I</i>. - */ - void allocate() throws IOException; - - /** - * Try obtaining an <i>I</i> in a given transaction <i>T</i>. The object obtained is tentative. - * Whether the object is obtained or aborted is determined by the result of the execution. You could - * register a listener under this `tryObtain` operation to know whether the object is obtained or - * aborted. - * - * <p> - * It is a typical two-phases operation on obtaining a resource from allocator. - * The future returned by this method acts as a `prepare` operation, the resource is tentative obtained - * from the allocator. The execution of the txn acts as a `commit` operation, the resource is confirmed - * to be obtained by this transaction. <code>listener</code> is for the whole completion of the obtain. - * <p> - * <code>listener</code> is only triggered after `prepare` succeed. if `prepare` failed, no actions will - * happen to the listener. - * - * @param txn - * transaction. - * @return future result returning <i>I</i> that would be obtained under transaction <code>txn</code>. - */ - Future<I> tryObtain(Transaction<T> txn, OpListener<I> listener); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/CommandLineUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/CommandLineUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/CommandLineUtils.java deleted file mode 100644 index 95ef3e2..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/CommandLineUtils.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -import com.google.common.base.Optional; -import org.apache.commons.cli.CommandLine; - -/** - * Utils to commandline - */ -public class CommandLineUtils { - - public static Optional<String> getOptionalStringArg(CommandLine cmdline, String arg) { - if (cmdline.hasOption(arg)) { - return Optional.of(cmdline.getOptionValue(arg)); - } else { - return Optional.absent(); - } - } - - public static Optional<Boolean> getOptionalBooleanArg(CommandLine cmdline, String arg) { - if (cmdline.hasOption(arg)) { - return Optional.of(true); - } else { - return Optional.absent(); - } - } - - public static Optional<Integer> getOptionalIntegerArg(CommandLine cmdline, String arg) throws IllegalArgumentException { - try { - if (cmdline.hasOption(arg)) { - return Optional.of(Integer.parseInt(cmdline.getOptionValue(arg))); - } else { - return Optional.absent(); - } - } catch (NumberFormatException ex) { - throw new IllegalArgumentException(arg + " is not a number"); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/ConfUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/ConfUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/ConfUtils.java deleted file mode 100644 index 46dd3b6..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/ConfUtils.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.config.ConcurrentConstConfiguration; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.commons.configuration.Configuration; - -import java.util.Iterator; - -public class ConfUtils { - - /** - * Load configurations with prefixed <i>section</i> from source configuration <i>srcConf</i> into - * target configuration <i>targetConf</i>. - * - * @param targetConf - * Target Configuration - * @param srcConf - * Source Configuration - * @param section - * Section Key - */ - public static void loadConfiguration(Configuration targetConf, Configuration srcConf, String section) { - Iterator confKeys = srcConf.getKeys(); - while (confKeys.hasNext()) { - Object keyObject = confKeys.next(); - if (!(keyObject instanceof String)) { - continue; - } - String key = (String) keyObject; - if (key.startsWith(section)) { - targetConf.setProperty(key.substring(section.length()), srcConf.getProperty(key)); - } - } - } - - /** - * Create const dynamic configuration based on distributedlog configuration. - * - * @param conf - * static distributedlog configuration. - * @return dynamic configuration - */ - public static DynamicDistributedLogConfiguration getConstDynConf(DistributedLogConfiguration conf) { - ConcurrentConstConfiguration constConf = new ConcurrentConstConfiguration(conf); - return new DynamicDistributedLogConfiguration(constConf); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java deleted file mode 100644 index 2f9e091..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java +++ /dev/null @@ -1,321 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -import com.google.common.base.Objects; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.exceptions.InvalidStreamNameException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import org.apache.commons.lang.StringUtils; - -import java.net.InetAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.List; - -import static com.google.common.base.Charsets.UTF_8; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Utilities about DL implementations like uri, log segments, metadata serialization and deserialization. - */ -public class DLUtils { - - /** - * Find the log segment whose transaction ids are not less than provided <code>transactionId</code>. - * - * @param segments - * segments to search - * @param transactionId - * transaction id to find - * @return the first log segment whose transaction ids are not less than <code>transactionId</code>. - */ - public static int findLogSegmentNotLessThanTxnId(List<LogSegmentMetadata> segments, - long transactionId) { - int found = -1; - for (int i = segments.size() - 1; i >= 0; i--) { - LogSegmentMetadata segment = segments.get(i); - if (segment.getFirstTxId() <= transactionId) { - found = i; - break; - } - } - if (found <= -1) { - return -1; - } - if (found == 0 && segments.get(0).getFirstTxId() == transactionId) { - return 0; - } - LogSegmentMetadata foundSegment = segments.get(found); - if (foundSegment.getFirstTxId() == transactionId) { - for (int i = found - 1; i >= 0; i--) { - LogSegmentMetadata segment = segments.get(i); - if (segment.isInProgress()) { - break; - } - if (segment.getLastTxId() < transactionId) { - break; - } - found = i; - } - return found; - } else { - if (foundSegment.isInProgress() - || found == segments.size() - 1) { - return found; - } - if (foundSegment.getLastTxId() >= transactionId) { - return found; - } - return found + 1; - } - } - - /** - * Assign next log segment sequence number based on a decreasing list of log segments. - * - * @param segmentListDesc - * a decreasing list of log segments - * @return null if no log segments was assigned a sequence number in <code>segmentListDesc</code>. - * otherwise, return next log segment sequence number - */ - public static Long nextLogSegmentSequenceNumber(List<LogSegmentMetadata> segmentListDesc) { - int lastAssignedLogSegmentIdx = -1; - Long lastAssignedLogSegmentSeqNo = null; - Long nextLogSegmentSeqNo = null; - - for (int i = 0; i < segmentListDesc.size(); i++) { - LogSegmentMetadata metadata = segmentListDesc.get(i); - if (LogSegmentMetadata.supportsLogSegmentSequenceNo(metadata.getVersion())) { - lastAssignedLogSegmentSeqNo = metadata.getLogSegmentSequenceNumber(); - lastAssignedLogSegmentIdx = i; - break; - } - } - - if (null != lastAssignedLogSegmentSeqNo) { - // latest log segment is assigned with a sequence number, start with next sequence number - nextLogSegmentSeqNo = lastAssignedLogSegmentSeqNo + lastAssignedLogSegmentIdx + 1; - } - return nextLogSegmentSeqNo; - } - - /** - * Compute the start sequence id for <code>segment</code>, based on previous segment list - * <code>segmentListDesc</code>. - * - * @param logSegmentDescList - * list of segments in descending order - * @param segment - * segment to compute start sequence id for - * @return start sequence id - */ - public static long computeStartSequenceId(List<LogSegmentMetadata> logSegmentDescList, - LogSegmentMetadata segment) - throws UnexpectedException { - long startSequenceId = 0L; - for (LogSegmentMetadata metadata : logSegmentDescList) { - if (metadata.getLogSegmentSequenceNumber() >= segment.getLogSegmentSequenceNumber()) { - continue; - } else if (metadata.getLogSegmentSequenceNumber() < (segment.getLogSegmentSequenceNumber() - 1)) { - break; - } - if (metadata.isInProgress()) { - throw new UnexpectedException("Should not complete log segment " + segment.getLogSegmentSequenceNumber() - + " since it's previous log segment is still inprogress : " + logSegmentDescList); - } - if (metadata.supportsSequenceId()) { - startSequenceId = metadata.getStartSequenceId() + metadata.getRecordCount(); - } - } - return startSequenceId; - } - - /** - * Deserialize log segment sequence number for bytes <code>data</code>. - * - * @param data - * byte representation of log segment sequence number - * @return log segment sequence number - * @throws NumberFormatException if the bytes aren't valid - */ - public static long deserializeLogSegmentSequenceNumber(byte[] data) { - String seqNoStr = new String(data, UTF_8); - return Long.parseLong(seqNoStr); - } - - /** - * Serilize log segment sequence number <code>logSegmentSeqNo</code> into bytes. - * - * @param logSegmentSeqNo - * log segment sequence number - * @return byte representation of log segment sequence number - */ - public static byte[] serializeLogSegmentSequenceNumber(long logSegmentSeqNo) { - return Long.toString(logSegmentSeqNo).getBytes(UTF_8); - } - - /** - * Deserialize log record transaction id for bytes <code>data</code>. - * - * @param data - * byte representation of log record transaction id - * @return log record transaction id - * @throws NumberFormatException if the bytes aren't valid - */ - public static long deserializeTransactionId(byte[] data) { - String seqNoStr = new String(data, UTF_8); - return Long.parseLong(seqNoStr); - } - - /** - * Serilize log record transaction id <code>transactionId</code> into bytes. - * - * @param transactionId - * log record transaction id - * @return byte representation of log record transaction id. - */ - public static byte[] serializeTransactionId(long transactionId) { - return Long.toString(transactionId).getBytes(UTF_8); - } - - /** - * Serialize log segment id into bytes. - * - * @param logSegmentId - * log segment id - * @return bytes representation of log segment id - */ - public static byte[] logSegmentId2Bytes(long logSegmentId) { - return Long.toString(logSegmentId).getBytes(UTF_8); - } - - /** - * Deserialize bytes into log segment id. - * - * @param data - * bytes representation of log segment id - * @return log segment id - */ - public static long bytes2LogSegmentId(byte[] data) { - return Long.parseLong(new String(data, UTF_8)); - } - - /** - * Normalize the uri. - * - * @param uri the distributedlog uri. - * @return the normalized uri - */ - public static URI normalizeURI(URI uri) { - checkNotNull(uri, "DistributedLog uri is null"); - String scheme = uri.getScheme(); - checkNotNull(scheme, "Invalid distributedlog uri : " + uri); - scheme = scheme.toLowerCase(); - String[] schemeParts = StringUtils.split(scheme, '-'); - checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()), - "Unknown distributedlog scheme found : " + uri); - URI normalizedUri; - try { - normalizedUri = new URI( - schemeParts[0], // remove backend info - uri.getAuthority(), - uri.getPath(), - uri.getQuery(), - uri.getFragment()); - } catch (URISyntaxException e) { - throw new IllegalArgumentException("Invalid distributedlog uri found : " + uri, e); - } - return normalizedUri; - } - - private static String getHostIpLockClientId() { - try { - return InetAddress.getLocalHost().toString(); - } catch(Exception ex) { - return DistributedLogConstants.UNKNOWN_CLIENT_ID; - } - } - - /** - * Normalize the client id. - * - * @return the normalized client id. - */ - public static String normalizeClientId(String clientId) { - String normalizedClientId; - if (clientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) { - normalizedClientId = getHostIpLockClientId(); - } else { - normalizedClientId = clientId; - } - return normalizedClientId; - } - - /** - * Is it a reserved stream name in bkdl namespace? - * - * @param name - * stream name - * @return true if it is reserved name, otherwise false. - */ - public static boolean isReservedStreamName(String name) { - return name.startsWith("."); - } - - /** - * Validate the stream name. - * - * @param nameOfStream - * name of stream - * @throws InvalidStreamNameException - */ - public static void validateName(String nameOfStream) - throws InvalidStreamNameException { - String reason = null; - char chars[] = nameOfStream.toCharArray(); - char c; - // validate the stream to see if meet zookeeper path's requirement - for (int i = 0; i < chars.length; i++) { - c = chars[i]; - - if (c == 0) { - reason = "null character not allowed @" + i; - break; - } else if (c == '/') { - reason = "'/' not allowed @" + i; - break; - } else if (c > '\u0000' && c < '\u001f' - || c > '\u007f' && c < '\u009F' - || c > '\ud800' && c < '\uf8ff' - || c > '\ufff0' && c < '\uffff') { - reason = "invalid charater @" + i; - break; - } - } - if (null != reason) { - throw new InvalidStreamNameException(nameOfStream, reason); - } - if (isReservedStreamName(nameOfStream)) { - throw new InvalidStreamNameException(nameOfStream, - "Stream Name is reserved"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FailpointUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FailpointUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FailpointUtils.java deleted file mode 100644 index 64101b3..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FailpointUtils.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FailpointUtils { - static final Logger logger = LoggerFactory.getLogger(FailpointUtils.class); - - public enum FailPointName { - FP_StartLogSegmentBeforeLedgerCreate, - FP_StartLogSegmentAfterLedgerCreate, - FP_StartLogSegmentAfterInProgressCreate, - FP_StartLogSegmentOnAssignLogSegmentSequenceNumber, - FP_FinalizeLedgerBeforeDelete, - FP_TransmitBeforeAddEntry, - FP_TransmitComplete, - FP_WriteInternalLostLock, - FP_TransmitFailGetBuffer, - FP_LockUnlockCleanup, - FP_LockTryCloseRaceCondition, - FP_LockTryAcquire, - FP_ZooKeeperConnectionLoss, - FP_RecoverIncompleteLogSegments, - FP_LogWriterIssuePending, - } - - public static interface FailPointAction { - boolean checkFailPoint() throws IOException; - boolean checkFailPointNoThrow(); - } - - public static abstract class AbstractFailPointAction implements FailPointAction { - @Override - public boolean checkFailPointNoThrow() { - try { - return checkFailPoint(); - } catch (IOException ex) { - logger.error("failpoint action raised unexpected exception"); - return true; - } - } - } - - public static final FailPointAction DEFAULT_ACTION = new AbstractFailPointAction() { - @Override - public boolean checkFailPoint() throws IOException { - return true; - } - }; - - public static final FailPointAction THROW_ACTION = new AbstractFailPointAction() { - @Override - public boolean checkFailPoint() throws IOException { - throw new IOException("Throw ioexception for failure point"); - } - }; - - public enum FailPointActions { - FailPointAction_Default, - FailPointAction_Throw - } - - static ConcurrentHashMap<FailPointName, FailPointAction> failPointState = - new ConcurrentHashMap<FailPointName, FailPointAction>(); - - public static void setFailpoint(FailPointName failpoint, FailPointActions action) { - FailPointAction fpAction = null; - switch (action) { - case FailPointAction_Default: - fpAction = DEFAULT_ACTION; - break; - case FailPointAction_Throw: - fpAction = THROW_ACTION; - break; - default: - break; - } - setFailpoint(failpoint, fpAction); - } - - public static void setFailpoint(FailPointName failpoint, FailPointAction action) { - if (null != action) { - failPointState.put(failpoint, action); - } - } - - public static void removeFailpoint(FailPointName failpoint) { - failPointState.remove(failpoint); - } - - public static boolean checkFailPoint(FailPointName failPoint) throws IOException { - FailPointAction action = failPointState.get(failPoint); - - if (action == null) { - return false; - } - - try { - return action.checkFailPoint(); - } catch (IOException ioe) { - throw new IOException("Induced Exception at:" + failPoint, ioe); - } - } - - public static boolean checkFailPointNoThrow(FailPointName failPoint) { - FailPointAction action = failPointState.get(failPoint); - - if (action == null) { - return false; - } - - return action.checkFailPointNoThrow(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java deleted file mode 100644 index f206a25..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java +++ /dev/null @@ -1,534 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -import com.google.common.base.Stopwatch; -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.exceptions.BKTransmitException; -import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.exceptions.ZKException; -import com.twitter.distributedlog.stats.OpStatsListener; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureCancelledException; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Throw; -import com.twitter.util.Try; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * Utilities to process future - */ -public class FutureUtils { - - private static final Logger logger = LoggerFactory.getLogger(FutureUtils.class); - - public static class OrderedFutureEventListener<R> - implements FutureEventListener<R> { - - public static <R> OrderedFutureEventListener<R> of( - FutureEventListener<R> listener, - OrderedScheduler scheduler, - Object key) { - return new OrderedFutureEventListener<R>(scheduler, key, listener); - } - - private final OrderedScheduler scheduler; - private final Object key; - private final FutureEventListener<R> listener; - - private OrderedFutureEventListener(OrderedScheduler scheduler, - Object key, - FutureEventListener<R> listener) { - this.scheduler = scheduler; - this.key = key; - this.listener = listener; - } - - @Override - public void onSuccess(final R value) { - scheduler.submit(key, new Runnable() { - @Override - public void run() { - listener.onSuccess(value); - } - }); - } - - @Override - public void onFailure(final Throwable cause) { - scheduler.submit(key, new Runnable() { - @Override - public void run() { - listener.onFailure(cause); - } - }); - } - } - - public static class FutureEventListenerRunnable<R> - implements FutureEventListener<R> { - - public static <R> FutureEventListenerRunnable<R> of( - FutureEventListener<R> listener, - ExecutorService executorService) { - return new FutureEventListenerRunnable<R>(executorService, listener); - } - - private final ExecutorService executorService; - private final FutureEventListener<R> listener; - - private FutureEventListenerRunnable(ExecutorService executorService, - FutureEventListener<R> listener) { - this.executorService = executorService; - this.listener = listener; - } - - @Override - public void onSuccess(final R value) { - executorService.submit(new Runnable() { - @Override - public void run() { - listener.onSuccess(value); - } - }); - } - - @Override - public void onFailure(final Throwable cause) { - executorService.submit(new Runnable() { - @Override - public void run() { - listener.onFailure(cause); - } - }); - } - } - - private static class ListFutureProcessor<T, R> - extends Function<Throwable, BoxedUnit> - implements FutureEventListener<R>, Runnable { - - private volatile boolean interrupted = false; - private final Iterator<T> itemsIter; - private final Function<T, Future<R>> processFunc; - private final Promise<List<R>> promise; - private final List<R> results; - private final ExecutorService callbackExecutor; - - ListFutureProcessor(List<T> items, - Function<T, Future<R>> processFunc, - ExecutorService callbackExecutor) { - this.itemsIter = items.iterator(); - this.processFunc = processFunc; - this.promise = new Promise<List<R>>(); - this.promise.setInterruptHandler(this); - this.results = new ArrayList<R>(); - this.callbackExecutor = callbackExecutor; - } - - @Override - public BoxedUnit apply(Throwable cause) { - interrupted = true; - return BoxedUnit.UNIT; - } - - @Override - public void onSuccess(R value) { - results.add(value); - if (null == callbackExecutor) { - run(); - } else { - callbackExecutor.submit(this); - } - } - - @Override - public void onFailure(final Throwable cause) { - if (null == callbackExecutor) { - promise.setException(cause); - } else { - callbackExecutor.submit(new Runnable() { - @Override - public void run() { - promise.setException(cause); - } - }); - } - } - - @Override - public void run() { - if (interrupted) { - logger.debug("ListFutureProcessor is interrupted."); - return; - } - if (!itemsIter.hasNext()) { - promise.setValue(results); - return; - } - processFunc.apply(itemsIter.next()).addEventListener(this); - } - } - - /** - * Process the list of items one by one using the process function <i>processFunc</i>. - * The process will be stopped immediately if it fails on processing any one. - * - * @param collection list of items - * @param processFunc process function - * @param callbackExecutor executor to process the item - * @return future presents the list of processed results - */ - public static <T, R> Future<List<R>> processList(List<T> collection, - Function<T, Future<R>> processFunc, - @Nullable ExecutorService callbackExecutor) { - ListFutureProcessor<T, R> processor = - new ListFutureProcessor<T, R>(collection, processFunc, callbackExecutor); - if (null != callbackExecutor) { - callbackExecutor.submit(processor); - } else { - processor.run(); - } - return processor.promise; - } - - /** - * Add a event listener over <i>result</i> for collecting the operation stats. - * - * @param result result to listen on - * @param opStatsLogger stats logger to record operations stats - * @param stopwatch stop watch to time operation - * @param <T> - * @return result after registered the event listener - */ - public static <T> Future<T> stats(Future<T> result, - OpStatsLogger opStatsLogger, - Stopwatch stopwatch) { - return result.addEventListener(new OpStatsListener<T>(opStatsLogger, stopwatch)); - } - - /** - * Await for the result of the future and thrown bk related exceptions. - * - * @param result future to wait for - * @return the result of future - * @throws BKException when exceptions are thrown by the future. If there is unkown exceptions - * thrown from the future, the exceptions will be wrapped into - * {@link org.apache.bookkeeper.client.BKException.BKUnexpectedConditionException}. - */ - public static <T> T bkResult(Future<T> result) throws BKException { - try { - return Await.result(result); - } catch (BKException bke) { - throw bke; - } catch (InterruptedException ie) { - throw BKException.create(BKException.Code.InterruptedException); - } catch (Exception e) { - logger.warn("Encountered unexpected exception on waiting bookkeeper results : ", e); - throw BKException.create(BKException.Code.UnexpectedConditionException); - } - } - - /** - * Return the bk exception return code for a <i>throwable</i>. - * - * @param throwable the cause of the exception - * @return the bk exception return code. if the exception isn't bk exceptions, - * it would return {@link BKException.Code#UnexpectedConditionException}. - */ - public static int bkResultCode(Throwable throwable) { - if (throwable instanceof BKException) { - return ((BKException)throwable).getCode(); - } - return BKException.Code.UnexpectedConditionException; - } - - /** - * Wait for the result until it completes. - * - * @param result result to wait - * @return the result - * @throws IOException when encountered exceptions on the result - */ - public static <T> T result(Future<T> result) throws IOException { - return result(result, Duration.Top()); - } - - /** - * Wait for the result for a given <i>duration</i>. - * <p>If the result is not ready within `duration`, an IOException will thrown wrapping with - * corresponding {@link com.twitter.util.TimeoutException}. - * - * @param result result to wait - * @param duration duration to wait - * @return the result - * @throws IOException when encountered exceptions on the result or waiting for the result. - */ - public static <T> T result(Future<T> result, Duration duration) - throws IOException { - try { - return Await.result(result, duration); - } catch (KeeperException ke) { - throw new ZKException("Encountered zookeeper exception on waiting result", ke); - } catch (BKException bke) { - throw new BKTransmitException("Encountered bookkeeper exception on waiting result", bke.getCode()); - } catch (IOException ioe) { - throw ioe; - } catch (InterruptedException ie) { - throw new DLInterruptedException("Interrupted on waiting result", ie); - } catch (Exception e) { - throw new IOException("Encountered exception on waiting result", e); - } - } - - /** - * Wait for the result of a lock operation. - * - * @param result result to wait - * @param lockPath path of the lock - * @return the result - * @throws LockingException when encountered exceptions on the result of lock operation - */ - public static <T> T lockResult(Future<T> result, String lockPath) throws LockingException { - try { - return Await.result(result); - } catch (LockingException le) { - throw le; - } catch (Exception e) { - throw new LockingException(lockPath, "Encountered exception on locking ", e); - } - } - - /** - * Convert the <i>throwable</i> to zookeeper related exceptions. - * - * @param throwable cause - * @param path zookeeper path - * @return zookeeper related exceptions - */ - public static Throwable zkException(Throwable throwable, String path) { - if (throwable instanceof KeeperException) { - return new ZKException("Encountered zookeeper exception on " + path, (KeeperException) throwable); - } else if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) { - return new ZKException("Encountered zookeeper connection loss on " + path, - KeeperException.Code.CONNECTIONLOSS); - } else if (throwable instanceof InterruptedException) { - return new DLInterruptedException("Interrupted on operating " + path, throwable); - } else { - return new UnexpectedException("Encountered unexpected exception on operatiing " + path, throwable); - } - } - - /** - * Cancel the future. It would interrupt the future. - * - * @param future future to cancel - */ - public static <T> void cancel(Future<T> future) { - future.raise(new FutureCancelledException()); - } - - /** - * Raise an exception to the <i>promise</i> within a given <i>timeout</i> period. - * If the promise has been satisfied before raising, it won't change the state of the promise. - * - * @param promise promise to raise exception - * @param timeout timeout period - * @param unit timeout period unit - * @param cause cause to raise - * @param scheduler scheduler to execute raising exception - * @param key the submit key used by the scheduler - * @return the promise applied with the raise logic - */ - public static <T> Promise<T> within(final Promise<T> promise, - final long timeout, - final TimeUnit unit, - final Throwable cause, - final OrderedScheduler scheduler, - final Object key) { - if (timeout < DistributedLogConstants.FUTURE_TIMEOUT_IMMEDIATE || promise.isDefined()) { - return promise; - } - // schedule a timeout to raise timeout exception - final java.util.concurrent.ScheduledFuture<?> task = scheduler.schedule(key, new Runnable() { - @Override - public void run() { - if (!promise.isDefined() && FutureUtils.setException(promise, cause)) { - logger.info("Raise exception", cause); - } - } - }, timeout, unit); - // when the promise is satisfied, cancel the timeout task - promise.respond(new AbstractFunction1<Try<T>, BoxedUnit>() { - @Override - public BoxedUnit apply(Try<T> value) { - if (!task.cancel(true)) { - logger.debug("Failed to cancel the timeout task"); - } - return BoxedUnit.UNIT; - } - }); - return promise; - } - - /** - * Satisfy the <i>promise</i> with provide value in an ordered scheduler. - * <p>If the promise was already satisfied, nothing will be changed. - * - * @param promise promise to satisfy - * @param value value to satisfy - * @param scheduler scheduler to satisfy the promise with provided value - * @param key the submit key of the ordered scheduler - */ - public static <T> void setValue(final Promise<T> promise, - final T value, - OrderedScheduler scheduler, - Object key) { - scheduler.submit(key, new Runnable() { - @Override - public void run() { - setValue(promise, value); - } - }); - } - - /** - * Satisfy the <i>promise</i> with provide value. - * <p>If the promise was already satisfied, nothing will be changed. - * - * @param promise promise to satisfy - * @param value value to satisfy - * @return true if successfully satisfy the future. false if the promise has been satisfied. - */ - public static <T> boolean setValue(Promise<T> promise, T value) { - boolean success = promise.updateIfEmpty(new Return<T>(value)); - if (!success) { - logger.info("Result set multiple times. Value = '{}', New = 'Return({})'", - promise.poll(), value); - } - return success; - } - - /** - * Satisfy the <i>promise</i> with provided <i>cause</i> in an ordered scheduler. - * - * @param promise promise to satisfy - * @param throwable cause to satisfy - * @param scheduler the scheduler to satisfy the promise - * @param key submit key of the ordered scheduler - */ - public static <T> void setException(final Promise<T> promise, - final Throwable cause, - OrderedScheduler scheduler, - Object key) { - scheduler.submit(key, new Runnable() { - @Override - public void run() { - setException(promise, cause); - } - }); - } - - /** - * Satisfy the <i>promise</i> with provided <i>cause</i>. - * - * @param promise promise to satisfy - * @param cause cause to satisfy - * @return true if successfully satisfy the future. false if the promise has been satisfied. - */ - public static <T> boolean setException(Promise<T> promise, Throwable cause) { - boolean success = promise.updateIfEmpty(new Throw<T>(cause)); - if (!success) { - logger.info("Result set multiple times. Value = '{}', New = 'Throw({})'", - promise.poll(), cause); - } - return success; - } - - /** - * Ignore exception from the <i>future</i>. - * - * @param future the original future - * @return a transformed future ignores exceptions - */ - public static <T> Promise<Void> ignore(Future<T> future) { - return ignore(future, null); - } - - /** - * Ignore exception from the <i>future</i> and log <i>errorMsg</i> on exceptions - * - * @param future the original future - * @param errorMsg the error message to log on exceptions - * @return a transformed future ignores exceptions - */ - public static <T> Promise<Void> ignore(Future<T> future, final String errorMsg) { - final Promise<Void> promise = new Promise<Void>(); - future.addEventListener(new FutureEventListener<T>() { - @Override - public void onSuccess(T value) { - setValue(promise, null); - } - - @Override - public void onFailure(Throwable cause) { - if (null != errorMsg) { - logger.error(errorMsg, cause); - } - setValue(promise, null); - } - }); - return promise; - } - - /** - * Create transmit exception from transmit result. - * - * @param transmitResult - * transmit result (basically bk exception code) - * @return transmit exception - */ - public static BKTransmitException transmitException(int transmitResult) { - return new BKTransmitException("Failed to write to bookkeeper; Error is (" - + transmitResult + ") " - + BKException.getMessage(transmitResult), transmitResult); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredFuturePool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredFuturePool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredFuturePool.java deleted file mode 100644 index e06023e..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredFuturePool.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -import com.google.common.base.Stopwatch; - -import com.twitter.util.FuturePool; -import com.twitter.util.FuturePool$; -import com.twitter.util.Future; - -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import scala.runtime.BoxedUnit; -import scala.Function0; - -/** - * {@link FuturePool} with exposed stats. This class is exposing following stats for helping understanding - * the healthy of this thread pool executor. - * <h3>Metrics</h3> - * Stats are only exposed when <code>traceTaskExecution</code> is true. - * <ul> - * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on waiting - * being executed. - * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on executing. - * <li>task_enqueue_time: opstats. measuring the characteristics about the time that tasks spent on submitting. - * <li>tasks_pending: gauge. how many tasks are pending in this future pool. - * </ul> - */ -public class MonitoredFuturePool implements FuturePool { - static final Logger LOG = LoggerFactory.getLogger(MonitoredFuturePool.class); - - private final FuturePool futurePool; - - private final StatsLogger statsLogger; - private final OpStatsLogger taskPendingTime; - private final OpStatsLogger taskExecutionTime; - private final OpStatsLogger taskEnqueueTime; - private final Counter taskPendingCounter; - - private final boolean traceTaskExecution; - private final long traceTaskExecutionWarnTimeUs; - - class TimedFunction0<T> extends com.twitter.util.Function0<T> { - private final Function0<T> function0; - private Stopwatch pendingStopwatch = Stopwatch.createStarted(); - - TimedFunction0(Function0<T> function0) { - this.function0 = function0; - this.pendingStopwatch = Stopwatch.createStarted(); - } - - @Override - public T apply() { - taskPendingTime.registerSuccessfulEvent(pendingStopwatch.elapsed(TimeUnit.MICROSECONDS)); - Stopwatch executionStopwatch = Stopwatch.createStarted(); - T result = function0.apply(); - taskExecutionTime.registerSuccessfulEvent(executionStopwatch.elapsed(TimeUnit.MICROSECONDS)); - long elapsed = executionStopwatch.elapsed(TimeUnit.MICROSECONDS); - if (elapsed > traceTaskExecutionWarnTimeUs) { - LOG.info("{} took too long {} microseconds", function0.toString(), elapsed); - } - return result; - } - } - - /** - * Create a future pool with stats exposed. - * - * @param futurePool underlying future pool to execute futures - * @param statsLogger stats logger to receive exposed stats - * @param traceTaskExecution flag to enable/disable exposing stats about task execution - * @param traceTaskExecutionWarnTimeUs flag to enable/disable logging slow tasks - * whose execution time is above this value - */ - public MonitoredFuturePool(FuturePool futurePool, - StatsLogger statsLogger, - boolean traceTaskExecution, - long traceTaskExecutionWarnTimeUs) { - this.futurePool = futurePool; - this.traceTaskExecution = traceTaskExecution; - this.traceTaskExecutionWarnTimeUs = traceTaskExecutionWarnTimeUs; - this.statsLogger = statsLogger; - this.taskPendingTime = statsLogger.getOpStatsLogger("task_pending_time"); - this.taskExecutionTime = statsLogger.getOpStatsLogger("task_execution_time"); - this.taskEnqueueTime = statsLogger.getOpStatsLogger("task_enqueue_time"); - this.taskPendingCounter = statsLogger.getCounter("tasks_pending"); - } - - @Override - public <T> Future<T> apply(Function0<T> function0) { - if (traceTaskExecution) { - taskPendingCounter.inc(); - Stopwatch taskEnqueueStopwatch = Stopwatch.createStarted(); - Future<T> futureResult = futurePool.apply(new TimedFunction0<T>(function0)); - taskEnqueueTime.registerSuccessfulEvent(taskEnqueueStopwatch.elapsed(TimeUnit.MICROSECONDS)); - futureResult.ensure(new com.twitter.util.Function0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - taskPendingCounter.dec(); - return null; - } - }); - return futureResult; - } else { - return futurePool.apply(function0); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java deleted file mode 100644 index 75223f2..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java +++ /dev/null @@ -1,257 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.util; - -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.util.MathUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -/** - * {@link ScheduledThreadPoolExecutor} with exposed stats. This class is exposing following stats for - * helping understanding the healthy of this thread pool executor. - * <h3>Metrics</h3> - * <ul> - * <li>pending_tasks: gauge. how many tasks are pending in this executor. - * <li>completed_tasks: gauge. how many tasks are completed in this executor. - * <li>total_tasks: gauge. how many tasks are submitted to this executor. - * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on - * waiting being executed. - * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on - * executing. - * </ul> - */ -public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { - static final Logger LOG = LoggerFactory.getLogger(MonitoredScheduledThreadPoolExecutor.class); - - private class TimedRunnable implements Runnable { - - final Runnable runnable; - final long enqueueNanos; - - TimedRunnable(Runnable runnable) { - this.runnable = runnable; - this.enqueueNanos = MathUtils.nowInNano(); - } - - @Override - public void run() { - long startNanos = MathUtils.nowInNano(); - long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos); - taskPendingStats.registerSuccessfulEvent(pendingMicros); - try { - runnable.run(); - } finally { - long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos); - taskExecutionStats.registerSuccessfulEvent(executionMicros); - } - } - - @Override - public String toString() { - return runnable.toString(); - } - - @Override - public int hashCode() { - return runnable.hashCode(); - } - } - - private class TimedCallable<T> implements Callable<T> { - - final Callable<T> task; - final long enqueueNanos; - - TimedCallable(Callable<T> task) { - this.task = task; - this.enqueueNanos = MathUtils.nowInNano(); - } - - @Override - public T call() throws Exception { - long startNanos = MathUtils.nowInNano(); - long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos); - taskPendingStats.registerSuccessfulEvent(pendingMicros); - try { - return task.call(); - } finally { - long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos); - taskExecutionStats.registerSuccessfulEvent(executionMicros); - } - } - } - - protected final boolean traceTaskExecution; - protected final OpStatsLogger taskExecutionStats; - protected final OpStatsLogger taskPendingStats; - protected final StatsLogger statsLogger; - // Gauges and their labels - private static final String pendingTasksGaugeLabel = "pending_tasks"; - private final Gauge<Number> pendingTasksGauge; - private static final String completedTasksGaugeLabel = "completed_tasks"; - protected final Gauge<Number> completedTasksGauge; - private static final String totalTasksGaugeLabel = "total_tasks"; - protected final Gauge<Number> totalTasksGauge; - - public MonitoredScheduledThreadPoolExecutor(int corePoolSize, - ThreadFactory threadFactory, - StatsLogger statsLogger, - boolean traceTaskExecution) { - super(corePoolSize, threadFactory); - this.traceTaskExecution = traceTaskExecution; - this.statsLogger = statsLogger; - this.taskPendingStats = this.statsLogger.getOpStatsLogger("task_pending_time"); - this.taskExecutionStats = this.statsLogger.getOpStatsLogger("task_execution_time"); - this.pendingTasksGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return getQueue().size(); - } - }; - this.completedTasksGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return getCompletedTaskCount(); - } - }; - this.totalTasksGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return getTaskCount(); - } - }; - - // outstanding tasks - this.statsLogger.registerGauge(pendingTasksGaugeLabel, pendingTasksGauge); - // completed tasks - this.statsLogger.registerGauge(completedTasksGaugeLabel, completedTasksGauge); - // total tasks - this.statsLogger.registerGauge(totalTasksGaugeLabel, pendingTasksGauge); - } - - private Runnable timedRunnable(Runnable r) { - return traceTaskExecution ? new TimedRunnable(r) : r; - } - - private <T> Callable<T> timedCallable(Callable<T> task) { - return traceTaskExecution ? new TimedCallable<T>(task) : task; - } - - @Override - public Future<?> submit(Runnable task) { - return super.submit(timedRunnable(task)); - } - - @Override - public <T> Future<T> submit(Runnable task, T result) { - return super.submit(timedRunnable(task), result); - } - - @Override - public <T> Future<T> submit(Callable<T> task) { - return super.submit(timedCallable(task)); - } - - @Override - protected void afterExecute(Runnable r, Throwable t) { - super.afterExecute(r, t); - Throwable hiddenThrowable = extractThrowable(r); - if (hiddenThrowable != null) - logAndHandle(hiddenThrowable, true); - - // The executor re-throws exceptions thrown by the task to the uncaught exception handler - // so we don't need to pass the exception to the handler explicitly - if (null != t) { - logAndHandle(t, false); - } - } - - /** - * The executor re-throws exceptions thrown by the task to the uncaught exception handler - * so we only need to do anything if uncaught exception handler has not been se - */ - private void logAndHandle(Throwable t, boolean passToHandler) { - if (Thread.getDefaultUncaughtExceptionHandler() == null) { - LOG.error("Unhandled exception on thread {}", Thread.currentThread().getName(), t); - } - else { - LOG.info("Unhandled exception on thread {}", Thread.currentThread().getName(), t); - if (passToHandler) { - Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t); - } - } - } - - - /** - * Extract the exception (throwable) inside the ScheduledFutureTask - * @param runnable - The runable that was executed - * @return exception enclosed in the Runnable if any; null otherwise - */ - private Throwable extractThrowable(Runnable runnable) { - // Check for exceptions wrapped by FutureTask. - // We do this by calling get(), which will cause it to throw any saved exception. - // Check for isDone to prevent blocking - if ((runnable instanceof Future<?>) && ((Future<?>) runnable).isDone()) { - try { - ((Future<?>) runnable).get(); - } catch (CancellationException e) { - LOG.debug("Task {} cancelled", runnable, e.getCause()); - } catch (InterruptedException e) { - LOG.debug("Task {} was interrupted", runnable, e); - } catch (ExecutionException e) { - return e.getCause(); - } - } - - return null; - } - - void unregisterGauges() { - this.statsLogger.unregisterGauge(pendingTasksGaugeLabel, pendingTasksGauge); - this.statsLogger.unregisterGauge(completedTasksGaugeLabel, completedTasksGauge); - this.statsLogger.unregisterGauge(totalTasksGaugeLabel, totalTasksGauge); - } - -}