http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/package-info.java deleted file mode 100644 index 697d2e9..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/bk/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * BookKeeper related util functions. - * <p> - * <h2>Ledger Allocator</h2> - * - */ -package com.twitter.distributedlog.bk;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java deleted file mode 100644 index 2196245..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentListener.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.callback; - -import com.twitter.distributedlog.LogSegmentMetadata; - -import java.util.List; - -/** - * Listener on log segments changes for a given stream used by {@link com.twitter.distributedlog.BKLogReadHandler} - */ -public interface LogSegmentListener { - - /** - * Notified when <i>segments</i> updated. The new sorted log segments - * list is returned in this method. - * - * @param segments - * updated list of segments. 
- */ - void onSegmentsUpdated(List<LogSegmentMetadata> segments); - - /** - * Notified when the log stream is deleted. - */ - void onLogStreamDeleted(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java deleted file mode 100644 index e38f305..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/LogSegmentNamesListener.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.callback; - -import org.apache.bookkeeper.versioning.Versioned; - -import java.util.List; - -/** - * Listener on list of log segments changes for a given stream used by - * {@link com.twitter.distributedlog.logsegment.LogSegmentMetadataStore}. - */ -public interface LogSegmentNamesListener { - /** - * Notified when <i>segments</i> updated. 
The new log segments - * list is returned in this method. - * - * @param segments - * updated list of segments. - */ - void onSegmentsUpdated(Versioned<List<String>> segments); - - /** - * Notified when the log stream is deleted. - */ - void onLogStreamDeleted(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/NamespaceListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/NamespaceListener.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/NamespaceListener.java deleted file mode 100644 index fc63ff5..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/NamespaceListener.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.callback; - -import com.google.common.annotations.Beta; - -import java.util.Iterator; - -@Beta -public interface NamespaceListener { - - /** - * Updated with latest streams. - * - * @param streams - * latest list of streams under a given namespace. 
- */ - void onStreamsChanged(Iterator<String> streams); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/ReadAheadCallback.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/ReadAheadCallback.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/ReadAheadCallback.java deleted file mode 100644 index 7c46a1a..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/ReadAheadCallback.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.callback; - -/** - * ReadAhead Callback - */ -public interface ReadAheadCallback { - void resumeReadAhead(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/package-info.java deleted file mode 100644 index 2724d43..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/callback/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Callbacks for distributedlog operations. 
- */ -package com.twitter.distributedlog.callback; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConcurrentBaseConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConcurrentBaseConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConcurrentBaseConfiguration.java deleted file mode 100644 index 91603c1..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConcurrentBaseConfiguration.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.config; - -import com.google.common.base.Preconditions; - -import org.apache.commons.configuration.AbstractConfiguration; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Iterator; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Configuration view built on concurrent hash map for fast thread-safe access. - * Notes: - * 1. Multi-property list aggregation will not work in this class. I.e. 
commons config - * normally combines all properties with the same key into one list property automatically. - * This class simply overwrites any existing mapping. - */ -public class ConcurrentBaseConfiguration extends AbstractConfiguration { - static final Logger LOG = LoggerFactory.getLogger(ConcurrentBaseConfiguration.class); - - private final ConcurrentHashMap<String, Object> map; - - public ConcurrentBaseConfiguration() { - this.map = new ConcurrentHashMap<String, Object>(); - } - - @Override - protected void addPropertyDirect(String key, Object value) { - Preconditions.checkNotNull(value); - map.put(key, value); - } - - @Override - public Object getProperty(String key) { - return map.get(key); - } - - @Override - public Iterator getKeys() { - return map.keySet().iterator(); - } - - @Override - public boolean containsKey(String key) { - return map.containsKey(key); - } - - @Override - public boolean isEmpty() { - return map.isEmpty(); - } - - @Override - protected void clearPropertyDirect(String key) { - map.remove(key); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConcurrentConstConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConcurrentConstConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConcurrentConstConfiguration.java deleted file mode 100644 index a044a13..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConcurrentConstConfiguration.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.config; - -import com.google.common.base.Preconditions; -import org.apache.commons.configuration.Configuration; - -/** - * Invariant thread-safe view of some configuration. - */ -public class ConcurrentConstConfiguration extends ConcurrentBaseConfiguration { - public ConcurrentConstConfiguration(Configuration conf) { - Preconditions.checkNotNull(conf); - copy(conf); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConfigurationListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConfigurationListener.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConfigurationListener.java deleted file mode 100644 index d4c44b7..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConfigurationListener.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.config; - -/** - * Configuration listener triggered when reloading configuration settings. - */ -public interface ConfigurationListener { - - /** - * Reload the configuration. - * - * @param conf configuration to reload - */ - void onReload(ConcurrentBaseConfiguration conf); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConfigurationSubscription.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConfigurationSubscription.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConfigurationSubscription.java deleted file mode 100644 index dadfe81..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/ConfigurationSubscription.java +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.config; - -import java.io.FileNotFoundException; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.Iterator; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.FileConfiguration; -import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * ConfigurationSubscription publishes a reloading, thread-safe view of file configuration. The class - * periodically calls FileConfiguration.reload on the underlying conf, and propagates changes to the - * concurrent config. The configured FileChangedReloadingStrategy ensures that file config will only - * be reloaded if something changed. - * Notes: - * 1. Reload schedule is never terminated. The assumption is a finite number of these are started - * at the calling layer, and terminated only once the executor service is shut down. - * 2. The underlying FileConfiguration is not at all thread-safe, so its important to ensure access - * to this object is always single threaded. 
- */ -public class ConfigurationSubscription { - static final Logger LOG = LoggerFactory.getLogger(ConfigurationSubscription.class); - - private final ConcurrentBaseConfiguration viewConfig; - private final ScheduledExecutorService executorService; - private final int reloadPeriod; - private final TimeUnit reloadUnit; - private final List<FileConfigurationBuilder> fileConfigBuilders; - private final List<FileConfiguration> fileConfigs; - private final CopyOnWriteArraySet<ConfigurationListener> confListeners; - - public ConfigurationSubscription(ConcurrentBaseConfiguration viewConfig, - List<FileConfigurationBuilder> fileConfigBuilders, - ScheduledExecutorService executorService, - int reloadPeriod, - TimeUnit reloadUnit) - throws ConfigurationException { - Preconditions.checkNotNull(fileConfigBuilders); - Preconditions.checkArgument(!fileConfigBuilders.isEmpty()); - Preconditions.checkNotNull(executorService); - Preconditions.checkNotNull(viewConfig); - this.viewConfig = viewConfig; - this.executorService = executorService; - this.reloadPeriod = reloadPeriod; - this.reloadUnit = reloadUnit; - this.fileConfigBuilders = fileConfigBuilders; - this.fileConfigs = Lists.newArrayListWithExpectedSize(this.fileConfigBuilders.size()); - this.confListeners = new CopyOnWriteArraySet<ConfigurationListener>(); - reload(); - scheduleReload(); - } - - public void registerListener(ConfigurationListener listener) { - this.confListeners.add(listener); - } - - public void unregisterListener(ConfigurationListener listener) { - this.confListeners.remove(listener); - } - - private boolean initConfig() { - if (fileConfigs.isEmpty()) { - try { - for (FileConfigurationBuilder fileConfigBuilder : fileConfigBuilders) { - FileConfiguration fileConfig = fileConfigBuilder.getConfiguration(); - FileChangedReloadingStrategy reloadingStrategy = new FileChangedReloadingStrategy(); - reloadingStrategy.setRefreshDelay(0); - fileConfig.setReloadingStrategy(reloadingStrategy); - 
fileConfigs.add(fileConfig); - } - } catch (ConfigurationException ex) { - if (!fileNotFound(ex)) { - LOG.error("Config init failed {}", ex); - } - } - } - return !fileConfigs.isEmpty(); - } - - private void scheduleReload() { - executorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - reload(); - } - }, 0, reloadPeriod, reloadUnit); - } - - @VisibleForTesting - void reload() { - // No-op if already loaded. - if (!initConfig()) { - return; - } - // Reload if config exists. - Set<String> confKeys = Sets.newHashSet(); - for (FileConfiguration fileConfig : fileConfigs) { - LOG.debug("Check and reload config, file={}, lastModified={}", fileConfig.getFile(), - fileConfig.getFile().lastModified()); - fileConfig.reload(); - // load keys - Iterator keyIter = fileConfig.getKeys(); - while (keyIter.hasNext()) { - String key = (String) keyIter.next(); - confKeys.add(key); - } - } - // clear unexisted keys - Iterator viewIter = viewConfig.getKeys(); - while (viewIter.hasNext()) { - String key = (String) viewIter.next(); - if (!confKeys.contains(key)) { - clearViewProperty(key); - } - } - LOG.info("Reload features : {}", confKeys); - // load keys from files - for (FileConfiguration fileConfig : fileConfigs) { - try { - loadView(fileConfig); - } catch (Exception ex) { - if (!fileNotFound(ex)) { - LOG.error("Config reload failed for file {}", fileConfig.getFileName(), ex); - } - } - } - for (ConfigurationListener listener : confListeners) { - listener.onReload(viewConfig); - } - } - - private boolean fileNotFound(Exception ex) { - return ex instanceof FileNotFoundException || - ex.getCause() != null && ex.getCause() instanceof FileNotFoundException; - } - - private void loadView(FileConfiguration fileConfig) { - Iterator fileIter = fileConfig.getKeys(); - while (fileIter.hasNext()) { - String key = (String) fileIter.next(); - setViewProperty(fileConfig, key, fileConfig.getProperty(key)); - } - } - - private void clearViewProperty(String key) { - 
LOG.debug("Removing property, key={}", key); - viewConfig.clearProperty(key); - } - - private void setViewProperty(FileConfiguration fileConfig, - String key, - Object value) { - if (!viewConfig.containsKey(key) || !viewConfig.getProperty(key).equals(value)) { - LOG.debug("Setting property, key={} value={}", key, fileConfig.getProperty(key)); - viewConfig.setProperty(key, fileConfig.getProperty(key)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/config/DynamicConfigurationFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/DynamicConfigurationFactory.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/config/DynamicConfigurationFactory.java deleted file mode 100644 index 2510f74..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/DynamicConfigurationFactory.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.config; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; - -import com.google.common.collect.Lists; -import com.twitter.distributedlog.DistributedLogConfiguration; - -import java.io.File; -import java.io.FileNotFoundException; -import java.net.MalformedURLException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.configuration.ConfigurationException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Encapsulates creation of DynamicDistributedLogConfiguration instances. Ensures one instance per - * factory. - * Notes: - * Once loaded, stays loaded until shutdown. Caller ensures small finite number of configs are created. - */ -public class DynamicConfigurationFactory { - private static final Logger LOG = LoggerFactory.getLogger(DynamicConfigurationFactory.class); - - private final Map<String, DynamicDistributedLogConfiguration> dynamicConfigs; - private final List<ConfigurationSubscription> subscriptions; - private final ScheduledExecutorService executorService; - private final int reloadPeriod; - private final TimeUnit reloadUnit; - - public DynamicConfigurationFactory(ScheduledExecutorService executorService, int reloadPeriod, TimeUnit reloadUnit) { - this.executorService = executorService; - this.reloadPeriod = reloadPeriod; - this.reloadUnit = reloadUnit; - this.dynamicConfigs = new HashMap<String, DynamicDistributedLogConfiguration>(); - this.subscriptions = new LinkedList<ConfigurationSubscription>(); - } - - public synchronized Optional<DynamicDistributedLogConfiguration> getDynamicConfiguration( - String configPath, - ConcurrentBaseConfiguration defaultConf) throws ConfigurationException { - Preconditions.checkNotNull(configPath); - try { - if 
(!dynamicConfigs.containsKey(configPath)) { - File configFile = new File(configPath); - FileConfigurationBuilder properties = - new PropertiesConfigurationBuilder(configFile.toURI().toURL()); - DynamicDistributedLogConfiguration dynConf = - new DynamicDistributedLogConfiguration(defaultConf); - List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(properties); - ConfigurationSubscription subscription = new ConfigurationSubscription( - dynConf, fileConfigBuilders, executorService, reloadPeriod, reloadUnit); - subscriptions.add(subscription); - dynamicConfigs.put(configPath, dynConf); - LOG.info("Loaded dynamic configuration at {}", configPath); - } - return Optional.of(dynamicConfigs.get(configPath)); - } catch (MalformedURLException ex) { - throw new ConfigurationException(ex); - } - } - - public synchronized Optional<DynamicDistributedLogConfiguration> getDynamicConfiguration(String configPath) throws ConfigurationException { - return getDynamicConfiguration(configPath, new ConcurrentConstConfiguration(new DistributedLogConfiguration())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/config/DynamicDistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/DynamicDistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/config/DynamicDistributedLogConfiguration.java deleted file mode 100644 index ca43cfa..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/DynamicDistributedLogConfiguration.java +++ /dev/null @@ -1,356 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.config; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.bk.QuorumConfig; - -import static com.twitter.distributedlog.DistributedLogConfiguration.*; - -/** - * Whitelist dynamic configuration by adding an accessor to this class. - */ -public class DynamicDistributedLogConfiguration extends ConcurrentBaseConfiguration { - - private final ConcurrentBaseConfiguration defaultConfig; - - public DynamicDistributedLogConfiguration(ConcurrentBaseConfiguration defaultConfig) { - this.defaultConfig = defaultConfig; - } - - private static int getInt(ConcurrentBaseConfiguration configuration, - String newKey, - String oldKey, - int defaultValue) { - return configuration.getInt(newKey, configuration.getInt(oldKey, defaultValue)); - } - - /** - * Get retention period in hours - * - * @return retention period in hours - */ - public int getRetentionPeriodHours() { - return getInt( - this, - BKDL_RETENTION_PERIOD_IN_HOURS, - BKDL_RETENTION_PERIOD_IN_HOURS_OLD, - getInt(defaultConfig, - BKDL_RETENTION_PERIOD_IN_HOURS, - BKDL_RETENTION_PERIOD_IN_HOURS_OLD, - BKDL_RETENTION_PERIOD_IN_HOURS_DEFAULT) - ); - } - - /** - * A lower threshold bytes per second limit on writes to the distributedlog proxy. 
- * - * @return Bytes per second write limit - */ - public int getBpsSoftWriteLimit() { - return getInt(DistributedLogConfiguration.BKDL_BPS_SOFT_WRITE_LIMIT, - defaultConfig.getInt(DistributedLogConfiguration.BKDL_BPS_SOFT_WRITE_LIMIT, - DistributedLogConfiguration.BKDL_BPS_SOFT_WRITE_LIMIT_DEFAULT)); - } - - /** - * An upper threshold bytes per second limit on writes to the distributedlog proxy. - * - * @return Bytes per second write limit - */ - public int getBpsHardWriteLimit() { - return getInt(DistributedLogConfiguration.BKDL_BPS_HARD_WRITE_LIMIT, - defaultConfig.getInt(DistributedLogConfiguration.BKDL_BPS_HARD_WRITE_LIMIT, - DistributedLogConfiguration.BKDL_BPS_HARD_WRITE_LIMIT_DEFAULT)); - } - - /** - * A lower threshold requests per second limit on writes to the distributedlog proxy. - * - * @return Requests per second write limit - */ - public int getRpsSoftWriteLimit() { - return getInt(DistributedLogConfiguration.BKDL_RPS_SOFT_WRITE_LIMIT, - defaultConfig.getInt(DistributedLogConfiguration.BKDL_RPS_SOFT_WRITE_LIMIT, - DistributedLogConfiguration.BKDL_RPS_SOFT_WRITE_LIMIT_DEFAULT)); - } - - /** - * An upper threshold requests per second limit on writes to the distributedlog proxy. - * - * @return Requests per second write limit - */ - public int getRpsHardWriteLimit() { - return getInt(DistributedLogConfiguration.BKDL_RPS_HARD_WRITE_LIMIT, - defaultConfig.getInt(DistributedLogConfiguration.BKDL_RPS_HARD_WRITE_LIMIT, - DistributedLogConfiguration.BKDL_RPS_HARD_WRITE_LIMIT_DEFAULT)); - } - - /** - * A lower threshold requests per second limit on writes to the distributedlog proxy globally. 
- * - * @return Requests per second write limit - */ - public int getRpsSoftServiceLimit() { - return getInt(DistributedLogConfiguration.BKDL_RPS_SOFT_SERVICE_LIMIT, - defaultConfig.getInt(DistributedLogConfiguration.BKDL_RPS_SOFT_SERVICE_LIMIT, - DistributedLogConfiguration.BKDL_RPS_SOFT_SERVICE_LIMIT_DEFAULT)); - } - - /** - * An upper threshold requests per second limit on writes to the distributedlog proxy globally. - * - * @return Requests per second write limit - */ - public int getRpsHardServiceLimit() { - return getInt(DistributedLogConfiguration.BKDL_RPS_HARD_SERVICE_LIMIT, - defaultConfig.getInt(DistributedLogConfiguration.BKDL_RPS_HARD_SERVICE_LIMIT, - DistributedLogConfiguration.BKDL_RPS_HARD_SERVICE_LIMIT_DEFAULT)); - } - - /** - * When 60min average rps for the entire service instance hits this value, new streams will be - * rejected. - * - * @return Requests per second limit - */ - public int getRpsStreamAcquireServiceLimit() { - return getInt(DistributedLogConfiguration.BKDL_RPS_STREAM_ACQUIRE_SERVICE_LIMIT, - defaultConfig.getInt(DistributedLogConfiguration.BKDL_RPS_STREAM_ACQUIRE_SERVICE_LIMIT, - DistributedLogConfiguration.BKDL_RPS_STREAM_ACQUIRE_SERVICE_LIMIT_DEFAULT)); - } - - /** - * A lower threshold bytes per second limit on writes to the distributedlog proxy globally. - * - * @return Bytes per second write limit - */ - public int getBpsSoftServiceLimit() { - return getInt(DistributedLogConfiguration.BKDL_BPS_SOFT_SERVICE_LIMIT, - defaultConfig.getInt(DistributedLogConfiguration.BKDL_BPS_SOFT_SERVICE_LIMIT, - DistributedLogConfiguration.BKDL_BPS_SOFT_SERVICE_LIMIT_DEFAULT)); - } - - /** - * An upper threshold bytes per second limit on writes to the distributedlog proxy globally. 
- * - * @return Bytes per second write limit - */ - public int getBpsHardServiceLimit() { - return getInt(DistributedLogConfiguration.BKDL_BPS_HARD_SERVICE_LIMIT, - defaultConfig.getInt(DistributedLogConfiguration.BKDL_BPS_HARD_SERVICE_LIMIT, - DistributedLogConfiguration.BKDL_BPS_HARD_SERVICE_LIMIT_DEFAULT)); - } - - /** - * When 60min average bps for the entire service instance hits this value, new streams will be - * rejected. - * - * @return Bytes per second limit - */ - public int getBpsStreamAcquireServiceLimit() { - return getInt(DistributedLogConfiguration.BKDL_BPS_STREAM_ACQUIRE_SERVICE_LIMIT, - defaultConfig.getInt(DistributedLogConfiguration.BKDL_BPS_STREAM_ACQUIRE_SERVICE_LIMIT, - DistributedLogConfiguration.BKDL_BPS_STREAM_ACQUIRE_SERVICE_LIMIT_DEFAULT)); - } - - /** - * Get percent of write bytes which should be delayed by BKDL_EI_INJECTED_WRITE_DELAY_MS. - * - * @return percent of writes to delay. - */ - public double getEIInjectedWriteDelayPercent() { - return getDouble(DistributedLogConfiguration.BKDL_EI_INJECTED_WRITE_DELAY_PERCENT, - defaultConfig.getDouble(DistributedLogConfiguration.BKDL_EI_INJECTED_WRITE_DELAY_PERCENT, - DistributedLogConfiguration.BKDL_EI_INJECTED_WRITE_DELAY_PERCENT_DEFAULT)); - } - - /** - * Get amount of time to delay writes for in writer failure injection. - * - * @return millis to delay writes for. 
- */ - public int getEIInjectedWriteDelayMs() { - return getInt(DistributedLogConfiguration.BKDL_EI_INJECTED_WRITE_DELAY_MS, - defaultConfig.getInt(DistributedLogConfiguration.BKDL_EI_INJECTED_WRITE_DELAY_MS, - DistributedLogConfiguration.BKDL_EI_INJECTED_WRITE_DELAY_MS_DEFAULT)); - } - - /** - * Get output buffer size - * - * @return buffer size - */ - public int getOutputBufferSize() { - return getInt( - this, - BKDL_OUTPUT_BUFFER_SIZE, - BKDL_OUTPUT_BUFFER_SIZE_OLD, - getInt(defaultConfig, - BKDL_OUTPUT_BUFFER_SIZE, - BKDL_OUTPUT_BUFFER_SIZE_OLD, - BKDL_OUTPUT_BUFFER_SIZE_DEFAULT) - ); - } - - /** - * Get Periodic Log Flush Frequency in seconds - * - * @return periodic flush frequency - */ - public int getPeriodicFlushFrequencyMilliSeconds() { - return getInt(DistributedLogConfiguration.BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS, - defaultConfig.getInt(DistributedLogConfiguration.BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS, - DistributedLogConfiguration.BKDL_PERIODIC_FLUSH_FREQUENCY_MILLISECONDS_DEFAULT)); - } - - /** - * Get the number of entries that readahead worker reads as a batch from bookkeeper - * - * @return the batch size - */ - public int getReadAheadBatchSize() { - return getInt( - this, - BKDL_READAHEAD_BATCHSIZE, - BKDL_READAHEAD_BATCHSIZE_OLD, - getInt(defaultConfig, - BKDL_READAHEAD_BATCHSIZE, - BKDL_READAHEAD_BATCHSIZE_OLD, - BKDL_READAHEAD_BATCHSIZE_DEFAULT) - ); - } - - /** - * Get the maximum number of {@link com.twitter.distributedlog.LogRecord } that readahead worker will cache. - * - * @return the maximum number - */ - public int getReadAheadMaxRecords() { - return getInt( - this, - BKDL_READAHEAD_MAX_RECORDS, - BKDL_READAHEAD_MAX_RECORDS_OLD, - getInt(defaultConfig, - BKDL_READAHEAD_MAX_RECORDS, - BKDL_READAHEAD_MAX_RECORDS_OLD, - BKDL_READAHEAD_MAX_RECORDS_DEFAULT) - ); - } - - /** - * Whether to enable ledger allocator pool or not. - * It is disabled by default. - * - * @return whether using ledger allocator pool or not. 
- */ - public boolean getEnableLedgerAllocatorPool() { - return getBoolean(BKDL_ENABLE_LEDGER_ALLOCATOR_POOL, - defaultConfig.getBoolean( - BKDL_ENABLE_LEDGER_ALLOCATOR_POOL, - BKDL_ENABLE_LEDGER_ALLOCATOR_POOL_DEFAULT)); - } - - /** - * Get the quorum config. - * - * @return quorum config - */ - public QuorumConfig getQuorumConfig() { - int ensembleSize = getInt( - this, - BKDL_BOOKKEEPER_ENSEMBLE_SIZE, - BKDL_BOOKKEEPER_ENSEMBLE_SIZE_OLD, - getInt(defaultConfig, - BKDL_BOOKKEEPER_ENSEMBLE_SIZE, - BKDL_BOOKKEEPER_ENSEMBLE_SIZE_OLD, - BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT)); - int writeQuorumSize = getInt( - this, - BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE, - BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_OLD, - getInt(defaultConfig, - BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE, - BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_OLD, - BKDL_BOOKKEEPER_WRITE_QUORUM_SIZE_DEFAULT)); - int ackQuorumSize = getInt( - this, - BKDL_BOOKKEEPER_ACK_QUORUM_SIZE, - BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_OLD, - getInt(defaultConfig, - BKDL_BOOKKEEPER_ACK_QUORUM_SIZE, - BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_OLD, - BKDL_BOOKKEEPER_ACK_QUORUM_SIZE_DEFAULT)); - return new QuorumConfig(ensembleSize, writeQuorumSize, ackQuorumSize); - } - - /** - * Get the maximum number of partitions of each stream allowed to be acquired per proxy. - * - * @return maximum number of partitions of each stream allowed to be acquired - * @see DistributedLogConfiguration#getMaxAcquiredPartitionsPerProxy() - */ - public int getMaxAcquiredPartitionsPerProxy() { - return getInt( - BKDL_MAX_ACQUIRED_PARTITIONS_PER_PROXY, - defaultConfig.getInt( - BKDL_MAX_ACQUIRED_PARTITIONS_PER_PROXY, - BKDL_MAX_ACQUIRED_PARTITIONS_PER_PROXY_DEFAULT) - ); - } - - /** - * Get the maximum number of partitions of each stream allowed to cache per proxy. 
- * - * @return maximum number of partitions of each stream allowed to cache - * @see DistributedLogConfiguration#getMaxAcquiredPartitionsPerProxy() - */ - public int getMaxCachedPartitionsPerProxy() { - return getInt( - BKDL_MAX_CACHED_PARTITIONS_PER_PROXY, - defaultConfig.getInt( - BKDL_MAX_CACHED_PARTITIONS_PER_PROXY, - BKDL_MAX_CACHED_PARTITIONS_PER_PROXY_DEFAULT) - ); - } - - /** - * Check whether the durable write is enabled. - * - * @return true if durable write is enabled. otherwise, false. - */ - public boolean isDurableWriteEnabled() { - return getBoolean(BKDL_IS_DURABLE_WRITE_ENABLED, - defaultConfig.getBoolean( - BKDL_IS_DURABLE_WRITE_ENABLED, - BKDL_IS_DURABLE_WRITE_ENABLED_DEFAULT)); - } - - /** - * Get the flag whether to deserialize recordset on reads. - * - * @return flag whether to deserialize recordset on reads. - */ - public boolean getDeserializeRecordSetOnReads() { - return getBoolean(BKDL_DESERIALIZE_RECORDSET_ON_READS, - defaultConfig.getBoolean( - BKDL_DESERIALIZE_RECORDSET_ON_READS, - BKDL_DESERIALIZE_RECORDSET_ON_READS_DEFAULT)); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/config/FileConfigurationBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/FileConfigurationBuilder.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/config/FileConfigurationBuilder.java deleted file mode 100644 index b3c4e6c..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/FileConfigurationBuilder.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.config; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.FileConfiguration; - -/** - * Abstract out FileConfiguration subclass construction. - */ -public interface FileConfigurationBuilder { - FileConfiguration getConfiguration() throws ConfigurationException; -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/config/PropertiesConfigurationBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/PropertiesConfigurationBuilder.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/config/PropertiesConfigurationBuilder.java deleted file mode 100644 index 6efaa20..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/PropertiesConfigurationBuilder.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.config; - -import java.net.URL; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.FileConfiguration; -import org.apache.commons.configuration.PropertiesConfiguration; - -/** - * Hide PropertiesConfiguration dependency. - */ -public class PropertiesConfigurationBuilder implements FileConfigurationBuilder { - private URL url; - - public PropertiesConfigurationBuilder(URL url) { - this.url = url; - } - - @Override - public FileConfiguration getConfiguration() throws ConfigurationException { - return new PropertiesConfiguration(url); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/config/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/config/package-info.java deleted file mode 100644 index b4f77b4..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/config/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * DistributedLog Configuration - */ -package com.twitter.distributedlog.config; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/exceptions/ZKException.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/exceptions/ZKException.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/exceptions/ZKException.java deleted file mode 100644 index 8ed1610..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/exceptions/ZKException.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.exceptions; - -import com.twitter.distributedlog.thrift.service.StatusCode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; - -/** - * TODO: move ZKException to distributedlog-protocol - */ -public class ZKException extends DLException { - - private static final long serialVersionUID = 7542748595054923600L; - - final KeeperException.Code code; - - public ZKException(String msg, Code code) { - super(StatusCode.ZOOKEEPER_ERROR, msg + " : " + code); - this.code = code; - } - - public ZKException(String msg, KeeperException exception) { - super(StatusCode.ZOOKEEPER_ERROR, msg, exception); - this.code = exception.code(); - } - - public Code getKeeperExceptionCode() { - return this.code; - } - - public static boolean isRetryableZKException(ZKException zke) { - KeeperException.Code code = zke.getKeeperExceptionCode(); - return KeeperException.Code.CONNECTIONLOSS == code || - KeeperException.Code.OPERATIONTIMEOUT == code || - KeeperException.Code.SESSIONEXPIRED == code || - KeeperException.Code.SESSIONMOVED == code; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/AbstractFeatureProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/AbstractFeatureProvider.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/AbstractFeatureProvider.java 
deleted file mode 100644 index f484307..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/AbstractFeatureProvider.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.feature; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import org.apache.bookkeeper.feature.CacheableFeatureProvider; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.commons.configuration.ConfigurationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; - -/** - * Decider based feature provider - */ -public abstract class AbstractFeatureProvider<T extends Feature> extends CacheableFeatureProvider<T> { - - protected static final Logger logger = LoggerFactory.getLogger(AbstractFeatureProvider.class); - - public static FeatureProvider getFeatureProvider(String rootScope, - DistributedLogConfiguration conf, - StatsLogger statsLogger) - throws IOException { - Class<? 
extends FeatureProvider> featureProviderClass; - try { - featureProviderClass = conf.getFeatureProviderClass(); - } catch (ConfigurationException e) { - throw new IOException("Can't initialize the feature provider : ", e); - } - // create feature provider - Constructor<? extends FeatureProvider> constructor; - try { - constructor = featureProviderClass.getDeclaredConstructor( - String.class, - DistributedLogConfiguration.class, - StatsLogger.class); - } catch (NoSuchMethodException e) { - throw new IOException("No constructor found for feature provider class " + featureProviderClass + " : ", e); - } - try { - return constructor.newInstance(rootScope, conf, statsLogger); - } catch (InstantiationException e) { - throw new IOException("Failed to instantiate feature provider : ", e); - } catch (IllegalAccessException e) { - throw new IOException("Encountered illegal access when instantiating feature provider : ", e); - } catch (InvocationTargetException e) { - Throwable targetException = e.getTargetException(); - if (targetException instanceof IOException) { - throw (IOException) targetException; - } else { - throw new IOException("Encountered invocation target exception while instantiating feature provider : ", e); - } - } - } - - protected final DistributedLogConfiguration conf; - protected final StatsLogger statsLogger; - - protected AbstractFeatureProvider(String rootScope, - DistributedLogConfiguration conf, - StatsLogger statsLogger) { - super(rootScope); - this.conf = conf; - this.statsLogger = statsLogger; - } - - /** - * Start the feature provider. - * - * @throws IOException when failed to start the feature provider. - */ - public void start() throws IOException { - // no-op - } - - /** - * Stop the feature provider. 
- */ - public void stop() { - // no-op - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/ConfigurationFeatureProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/ConfigurationFeatureProvider.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/ConfigurationFeatureProvider.java deleted file mode 100644 index 02a4d79..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/ConfigurationFeatureProvider.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.feature; - -import com.twitter.distributedlog.config.ConcurrentBaseConfiguration; -import org.apache.bookkeeper.feature.CacheableFeatureProvider; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.feature.SettableFeature; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentMap; - -/** - * Feature Provider that load features from configuration - */ -class ConfigurationFeatureProvider extends CacheableFeatureProvider { - - private static final Logger logger = LoggerFactory.getLogger(ConfigurationFeatureProvider.class); - - static SettableFeature makeFeature(ConcurrentBaseConfiguration featuresConf, - ConcurrentMap<String, SettableFeature> features, - String featureName) { - SettableFeature feature = features.get(featureName); - if (null == feature) { - int availability = featuresConf.getInt(featureName, 0); - feature = new SettableFeature(featureName, availability); - SettableFeature oldFeature = - features.putIfAbsent(featureName, feature); - if (null != oldFeature) { - feature = oldFeature; - } else { - logger.info("Load feature {}={}", featureName, availability); - } - } - return feature; - } - - private final ConcurrentBaseConfiguration featuresConf; - private final ConcurrentMap<String, SettableFeature> features; - - ConfigurationFeatureProvider(String rootScope, - ConcurrentBaseConfiguration featuresConf, - ConcurrentMap<String, SettableFeature> features) { - super(rootScope); - this.featuresConf = featuresConf; - this.features = features; - } - - @Override - protected Feature makeFeature(String featureName) { - return makeFeature(featuresConf, features, featureName); - } - - @Override - protected FeatureProvider makeProvider(String fullScopeName) { - return new ConfigurationFeatureProvider( - fullScopeName, featuresConf, features); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/CoreFeatureKeys.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/CoreFeatureKeys.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/CoreFeatureKeys.java deleted file mode 100644 index 49b3354..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/CoreFeatureKeys.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * List of feature keys used by distributedlog core
 */
public enum CoreFeatureKeys {

    // @Deprecated: bkc features are managed by bookkeeper prefixed with a scope
    DISABLE_DURABILITY_ENFORCEMENT,

    // disabling logsegment rolling
    DISABLE_LOGSEGMENT_ROLLING,

    DISABLE_WRITE_LIMIT,

}
- */ -package com.twitter.distributedlog.feature; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.feature.SettableFeature; -import org.apache.bookkeeper.feature.SettableFeatureProvider; -import org.apache.bookkeeper.stats.StatsLogger; - -/** - * Default feature provider which disable all features by default. - */ -public class DefaultFeatureProvider extends AbstractFeatureProvider { - - public DefaultFeatureProvider(String rootScope, - DistributedLogConfiguration conf, - StatsLogger statsLogger) { - super(rootScope, conf, statsLogger); - } - - @Override - protected Feature makeFeature(String featureName) { - return new SettableFeature(featureName, 0); - } - - @Override - protected FeatureProvider makeProvider(String fullScopeName) { - return new SettableFeatureProvider(fullScopeName, 0); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/DynamicConfigurationFeatureProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/DynamicConfigurationFeatureProvider.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/DynamicConfigurationFeatureProvider.java deleted file mode 100644 index 1eeb155..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/DynamicConfigurationFeatureProvider.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.feature; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.config.ConcurrentBaseConfiguration; -import com.twitter.distributedlog.config.ConfigurationListener; -import com.twitter.distributedlog.config.ConfigurationSubscription; -import com.twitter.distributedlog.config.FileConfigurationBuilder; -import com.twitter.distributedlog.config.PropertiesConfigurationBuilder; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.feature.SettableFeature; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.commons.configuration.ConfigurationException; - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * Feature Provider based dynamic configuration. 
- */ -public class DynamicConfigurationFeatureProvider extends AbstractFeatureProvider - implements ConfigurationListener { - - private final ConcurrentBaseConfiguration featuresConf; - private ConfigurationSubscription featuresConfSubscription; - private final ConcurrentMap<String, SettableFeature> features; - private final ScheduledExecutorService executorService; - - public DynamicConfigurationFeatureProvider(String rootScope, - DistributedLogConfiguration conf, - StatsLogger statsLogger) { - super(rootScope, conf, statsLogger); - this.features = new ConcurrentHashMap<String, SettableFeature>(); - this.featuresConf = new ConcurrentBaseConfiguration(); - this.executorService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("DynamicConfigurationFeatureProvider-%d").build()); - } - - ConcurrentBaseConfiguration getFeatureConf() { - return featuresConf; - } - - ConfigurationSubscription getFeatureConfSubscription() { - return featuresConfSubscription; - } - - @Override - public void start() throws IOException { - List<FileConfigurationBuilder> fileConfigBuilders = - Lists.newArrayListWithExpectedSize(2); - String baseConfigPath = conf.getFileFeatureProviderBaseConfigPath(); - Preconditions.checkNotNull(baseConfigPath); - File baseConfigFile = new File(baseConfigPath); - FileConfigurationBuilder baseProperties = - new PropertiesConfigurationBuilder(baseConfigFile.toURI().toURL()); - fileConfigBuilders.add(baseProperties); - String overlayConfigPath = conf.getFileFeatureProviderOverlayConfigPath(); - if (null != overlayConfigPath) { - File overlayConfigFile = new File(overlayConfigPath); - FileConfigurationBuilder overlayProperties = - new PropertiesConfigurationBuilder(overlayConfigFile.toURI().toURL()); - fileConfigBuilders.add(overlayProperties); - } - try { - this.featuresConfSubscription = new ConfigurationSubscription( - this.featuresConf, - fileConfigBuilders, - executorService, - conf.getDynamicConfigReloadIntervalSec(), 
- TimeUnit.SECONDS); - } catch (ConfigurationException e) { - throw new IOException("Failed to register subscription on features configuration"); - } - this.featuresConfSubscription.registerListener(this); - } - - @Override - public void stop() { - this.executorService.shutdown(); - } - - @Override - public void onReload(ConcurrentBaseConfiguration conf) { - for (Map.Entry<String, SettableFeature> feature : features.entrySet()) { - String featureName = feature.getKey(); - int availability = conf.getInt(featureName, 0); - if (availability != feature.getValue().availability()) { - feature.getValue().set(availability); - logger.info("Reload feature {}={}", featureName, availability); - } - } - } - - @Override - protected Feature makeFeature(String featureName) { - return ConfigurationFeatureProvider.makeFeature( - featuresConf, features, featureName); - } - - @Override - protected FeatureProvider makeProvider(String fullScopeName) { - return new ConfigurationFeatureProvider( - fullScopeName, featuresConf, features); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/package-info.java deleted file mode 100644 index e8d8134..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/feature/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Feature & FeatureProvider for distributedlog - */ -package com.twitter.distributedlog.feature; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/function/CloseAsyncCloseableFunction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/CloseAsyncCloseableFunction.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/CloseAsyncCloseableFunction.java deleted file mode 100644 index 698a088..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/CloseAsyncCloseableFunction.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.function; - -import com.twitter.distributedlog.io.AsyncCloseable; -import scala.Function0; -import scala.runtime.AbstractFunction0; -import scala.runtime.BoxedUnit; - -/** - * Function to close {@link com.twitter.distributedlog.io.AsyncCloseable} - */ -public class CloseAsyncCloseableFunction extends AbstractFunction0<BoxedUnit> { - - /** - * Return a function to close an {@link AsyncCloseable}. - * - * @param closeable closeable to close - * @return function to close an {@link AsyncCloseable} - */ - public static Function0<BoxedUnit> of(AsyncCloseable closeable) { - return new CloseAsyncCloseableFunction(closeable); - } - - private final AsyncCloseable closeable; - - private CloseAsyncCloseableFunction(AsyncCloseable closeable) { - this.closeable = closeable; - } - - @Override - public BoxedUnit apply() { - closeable.asyncClose(); - return BoxedUnit.UNIT; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/function/DefaultValueMapFunction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/DefaultValueMapFunction.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/DefaultValueMapFunction.java deleted file mode 100644 index f08cd0f..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/DefaultValueMapFunction.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.function; - -import scala.runtime.AbstractFunction1; - -/** - * Map Function return default value - */ -public class DefaultValueMapFunction<T, R> extends AbstractFunction1<T, R> { - - public static <T, R> DefaultValueMapFunction<T, R> of(R defaultValue) { - return new DefaultValueMapFunction<T, R>(defaultValue); - } - - private final R defaultValue; - - private DefaultValueMapFunction(R defaultValue) { - this.defaultValue = defaultValue; - } - - @Override - public R apply(T any) { - return defaultValue; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetLastTxIdFunction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetLastTxIdFunction.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetLastTxIdFunction.java deleted file mode 100644 index bc77d6a..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetLastTxIdFunction.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.function; - -import com.twitter.distributedlog.DistributedLogConstants; -import com.twitter.distributedlog.LogSegmentMetadata; -import scala.runtime.AbstractFunction1; - -import java.util.List; - -/** - * Retrieve the last tx id from list of log segments - */ -public class GetLastTxIdFunction extends AbstractFunction1<List<LogSegmentMetadata>, Long> { - - public static final GetLastTxIdFunction INSTANCE = new GetLastTxIdFunction(); - - private GetLastTxIdFunction() {} - - @Override - public Long apply(List<LogSegmentMetadata> segmentList) { - long lastTxId = DistributedLogConstants.INVALID_TXID; - for (LogSegmentMetadata l : segmentList) { - lastTxId = Math.max(lastTxId, l.getLastTxId()); - } - return lastTxId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetVersionedValueFunction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetVersionedValueFunction.java 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetVersionedValueFunction.java deleted file mode 100644 index 4e7844c..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/GetVersionedValueFunction.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.function; - -import com.twitter.distributedlog.LogSegmentMetadata; -import org.apache.bookkeeper.versioning.Versioned; -import scala.Function1; -import scala.runtime.AbstractFunction1; - -import java.util.List; - -/** - * Function to get the versioned value from {@link org.apache.bookkeeper.versioning.Versioned} - */ -public class GetVersionedValueFunction<T> extends AbstractFunction1<Versioned<T>, T> { - - public static final Function1<Versioned<List<LogSegmentMetadata>>, List<LogSegmentMetadata>> - GET_LOGSEGMENT_LIST_FUNC = new GetVersionedValueFunction<List<LogSegmentMetadata>>(); - - @Override - public T apply(Versioned<T> versionedValue) { - return versionedValue.getValue(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java deleted file mode 100644 index e260482..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/VoidFunctions.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.function; - -import scala.runtime.AbstractFunction1; - -import java.util.List; - -public class VoidFunctions { - - public static final AbstractFunction1<List<Void>, Void> LIST_TO_VOID_FUNC = - new AbstractFunction1<List<Void>, Void>() { - @Override - public Void apply(List<Void> list) { - return null; - } - }; - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/function/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/function/package-info.java deleted file mode 100644 index 2da98dc..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/function/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Common Functions for DistributedLog - */ -package com.twitter.distributedlog.function;