Michael Blow has submitted this change and it was merged. Change subject: [ASTERIXDB-2213] Guard against concurrent config updates ......................................................................
[ASTERIXDB-2213] Guard against concurrent config updates Change-Id: If7dffb1b502b9331118ad344e6f4ef0d625f4c8f Reviewed-on: https://asterix-gerrit.ics.uci.edu/2467 Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Michael Blow <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/test/java/org/apache/hyracks/control/common/config/ConfigManagerTest.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java A hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOInterruptibleAction.java A hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptibleAction.java A hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java A hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingAction.java 9 files changed, 307 insertions(+), 44 deletions(-) Approvals: Michael Blow: Verified Murtadha Hubail: Looks good to me, approved diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java index ba4f82d..c60e673 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java @@ -24,6 +24,9 @@ import java.util.concurrent.TimeUnit; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.util.IOInterruptibleAction; +import org.apache.hyracks.util.InterruptibleAction; +import org.apache.hyracks.util.ThrowingAction; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -230,20 +233,5 @@ Thread.currentThread().interrupt(); } } - } - - @FunctionalInterface - public interface InterruptibleAction { - void run() throws InterruptedException; - } - - @FunctionalInterface - public interface ThrowingAction { - void run() throws Exception; // NOSONAR - } - - @FunctionalInterface - public interface IOInterruptibleAction { - void run() throws IOException, InterruptedException; } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml index e13e23b..2f9903a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml @@ -83,5 +83,10 @@ <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java index 9564922..8dd95e1 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Set; import java.util.SortedMap; @@ -69,27 +70,32 @@ private static final Logger LOGGER = LogManager.getLogger(); private HashSet<IOption> registeredOptions = new HashSet<>(); - private HashMap<IOption, Object> definedMap = new HashMap<>(); - private HashMap<IOption, Object> defaultMap = new HashMap<>(); - private CompositeMap<IOption, Object> configurationMap = - new CompositeMap<>(definedMap, defaultMap, new NoOpMapMutator()); + @SuppressWarnings("squid:S1948") // HashMap is serializable, and therefore so is its synchronized map + private Map<IOption, Object> definedMap = Collections.synchronizedMap(new HashMap<>()); + @SuppressWarnings("squid:S1948") // HashMap is serializable, and therefore so is its synchronized map + private Map<IOption, Object> defaultMap = Collections.synchronizedMap(new HashMap<>()); + @SuppressWarnings("squid:S1948") // CompositeMap and his encapsulated maps are serializable, therefore so is this + private Map<IOption, Object> configurationMap = + Collections.synchronizedMap(new CompositeMap<>(definedMap, defaultMap, new NoOpMapMutator())); private EnumMap<Section, Map<String, IOption>> sectionMap = new EnumMap<>(Section.class); @SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map private Map<String, Map<IOption, Object>> nodeSpecificDefinedMap = Collections.synchronizedMap(new TreeMap<>()); @SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map private Map<String, Map<IOption, Object>> nodeSpecificDefaultMap = Collections.synchronizedMap(new TreeMap<>()); + @SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map + private Map<String, Map<IOption, Object>> nodeEffectiveMaps = Collections.synchronizedMap(new HashMap<>()); private transient ArrayListValuedHashMap<IOption, IConfigSetter> optionSetters = new ArrayListValuedHashMap<>(); private final String[] args; private ConfigManagerApplicationConfig appConfig = new ConfigManagerApplicationConfig(this); private Set<String> allSections = new HashSet<>(); private transient Collection<Consumer<List<String>>> argListeners = new ArrayList<>(); private transient Collection<IOption> iniPointerOptions = new ArrayList<>(); - private transient Collection<Section> cmdLineSections = new ArrayList<>();; + private transient Collection<Section> cmdLineSections = new ArrayList<>(); private transient OptionHandlerFilter usageFilter; private transient SortedMap<Integer, List<IConfigurator>> configurators = new TreeMap<>(); private boolean configured; private String versionString = "version undefined"; - private transient Map<String, Set<Map.Entry<String, String>>> extensionOptions = new TreeMap(); + private transient Map<String, Set<Map.Entry<String, String>>> extensionOptions = new TreeMap<>(); public ConfigManager() { this(null); @@ -171,15 +177,28 @@ if (node == null) { return isDefault ? defaultMap : definedMap; } else { - ensureNode(node); - return isDefault ? nodeSpecificDefaultMap.get(node) : nodeSpecificDefinedMap.get(node); + synchronized (this) { + ensureNode(node); + return isDefault ? nodeSpecificDefaultMap.get(node) : nodeSpecificDefinedMap.get(node); + } } } - public void ensureNode(String nodeId) { + public synchronized void ensureNode(String nodeId) { LOGGER.debug("ensureNode: " + nodeId); - nodeSpecificDefinedMap.computeIfAbsent(nodeId, this::createNodeSpecificMap); - nodeSpecificDefaultMap.computeIfAbsent(nodeId, this::createNodeSpecificMap); + Map<IOption, Object> nodeDefinedMap = + nodeSpecificDefinedMap.computeIfAbsent(nodeId, this::createNodeSpecificMap); + Map<IOption, Object> nodeDefaultMap = + nodeSpecificDefaultMap.computeIfAbsent(nodeId, this::createNodeSpecificMap); + nodeEffectiveMaps.computeIfAbsent(nodeId, id -> Collections + .synchronizedMap(compositeFrom(Stream.of(nodeDefinedMap, nodeDefaultMap, definedMap)))); + } + + public synchronized void forgetNode(String nodeId) { + LOGGER.debug("forgetNode: " + nodeId); + nodeSpecificDefinedMap.remove(nodeId); + nodeSpecificDefaultMap.remove(nodeId); + nodeEffectiveMaps.remove(nodeId); } private Map<IOption, Object> createNodeSpecificMap(String nodeId) { @@ -236,7 +255,7 @@ invokeSetters(option, option.type().parse(String.valueOf(value)), null); } - private void invokeSetters(IOption option, Object value, String nodeId) { + private synchronized void invokeSetters(IOption option, Object value, String nodeId) { optionSetters.get(option).forEach(setter -> setter.set(nodeId, value, false)); } @@ -369,7 +388,7 @@ }); } - private Object getOrDefault(Map<IOption, Object> map, IOption option, String nodeId) { + private synchronized Object getOrDefault(Map<IOption, Object> map, IOption option, String nodeId) { if (map.containsKey(option)) { return map.get(option); } else { @@ -426,7 +445,7 @@ @Override public Set<IOption> getOptions(Section section) { - return getSectionOptionMap(section).values().stream().collect(Collectors.toSet()); + return new HashSet<>(getSectionOptionMap(section).values()); } private Map<String, IOption> getSectionOptionMap(Section section) { @@ -438,7 +457,7 @@ return Collections.unmodifiableList(new ArrayList<>(nodeSpecificDefinedMap.keySet())); } - public IApplicationConfig getNodeEffectiveConfig(String nodeId) { + public synchronized IApplicationConfig getNodeEffectiveConfig(String nodeId) { ensureNode(nodeId); final Map<IOption, Object> nodeMap = nodeSpecificDefaultMap.get(nodeId); Map<IOption, Object> nodeEffectiveMap = getNodeEffectiveMap(nodeId); @@ -454,15 +473,22 @@ }; } - private Map<IOption, Object> getNodeEffectiveMap(String nodeId) { + private synchronized Map<IOption, Object> getNodeEffectiveMap(String nodeId) { ensureNode(nodeId); - return new CompositeMap<>( - Stream.of(nodeSpecificDefinedMap.get(nodeId), nodeSpecificDefaultMap.get(nodeId), definedMap) - .toArray(Map[]::new), - new NoOpMapMutator()); + return nodeEffectiveMaps.get(nodeId); } - public Ini toIni(boolean includeDefaults) { + private synchronized CompositeMap<IOption, Object> compositeFrom(Stream<Map<IOption, Object>> stream) { + List<Map<IOption, Object>> list = stream.collect(Collectors.toList()); + CompositeMap<IOption, Object> map = new CompositeMap<>(); + map.setMutator(new NoOpMapMutator()); + for (ListIterator<Map<IOption, Object>> iter = list.listIterator(list.size()); iter.hasPrevious();) { + map.addComposited(iter.previous()); + } + return map; + } + + public synchronized Ini toIni(boolean includeDefaults) { Ini ini = new Ini(); (includeDefaults ? configurationMap : definedMap).forEach((option, value) -> { if (value != null) { @@ -474,12 +500,10 @@ ensureNode(key); Map<IOption, Object> nodeValueMap = includeDefaults ? getNodeEffectiveMap(key) : nodeSpecificDefinedMap.get(key); - synchronized (nodeValueMap) { - for (Map.Entry<IOption, Object> entry : nodeValueMap.entrySet()) { - if (entry.getValue() != null) { - final IOption option = entry.getKey(); - ini.add(section, option.ini(), option.type().serializeToIni(entry.getValue())); - } + for (Map.Entry<IOption, Object> entry : nodeValueMap.entrySet()) { + if (entry.getValue() != null) { + final IOption option = entry.getKey(); + ini.add(section, option.ini(), option.type().serializeToIni(entry.getValue())); } } } @@ -492,7 +516,7 @@ set(null, option, value); } - public void set(String nodeId, IOption option, Object value) { + public synchronized void set(String nodeId, IOption option, Object value) { invokeSetters(option, copyIfArray(value), nodeId); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/test/java/org/apache/hyracks/control/common/config/ConfigManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/test/java/org/apache/hyracks/control/common/config/ConfigManagerTest.java new file mode 100644 index 0000000..ef5f69c --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/test/java/org/apache/hyracks/control/common/config/ConfigManagerTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.common.config; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.hyracks.api.config.IOption; +import org.apache.hyracks.api.config.IOptionType; +import org.apache.hyracks.api.config.Section; +import org.apache.hyracks.util.Span; +import org.junit.Test; + +public class ConfigManagerTest { + + public enum Option implements IOption { + OPTION1, + OPTION2, + OPTION3, + OPTION4, + OPTION5; + + @Override + public Section section() { + return Section.values()[this.ordinal() % Section.values().length]; + } + + @Override + public String description() { + return "Description for " + name(); + } + + @Override + public IOptionType type() { + return OptionTypes.INTEGER; + } + + @Override + public Object defaultValue() { + return name() + " default value"; + } + } + + private static final Random RANDOM = new Random(); + + @Test + public void testConcurrentUpdates() throws Exception { + ConfigManager configManager = new ConfigManager(); + configManager.register(Option.class); + ExecutorService executor = Executors.newCachedThreadPool(); + List<Future<Void>> futures = new ArrayList<>(); + IntStream.range(0, 20).forEach(a -> futures.add(executor.submit(() -> { + Span.start(30, TimeUnit.SECONDS).loopUntilExhausted(() -> { + String node = "node" + RANDOM.nextInt(5); + IntStream.range(0, 20).parallel().forEach(a1 -> { + if (RANDOM.nextBoolean()) { + configManager.set(node, randomOption(), RANDOM.nextInt()); + } else { + configManager.getNodeEffectiveConfig(node).get(randomOption()); + } + if (RANDOM.nextBoolean()) { + configManager.forgetNode(node); + } + }); + }); + return null; + }))); + MutableObject<Exception> failure = new MutableObject<>(); + futures.forEach(f -> { + try { + f.get(); + } catch (Exception e) { + if (failure.getValue() == null) { + failure.setValue(e); + } else { + failure.getValue().addSuppressed(e); + } + } + }); + if (failure.getValue() != null) { + throw failure.getValue(); + } + } + + private static Option randomOption() { + return Option.values()[RANDOM.nextInt(Option.values().length)]; + } +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java index 4235d19..8c81d41 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java @@ -24,7 +24,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IAsyncRequest; import org.apache.hyracks.api.io.IFileHandle; -import org.apache.hyracks.api.util.InvokeUtil.InterruptibleAction; +import org.apache.hyracks.util.InterruptibleAction; public class IoRequest implements IAsyncRequest, InterruptibleAction { diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOInterruptibleAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOInterruptibleAction.java new file mode 100644 index 0000000..1fca227 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOInterruptibleAction.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.util; + +import java.io.IOException; + +@FunctionalInterface +public interface IOInterruptibleAction { + void run() throws IOException, InterruptedException; +} diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptibleAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptibleAction.java new file mode 100644 index 0000000..8a3787e --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptibleAction.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.util; + +@FunctionalInterface +public interface InterruptibleAction { + void run() throws InterruptedException; +} diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java new file mode 100644 index 0000000..95db604 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.util; + +import java.util.concurrent.TimeUnit; + +public class Span { + private final long startNanos; + private final long spanNanos; + + private Span(long span, TimeUnit unit) { + startNanos = System.nanoTime(); + spanNanos = unit.toNanos(span); + } + + public static Span start(long span, TimeUnit unit) { + return new Span(span, unit); + } + + public boolean elapsed() { + return remaining(TimeUnit.NANOSECONDS) > spanNanos; + } + + public long remaining(TimeUnit unit) { + return unit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS); + } + + public void sleep(long sleep, TimeUnit unit) throws InterruptedException { + TimeUnit.NANOSECONDS.sleep(Math.min(remaining(TimeUnit.NANOSECONDS), unit.toNanos(sleep))); + } + + public void loopUntilExhausted(ThrowingAction action) throws Exception { + loopUntilExhausted(action, 0, TimeUnit.NANOSECONDS); + } + + public void loopUntilExhausted(ThrowingAction action, long delay, TimeUnit delayUnit) throws Exception { + while (!elapsed()) { + action.run(); + if (remaining(delayUnit) < delay) { + break; + } + delayUnit.sleep(delay); + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingAction.java new file mode 100644 index 0000000..d675179 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingAction.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.util; + +@FunctionalInterface +public interface ThrowingAction { + void run() throws Exception; // NOSONAR +} -- To view, visit https://asterix-gerrit.ics.uci.edu/2467 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: If7dffb1b502b9331118ad344e6f4ef0d625f4c8f Gerrit-PatchSet: 3 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
