http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConfigurationSubscription.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConfigurationSubscription.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConfigurationSubscription.java new file mode 100644 index 0000000..72a5657 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/ConfigurationSubscription.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.config; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.io.FileNotFoundException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.Iterator; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.FileConfiguration; +import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ConfigurationSubscription publishes a reloading, thread-safe view of file configuration. The class + * periodically calls FileConfiguration.reload on the underlying conf, and propagates changes to the + * concurrent config. The configured FileChangedReloadingStrategy ensures that file config will only + * be reloaded if something changed. + * Notes: + * 1. Reload schedule is never terminated. The assumption is a finite number of these are started + * at the calling layer, and terminated only once the executor service is shut down. + * 2. The underlying FileConfiguration is not at all thread-safe, so its important to ensure access + * to this object is always single threaded. + */ +public class ConfigurationSubscription { + static final Logger LOG = LoggerFactory.getLogger(ConfigurationSubscription.class); + + private final ConcurrentBaseConfiguration viewConfig; + private final ScheduledExecutorService executorService; + private final int reloadPeriod; + private final TimeUnit reloadUnit; + private final List<FileConfigurationBuilder> fileConfigBuilders; + private final List<FileConfiguration> fileConfigs; + private final CopyOnWriteArraySet<ConfigurationListener> confListeners; + + public ConfigurationSubscription(ConcurrentBaseConfiguration viewConfig, + List<FileConfigurationBuilder> fileConfigBuilders, + ScheduledExecutorService executorService, + int reloadPeriod, + TimeUnit reloadUnit) + throws ConfigurationException { + checkNotNull(fileConfigBuilders); + checkArgument(!fileConfigBuilders.isEmpty()); + checkNotNull(executorService); + checkNotNull(viewConfig); + this.viewConfig = viewConfig; + this.executorService = executorService; + this.reloadPeriod = reloadPeriod; + this.reloadUnit = reloadUnit; + this.fileConfigBuilders = fileConfigBuilders; + this.fileConfigs = Lists.newArrayListWithExpectedSize(this.fileConfigBuilders.size()); + this.confListeners = new CopyOnWriteArraySet<ConfigurationListener>(); + reload(); + scheduleReload(); + } + + public void registerListener(ConfigurationListener listener) { + this.confListeners.add(listener); + } + + public void unregisterListener(ConfigurationListener listener) { + this.confListeners.remove(listener); + } + + private boolean initConfig() { + if (fileConfigs.isEmpty()) { + try { + for (FileConfigurationBuilder fileConfigBuilder : fileConfigBuilders) { + FileConfiguration fileConfig = fileConfigBuilder.getConfiguration(); + FileChangedReloadingStrategy reloadingStrategy = new FileChangedReloadingStrategy(); + reloadingStrategy.setRefreshDelay(0); + fileConfig.setReloadingStrategy(reloadingStrategy); + fileConfigs.add(fileConfig); + } + } catch (ConfigurationException ex) { + if (!fileNotFound(ex)) { + LOG.error("Config init failed {}", ex); + } + } + } + return !fileConfigs.isEmpty(); + } + + private void scheduleReload() { + executorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + reload(); + } + }, 0, reloadPeriod, reloadUnit); + } + + @VisibleForTesting + void reload() { + // No-op if already loaded. + if (!initConfig()) { + return; + } + // Reload if config exists. + Set<String> confKeys = Sets.newHashSet(); + for (FileConfiguration fileConfig : fileConfigs) { + LOG.debug("Check and reload config, file={}, lastModified={}", fileConfig.getFile(), + fileConfig.getFile().lastModified()); + fileConfig.reload(); + // load keys + Iterator keyIter = fileConfig.getKeys(); + while (keyIter.hasNext()) { + String key = (String) keyIter.next(); + confKeys.add(key); + } + } + // clear unexisted keys + Iterator viewIter = viewConfig.getKeys(); + while (viewIter.hasNext()) { + String key = (String) viewIter.next(); + if (!confKeys.contains(key)) { + clearViewProperty(key); + } + } + LOG.info("Reload features : {}", confKeys); + // load keys from files + for (FileConfiguration fileConfig : fileConfigs) { + try { + loadView(fileConfig); + } catch (Exception ex) { + if (!fileNotFound(ex)) { + LOG.error("Config reload failed for file {}", fileConfig.getFileName(), ex); + } + } + } + for (ConfigurationListener listener : confListeners) { + listener.onReload(viewConfig); + } + } + + private boolean fileNotFound(Exception ex) { + return ex instanceof FileNotFoundException + || ex.getCause() != null && ex.getCause() instanceof FileNotFoundException; + } + + private void loadView(FileConfiguration fileConfig) { + Iterator fileIter = fileConfig.getKeys(); + while (fileIter.hasNext()) { + String key = (String) fileIter.next(); + setViewProperty(fileConfig, key, fileConfig.getProperty(key)); + } + } + + private void clearViewProperty(String key) { + LOG.debug("Removing property, key={}", key); + viewConfig.clearProperty(key); + } + + private void setViewProperty(FileConfiguration fileConfig, + String key, + Object value) { + if (!viewConfig.containsKey(key) || !viewConfig.getProperty(key).equals(value)) { + LOG.debug("Setting property, key={} value={}", key, fileConfig.getProperty(key)); + viewConfig.setProperty(key, fileConfig.getProperty(key)); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/FileConfigurationBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/FileConfigurationBuilder.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/FileConfigurationBuilder.java new file mode 100644 index 0000000..0ff967d --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/FileConfigurationBuilder.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.config; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.FileConfiguration; + +/** + * Abstract out FileConfiguration subclass construction. + */ +public interface FileConfigurationBuilder { + FileConfiguration getConfiguration() throws ConfigurationException; +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/PropertiesConfigurationBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/PropertiesConfigurationBuilder.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/PropertiesConfigurationBuilder.java new file mode 100644 index 0000000..2d07535 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/PropertiesConfigurationBuilder.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.config; + +import java.net.URL; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.FileConfiguration; +import org.apache.commons.configuration.PropertiesConfiguration; + +/** + * Hide PropertiesConfiguration dependency. + */ +public class PropertiesConfigurationBuilder implements FileConfigurationBuilder { + private URL url; + + public PropertiesConfigurationBuilder(URL url) { + this.url = url; + } + + @Override + public FileConfiguration getConfiguration() throws ConfigurationException { + return new PropertiesConfiguration(url); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/package-info.java new file mode 100644 index 0000000..88e68f2 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/config/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Configuration Related Utils. + */ +package org.apache.distributedlog.common.config; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/VoidFunctions.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/VoidFunctions.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/VoidFunctions.java new file mode 100644 index 0000000..8d5069e --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/VoidFunctions.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.functions; + +import java.util.List; +import java.util.function.Function; + +/** + * Functions for transforming structures related to {@link Void}. + */ +public class VoidFunctions { + + public static final Function<List<Void>, Void> LIST_TO_VOID_FUNC = + list -> null; + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/package-info.java new file mode 100644 index 0000000..9e88612 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/functions/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Common Functions. + */ +package org.apache.distributedlog.common.functions; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/package-info.java new file mode 100644 index 0000000..4c90bd2 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Common functions and utils used across the project. + */ +package org.apache.distributedlog.common; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRate.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRate.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRate.java new file mode 100644 index 0000000..f3e8c33 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRate.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.rate; + +/** + * Moving Average Rate. + */ +public interface MovingAverageRate { + double get(); + void add(long amount); + void inc(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRateFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRateFactory.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRateFactory.java new file mode 100644 index 0000000..790ba03 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/MovingAverageRateFactory.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.rate; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Factory to create {@link MovingAverageRate} instances. + */ +public class MovingAverageRateFactory implements Runnable { + + private static final int DEFAULT_INTERVAL_SECS = 1; + + private final ScheduledExecutorService scheduler; + private final ScheduledFuture<?> scheduledFuture; + private final CopyOnWriteArrayList<SampledMovingAverageRate> avgs; + + public MovingAverageRateFactory(ScheduledExecutorService scheduler) { + this.avgs = new CopyOnWriteArrayList<SampledMovingAverageRate>(); + this.scheduler = scheduler; + this.scheduledFuture = this.scheduler.scheduleAtFixedRate( + this, DEFAULT_INTERVAL_SECS, DEFAULT_INTERVAL_SECS, TimeUnit.SECONDS); + } + + public MovingAverageRate create(int intervalSecs) { + SampledMovingAverageRate avg = new SampledMovingAverageRate(intervalSecs); + avgs.add(avg); + return avg; + } + + public void close() { + scheduledFuture.cancel(true); + avgs.clear(); + } + + @Override + public void run() { + sampleAll(); + } + + private void sampleAll() { + avgs.forEach(SampledMovingAverageRate::sample); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java new file mode 100644 index 0000000..2c89d64 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/SampledMovingAverageRate.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.rate; + +import com.google.common.base.Ticker; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.tuple.Pair; + +/** + * Sampled {@link MovingAverageRate}. + */ +class SampledMovingAverageRate implements MovingAverageRate { + + private static final long NANOS_PER_SEC = TimeUnit.SECONDS.toNanos(1); + + private final AtomicLong total; + private final Ticker ticker; + private final double scaleFactor; + private final LinkedBlockingDeque<Pair<Long, Long>> samples; + + private double value; + + public SampledMovingAverageRate(int intervalSecs) { + this(intervalSecs, 1, Ticker.systemTicker()); + } + + SampledMovingAverageRate(int intervalSecs, + double scaleFactor, + Ticker ticker) { + this.value = 0; + this.total = new AtomicLong(0); + this.scaleFactor = scaleFactor; + this.ticker = ticker; + this.samples = new LinkedBlockingDeque<>(intervalSecs); + } + + @Override + public double get() { + return value; + } + + @Override + public void add(long amount) { + total.getAndAdd(amount); + } + + @Override + public void inc() { + add(1); + } + + void sample() { + value = doSample(); + } + + private double doSample() { + long newSample = total.get(); + long newTimestamp = ticker.read(); + + double rate = 0; + if (!samples.isEmpty()) { + Pair<Long, Long> oldestSample = samples.peekLast(); + + double dy = newSample - oldestSample.getRight(); + double dt = newTimestamp - oldestSample.getLeft(); + + rate = (dt == 0) ? 0 : (NANOS_PER_SEC * scaleFactor * dy) / dt; + } + + if (samples.remainingCapacity() == 0) { + samples.removeLast(); + } else { + samples.addFirst(Pair.of(newTimestamp, newSample)); + } + + return rate; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/package-info.java new file mode 100644 index 0000000..3117c64 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/rate/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Rate relate functions. + */ +package org.apache.distributedlog.common.rate; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java new file mode 100644 index 0000000..61a20f1 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.stats; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.bookkeeper.stats.CachingStatsLogger; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.OpStatsData; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; + +/** + * Stats Loggers that broadcast stats to multiple {@link StatsLogger}. + */ +public class BroadCastStatsLogger { + + /** + * Create a broadcast stats logger of two stats loggers `<code>first</code>` and + * `<code>second</code>`. The returned stats logger doesn't allow registering any + * {@link Gauge}. + * + * @param first + * first stats logger + * @param second + * second stats logger + * @return broadcast stats logger + */ + public static StatsLogger two(StatsLogger first, StatsLogger second) { + return new CachingStatsLogger(new Two(first, second)); + } + + static class Two implements StatsLogger { + protected final StatsLogger first; + protected final StatsLogger second; + + private Two(StatsLogger first, StatsLogger second) { + super(); + checkNotNull(first); + checkNotNull(second); + this.first = first; + this.second = second; + } + + @Override + public OpStatsLogger getOpStatsLogger(final String statName) { + final OpStatsLogger firstLogger = first.getOpStatsLogger(statName); + final OpStatsLogger secondLogger = second.getOpStatsLogger(statName); + return new OpStatsLogger() { + @Override + public void registerFailedEvent(long l) { + firstLogger.registerFailedEvent(l); + secondLogger.registerFailedEvent(l); + } + + @Override + public void registerSuccessfulEvent(long l) { + firstLogger.registerSuccessfulEvent(l); + secondLogger.registerSuccessfulEvent(l); + } + + @Override + public OpStatsData toOpStatsData() { + // Eventually consistent. + return firstLogger.toOpStatsData(); + } + + @Override + public void clear() { + firstLogger.clear(); + secondLogger.clear(); + } + }; + } + + @Override + public Counter getCounter(final String statName) { + final Counter firstCounter = first.getCounter(statName); + final Counter secondCounter = second.getCounter(statName); + return new Counter() { + @Override + public void clear() { + firstCounter.clear(); + secondCounter.clear(); + } + + @Override + public void inc() { + firstCounter.inc(); + secondCounter.inc(); + } + + @Override + public void dec() { + firstCounter.dec(); + secondCounter.dec(); + } + + @Override + public void add(long l) { + firstCounter.add(l); + secondCounter.add(l); + } + + @Override + public Long get() { + // Eventually consistent. + return firstCounter.get(); + } + }; + } + + @Override + public <T extends Number> void registerGauge(String statName, Gauge<T> gauge) { + // Different underlying stats loggers have different semantics wrt. gauge registration. + throw new RuntimeException("Cannot register a gauge on BroadCastStatsLogger.Two"); + } + + @Override + public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) { + // no-op + } + + @Override + public StatsLogger scope(final String scope) { + return new Two(first.scope(scope), second.scope(scope)); + } + + @Override + public void removeScope(String scope, StatsLogger statsLogger) { + if (!(statsLogger instanceof Two)) { + return; + } + + Two another = (Two) statsLogger; + + first.removeScope(scope, another.first); + second.removeScope(scope, another.second); + } + } + + /** + * Create a broadcast stats logger of two stats loggers <code>master</code> and <code>slave</code>. + * It is similar as {@link #two(StatsLogger, StatsLogger)}, but it allows registering {@link Gauge}s. + * The {@link Gauge} will be registered under master. + * + * @param master + * master stats logger to receive {@link Counter}, {@link OpStatsLogger} and {@link Gauge}. + * @param slave + * slave stats logger to receive only {@link Counter} and {@link OpStatsLogger}. + * @return broadcast stats logger + */ + public static StatsLogger masterslave(StatsLogger master, StatsLogger slave) { + return new CachingStatsLogger(new MasterSlave(master, slave)); + } + + static class MasterSlave extends Two { + + private MasterSlave(StatsLogger master, StatsLogger slave) { + super(master, slave); + } + + @Override + public <T extends Number> void registerGauge(String statName, Gauge<T> gauge) { + first.registerGauge(statName, gauge); + } + + @Override + public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) { + first.unregisterGauge(statName, gauge); + } + + @Override + public StatsLogger scope(String scope) { + return new MasterSlave(first.scope(scope), second.scope(scope)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java new file mode 100644 index 0000000..e71a799 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.stats; + +import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.distributedlog.common.concurrent.FutureEventListener; + +/** + * A {@link FutureEventListener} monitors the stats for a given operation. + */ +public class OpStatsListener<T> implements FutureEventListener<T> { + OpStatsLogger opStatsLogger; + Stopwatch stopwatch; + + public OpStatsListener(OpStatsLogger opStatsLogger, Stopwatch stopwatch) { + this.opStatsLogger = opStatsLogger; + if (null == stopwatch) { + this.stopwatch = Stopwatch.createStarted(); + } else { + this.stopwatch = stopwatch; + } + } + + public OpStatsListener(OpStatsLogger opStatsLogger) { + this(opStatsLogger, null); + } + + @Override + public void onSuccess(T value) { + opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } + + @Override + public void onFailure(Throwable cause) { + opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/package-info.java new file mode 100644 index 0000000..bf3859d --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Stats Related Utils. + */ +package org.apache.distributedlog.common.stats; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/BitMaskUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/BitMaskUtils.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/BitMaskUtils.java new file mode 100644 index 0000000..53f4ab2 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/BitMaskUtils.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.util; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Utils for bit mask operations. + */ +public class BitMaskUtils { + + /** + * 1) Unset all bits where value in mask is set. + * 2) Set these bits to value specified by newValue. + * + * <p>e.g. + * if oldValue = 1010, mask = 0011, newValue = 0001 + * 1) 1010 -> 1000 + * 2) 1000 -> 1001 + * + * @param oldValue expected old value + * @param mask the mask of the value for updates + * @param newValue new value to set + * @return updated value + */ + public static long set(long oldValue, long mask, long newValue) { + checkArgument(oldValue >= 0L && mask >= 0L && newValue >= 0L); + return ((oldValue & (~mask)) | (newValue & mask)); + } + + /** + * Get the bits where mask is 1. + * + * @param value value + * @param mask mask of the value + * @return the bit of the mask + */ + public static long get(long value, long mask) { + checkArgument(value >= 0L && mask >= 0L); + return (value & mask); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/MathUtil.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/MathUtil.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/MathUtil.java new file mode 100644 index 0000000..38b3ed2 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/MathUtil.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.distributedlog.common.util; + +/** + * Helpers for math related utils. + */ +public class MathUtil { + + public static int signSafeMod(long dividend, int divisor) { + int mod = (int) (dividend % divisor); + + if (mod < 0) { + mod += divisor; + } + + return mod; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Permit.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Permit.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Permit.java new file mode 100644 index 0000000..24cb63d --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Permit.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.util; + +/** + * Permit. + */ +@FunctionalInterface +public interface Permit { + + void release(); + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitLimiter.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitLimiter.java new file mode 100644 index 0000000..8fcbf12 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitLimiter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.util; + +/** + * A simple limiter interface which tracks acquire/release of permits, for + * example for tracking outstanding writes. + */ +public interface PermitLimiter { + + PermitLimiter NULL_PERMIT_LIMITER = new PermitLimiter() { + @Override + public boolean acquire() { + return true; + } + @Override + public void release(int permits) { + } + + @Override + public void close() { + + } + }; + + /** + * Acquire a permit. + * + * @return true if successfully acquire a permit, otherwise false. + */ + boolean acquire(); + + /** + * Release a permit. + */ + void release(int permits); + + /** + * Close the resources created by the limiter. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitManager.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitManager.java new file mode 100644 index 0000000..3b6e3a1 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/PermitManager.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.util; + +/** + * Permit manager for managing permits. + */ +public interface PermitManager { + + /** + * A class present a permit managed by a permit manager. + */ + interface Permit { + Permit ALLOWED = () -> true; + boolean isAllowed(); + } + + PermitManager UNLIMITED_PERMIT_MANAGER = new PermitManager() { + @Override + public Permit acquirePermit() { + return Permit.ALLOWED; + } + + @Override + public void releasePermit(Permit permit) { + // nop + } + + @Override + public boolean allowObtainPermits() { + return true; + } + + @Override + public boolean disallowObtainPermits(Permit permit) { + return false; + } + + @Override + public void close() { + // nop + } + + }; + + /** + * Obetain a permit from permit manager. + * + * @return permit. + */ + Permit acquirePermit(); + + /** + * Release a given permit. + * + * @param permit + * permit to release + */ + void releasePermit(Permit permit); + + /** + * Allow obtaining permits. + */ + boolean allowObtainPermits(); + + /** + * Disallow obtaining permits. Disallow needs to be performed under the context + * of <i>permit</i>. + * + * @param permit + * permit context to disallow + */ + boolean disallowObtainPermits(Permit permit); + + /** + * Release the resources. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/SchedulerUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/SchedulerUtils.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/SchedulerUtils.java new file mode 100644 index 0000000..f6d4f23 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/SchedulerUtils.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.util; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; + +/** + * Scheduler related utils. + */ +@Slf4j +public class SchedulerUtils { + + public static void shutdownScheduler(ExecutorService service, long timeout, TimeUnit timeUnit) { + if (null == service) { + return; + } + service.shutdown(); + try { + service.awaitTermination(timeout, timeUnit); + } catch (InterruptedException e) { + log.warn("Interrupted when shutting down scheduler : ", e); + } + service.shutdownNow(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sequencer.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sequencer.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sequencer.java new file mode 100644 index 0000000..a40b8e2 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sequencer.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.util; + +/** + * Sequencer generating transaction id. + */ +public interface Sequencer { + + /** + * Return next transaction id generated by the sequencer. + * + * @return next transaction id generated by the sequencer. + */ + long nextId(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sizable.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sizable.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sizable.java new file mode 100644 index 0000000..d418e0f --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/Sizable.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.common.util; + +/** + * The {@code Sizable} interface is to provide the capability of calculating size + * of any objects. + */ +public interface Sizable { + /** + * Calculate the size for this instance. + * + * @return size of the instance. + */ + long size(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/package-info.java new file mode 100644 index 0000000..e2bde37 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/util/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Common utility functions. + */ +package org.apache.distributedlog.common.util; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortable.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortable.java b/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortable.java new file mode 100644 index 0000000..4edc09d --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortable.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.io; + +import java.io.IOException; + +/** + * An {@code Abortable} is a source or destination of data that can be aborted. + * The abort method is invoked to release resources that the object is holding + * (such as open files). The abort happens when the object is in an error state, + * which it couldn't be closed gracefully. + * + * @see java.io.Closeable + * @since 0.3.32 + */ +public interface Abortable { + + /** + * Aborts the object and releases any resources associated with it. + * If the object is already aborted then invoking this method has no + * effect. + * + * @throws IOException if an I/O error occurs. + */ + void abort() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortables.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortables.java b/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortables.java new file mode 100644 index 0000000..b6101a8 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/io/Abortables.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.io; + +import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; +import org.apache.distributedlog.common.functions.VoidFunctions; +import org.apache.distributedlog.common.concurrent.FutureUtils; + +/** + * Utility methods for working with {@link Abortable} objects. + * + * @since 0.3.32 + */ +@Slf4j +public final class Abortables { + + private Abortables() {} + + public static CompletableFuture<Void> asyncAbort(@Nullable AsyncAbortable abortable, + boolean swallowIOException) { + if (null == abortable) { + return FutureUtils.Void(); + } else if (swallowIOException) { + return FutureUtils.ignore(abortable.asyncAbort()); + } else { + return abortable.asyncAbort(); + } + } + + /** + * Aborts a {@link Abortable}, with control over whether an {@link IOException} may be thrown. + * This is primarily useful in a finally block, where a thrown exception needs to be logged but + * not propagated (otherwise the original exception will be lost). + * + * <p>If {@code swallowIOException} is true then we never throw {@code IOException} but merely log it. + * + * <p>Example: <pre> {@code + * + * public void abortStreamNicely() throws IOException { + * SomeStream stream = new SomeStream("foo"); + * try { + * // ... code which does something with the stream ... + * } catch (IOException ioe) { + * // If an exception occurs, we might abort the stream. + * Abortables.abort(stream, true); + * } + * }}</pre> + * + * @param abortable the {@code Abortable} object to be aborted, or null, in which case this method + * does nothing. + * @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code abort} methods + * @throws IOException if {@code swallowIOException} is false and {@code abort} throws an {@code IOException} + */ + public static void abort(@Nullable Abortable abortable, + boolean swallowIOException) + throws IOException { + if (null == abortable) { + return; + } + try { + abortable.abort(); + } catch (IOException ioe) { + if (swallowIOException) { + log.warn("IOException thrown while aborting Abortable {} : ", abortable, ioe); + } else { + throw ioe; + } + } + } + + /** + * Abort async <i>abortable</i>. + * + * @param abortable the {@code AsyncAbortable} object to be aborted, or null, in which case this method + * does nothing. + * @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code abort} methods + * @throws IOException if {@code swallowIOException} is false and {@code abort} throws an {@code IOException} + * @see #abort(Abortable, boolean) + */ + public static void abort(@Nullable AsyncAbortable abortable, + boolean swallowIOException) + throws IOException { + if (null == abortable) { + return; + } + try { + FutureUtils.result(abortable.asyncAbort()); + } catch (Exception e) { + if (swallowIOException) { + log.warn("IOException thrown while aborting Abortable {} : ", abortable, e); + } else { + if (e instanceof IOException) { + throw (IOException) e; + } else { + throw new IOException(e); + } + } + } + } + + /** + * Aborts the given {@code abortable}, logging any {@code IOException} that's thrown rather than + * propagating it. + * + * <p>While it's not safe in the general case to ignore exceptions that are thrown when aborting an + * I/O resource, it should generally be safe in the case of a resource that's being used only for + * reading. + * + * @param abortable the {@code Abortable} to be closed, or {@code null} in which case this method + * does nothing. + */ + public static void abortQuietly(@Nullable Abortable abortable) { + try { + abort(abortable, true); + } catch (IOException e) { + log.error("Unexpected IOException thrown while aborting Abortable {} quietly : ", abortable, e); + } + } + + /** + * Aborts the given {@code abortable}, logging any {@code IOException} that's thrown rather than + * propagating it. + * + * <p>While it's not safe in the general case to ignore exceptions that are thrown when aborting an + * I/O resource, it should generally be safe in the case of a resource that's being used only for + * reading. + * + * @param abortable the {@code AsyncAbortable} to be closed, or {@code null} in which case this method + * does nothing. + */ + public static void abortQuietly(@Nullable AsyncAbortable abortable) { + try { + abort(abortable, true); + } catch (IOException e) { + log.error("Unexpected IOException thrown while aborting Abortable {} quietly : ", abortable, e); + } + } + + /** + * Abort the abortables in sequence. + * + * @param executorService + * executor service to execute + * @param abortables + * abortables to abort + * @return future represents the abort future + */ + public static CompletableFuture<Void> abortSequence(ExecutorService executorService, + AsyncAbortable... abortables) { + List<AsyncAbortable> abortableList = Lists.newArrayListWithExpectedSize(abortables.length); + for (AsyncAbortable abortable : abortables) { + if (null == abortable) { + abortableList.add(AsyncAbortable.NULL); + } else { + abortableList.add(abortable); + } + } + return FutureUtils.processList( + abortableList, + AsyncAbortable.ABORT_FUNC, + executorService + ).thenApply(VoidFunctions.LIST_TO_VOID_FUNC); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java b/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java new file mode 100644 index 0000000..7636c57 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.io; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.distributedlog.common.concurrent.FutureUtils; + +/** + * An {@code Abortable} is a source or destination of data that can be aborted. + * The abort method is invoked to release resources that the object is holding + * (such as open files). The abort happens when the object is in an error state, + * which it couldn't be closed gracefully. + * + * @see AsyncCloseable + * @see Abortable + * @since 0.3.43 + */ +public interface AsyncAbortable { + + Function<AsyncAbortable, CompletableFuture<Void>> ABORT_FUNC = abortable -> abortable.asyncAbort(); + + AsyncAbortable NULL = () -> FutureUtils.Void(); + + /** + * Aborts the object and releases any resources associated with it. + * If the object is already aborted then invoking this method has no + * effect. + * + * @return future represents the abort result + */ + CompletableFuture<Void> asyncAbort(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java b/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java new file mode 100644 index 0000000..851f426 --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.io; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.distributedlog.common.concurrent.FutureUtils; + +/** + * A {@code AsyncCloseable} is a source or destination of data that can be closed asynchronously. + * The close method is invoked to release resources that the object is + * holding (such as open files). + */ +public interface AsyncCloseable { + + Function<AsyncCloseable, CompletableFuture<Void>> CLOSE_FUNC = closeable -> closeable.asyncClose(); + + Function<AsyncCloseable, CompletableFuture<Void>> CLOSE_FUNC_IGNORE_ERRORS = + closeable -> FutureUtils.ignore(closeable.asyncClose()); + + AsyncCloseable NULL = () -> FutureUtils.Void(); + + /** + * Closes this source and releases any system resources associated + * with it. If the source is already closed then invoking this + * method has no effect. + * + * @return future representing the close result. + */ + CompletableFuture<Void> asyncClose(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java b/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java new file mode 100644 index 0000000..f7c3e3b --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.io; + +import java.util.concurrent.CompletableFuture; + +/** + * A {@code AsyncDeleteable} is a source or destination of data that can be deleted asynchronously. + * This delete method is invoked to delete the source. + */ +public interface AsyncDeleteable { + /** + * Releases any system resources associated with this and delete the source. If the source is + * already deleted then invoking this method has no effect. + * + * @return future representing the deletion result. + */ + CompletableFuture<Void> delete(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/io/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/io/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/io/package-info.java new file mode 100644 index 0000000..c8e957f --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/io/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * IO Utils for distributedlog. + */ +package org.apache.distributedlog.io; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java ---------------------------------------------------------------------- diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java b/distributedlog-common/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java new file mode 100644 index 0000000..89b448e --- /dev/null +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/util/OrderedScheduler.java @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.util; + +import com.google.common.base.Objects; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.distributedlog.common.concurrent.FutureUtils; +import org.apache.distributedlog.common.util.MathUtil; + +/** + * Ordered Scheduler. It is thread pool based {@link ScheduledExecutorService}, additionally providing + * the ability to execute/schedule tasks by <code>key</code>. Hence the tasks submitted by same <i>key</i> + * will be executed in order. + * + * <p>The scheduler is comprised of multiple {@link ScheduledExecutorService}s. Each + * {@link ScheduledExecutorService} is a single thread executor. Normal task submissions will + * be submitted to executors in a random manner to guarantee load balancing. Keyed task submissions (e.g + * {@link OrderedScheduler#submit(Object, Runnable)} will be submitted to a dedicated executor based on + * the hash value of submit <i>key</i>. + */ +public class OrderedScheduler implements ScheduledExecutorService { + + /** + * Create a builder to build scheduler. + * + * @return scheduler builder + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for {@link OrderedScheduler}. + */ + public static class Builder { + + private String name = "OrderedScheduler"; + private int corePoolSize = -1; + private ThreadFactory threadFactory = null; + + /** + * Set the name of this scheduler. It would be used as part of stats scope and thread name. + * + * @param name name of the scheduler. + * @return scheduler builder + */ + public Builder name(String name) { + this.name = name; + return this; + } + + /** + * Set the number of threads to be used in this scheduler. + * + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle + * @return scheduler builder + */ + public Builder corePoolSize(int corePoolSize) { + this.corePoolSize = corePoolSize; + return this; + } + + /** + * Set the thread factory that the scheduler uses to create a new thread. + * + * @param threadFactory the factory to use when the executor + * creates a new thread + * @return scheduler builder + */ + public Builder threadFactory(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; + return this; + } + + /** + * Build the ordered scheduler. + * + * @return ordered scheduler + */ + public OrderedScheduler build() { + if (corePoolSize <= 0) { + corePoolSize = Runtime.getRuntime().availableProcessors(); + } + if (null == threadFactory) { + threadFactory = Executors.defaultThreadFactory(); + } + + return new OrderedScheduler( + name, + corePoolSize, + threadFactory); + } + + } + + protected final String name; + protected final int corePoolSize; + protected final ScheduledExecutorService[] executors; + protected final Random random; + + private OrderedScheduler(String name, + int corePoolSize, + ThreadFactory threadFactory) { + this.name = name; + this.corePoolSize = corePoolSize; + this.executors = new ScheduledExecutorService[corePoolSize]; + for (int i = 0; i < corePoolSize; i++) { + ThreadFactory tf = new ThreadFactoryBuilder() + .setNameFormat(name + "-scheduler-" + i + "-%d") + .setThreadFactory(threadFactory) + .build(); + executors[i] = Executors.newSingleThreadScheduledExecutor(tf); + } + this.random = new Random(System.currentTimeMillis()); + } + + protected ScheduledExecutorService chooseExecutor() { + return corePoolSize == 1 ? executors[0] : executors[random.nextInt(corePoolSize)]; + } + + public ScheduledExecutorService chooseExecutor(Object key) { + if (null == key) { + return chooseExecutor(); + } + return corePoolSize == 1 ? executors[0] : + executors[MathUtil.signSafeMod(Objects.hashCode(key), corePoolSize)]; + } + + /** + * {@inheritDoc} + */ + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return chooseExecutor().schedule(command, delay, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + return chooseExecutor().schedule(callable, delay, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, + long initialDelay, long period, TimeUnit unit) { + return chooseExecutor().scheduleAtFixedRate(command, initialDelay, period, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, + long initialDelay, long delay, TimeUnit unit) { + return chooseExecutor().scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() { + for (ScheduledExecutorService executor : executors) { + executor.shutdown(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<Runnable> shutdownNow() { + List<Runnable> runnables = new ArrayList<Runnable>(); + for (ScheduledExecutorService executor : executors) { + runnables.addAll(executor.shutdownNow()); + } + return runnables; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isShutdown() { + for (ScheduledExecutorService executor : executors) { + if (!executor.isShutdown()) { + return false; + } + } + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isTerminated() { + for (ScheduledExecutorService executor : executors) { + if (!executor.isTerminated()) { + return false; + } + } + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + for (ScheduledExecutorService executor : executors) { + if (!executor.awaitTermination(timeout, unit)) { + return false; + } + } + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public <T> Future<T> submit(Callable<T> task) { + return chooseExecutor().submit(task); + } + + /** + * {@inheritDoc} + */ + @Override + public <T> Future<T> submit(Runnable task, T result) { + return chooseExecutor().submit(task, result); + } + + /** + * {@inheritDoc} + */ + @Override + public Future<?> submit(Runnable task) { + return chooseExecutor().submit(task); + } + + /** + * {@inheritDoc} + */ + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + return chooseExecutor().invokeAll(tasks); + } + + /** + * {@inheritDoc} + */ + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return chooseExecutor().invokeAll(tasks, timeout, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + return chooseExecutor().invokeAny(tasks); + } + + /** + * {@inheritDoc} + */ + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return chooseExecutor().invokeAny(tasks, timeout, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public void execute(Runnable command) { + chooseExecutor().execute(command); + } + + // Ordered Functions + + public ScheduledFuture<?> schedule(Object key, Runnable command, long delay, TimeUnit unit) { + return chooseExecutor(key).schedule(command, delay, unit); + } + + public ScheduledFuture<?> scheduleAtFixedRate(Object key, + Runnable command, + long initialDelay, + long period, + TimeUnit unit) { + return chooseExecutor(key).scheduleAtFixedRate(command, initialDelay, period, unit); + } + + public Future<?> submit(Object key, Runnable command) { + return chooseExecutor(key).submit(command); + } + + public <T> CompletableFuture<T> submit(Object key, Callable<T> callable) { + CompletableFuture<T> future = FutureUtils.createFuture(); + chooseExecutor(key).submit(() -> { + try { + future.complete(callable.call()); + } catch (Exception e) { + future.completeExceptionally(e); + } + }); + return future; + } + +}