This is an automated email from the ASF dual-hosted git repository. mcmellawatt pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 610992a GEODE-6842: Making serialization of CqNameToOpHashMap thread safe (#3680) 610992a is described below commit 610992ae8ba1c8d26756606bb62e871ea846544f Author: Ryan McMahon <rmcma...@pivotal.io> AuthorDate: Thu Jun 6 16:54:30 2019 -0700 GEODE-6842: Making serialization of CqNameToOpHashMap thread safe (#3680) By making the CqNameToOpHashMap derive from ConcurrentHashMap and taking a snapshot prior to serialization, we avoid a race where this map could be mutated by a client registration thread while a queue GII which includes this map is occurring in another thread. --- .../sockets/CqNameToOpHashMapIntegrationTest.java | 110 +++++++++++++++++++++ .../tier/sockets/ClientUpdateMessageImpl.java | 8 +- 2 files changed, 115 insertions(+), 3 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CqNameToOpHashMapIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CqNameToOpHashMapIntegrationTest.java new file mode 100644 index 0000000..d8628b2 --- /dev/null +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CqNameToOpHashMapIntegrationTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. 
See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.tier.sockets; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; +import org.junit.Test; + +import org.apache.geode.DataSerializer; +import org.apache.geode.internal.InternalDataSerializer; + +public class CqNameToOpHashMapIntegrationTest { + /** + * This test ensures that we can safely mutate the CqNameToOpHashMap while it is being + * serialized in another thread. The use case for this is that we repopulate this map + * during client registration, which can happen concurrently with a GII which causes + * serialization to occur if this map is referenced by any of the client subscription + * queues. This integration test does not exercise this full system level interaction, + * but rather does the minimum necessary to prove that map mutation and serialization + * can occur concurrently without any issues such as size mismatches or + * ConcurrentModificationExceptions. 
+ */ + @Test + public void testSendToWhileConcurrentlyModifyingMapContentsAndVerifyProperSerialization() + throws IOException, ClassNotFoundException, InterruptedException, ExecutionException, + TimeoutException { + final int numEntries = 1000000; + + ClientUpdateMessageImpl.CqNameToOpHashMap originalCqNameToOpHashMap = + new ClientUpdateMessageImpl.CqNameToOpHashMap(numEntries); + ClientUpdateMessageImpl.CqNameToOpHashMap modifiedCqNameToOpHashMap = + new ClientUpdateMessageImpl.CqNameToOpHashMap(numEntries); + + for (int i = 0; i < numEntries; ++i) { + originalCqNameToOpHashMap.add(String.valueOf(i), i); + modifiedCqNameToOpHashMap.add(String.valueOf(i), i); + } + + CompletableFuture<Void> removeFromHashMapTask = CompletableFuture.runAsync(() -> { + for (int i = 0; i < numEntries; ++i) { + modifiedCqNameToOpHashMap.remove(String.valueOf(i), i); + } + }); + + CompletableFuture<Void> serializeReconstructHashMapTask = CompletableFuture.runAsync(() -> { + try { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + + modifiedCqNameToOpHashMap.sendTo(outputStream); + + ByteArrayInputStream byteArrayInputStream = + new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); + + DataInputStream inputStream = new DataInputStream(byteArrayInputStream); + + byte typeByte = inputStream.readByte(); + int cqNamesSize = InternalDataSerializer.readArrayLength(inputStream); + + ClientUpdateMessageImpl.CqNameToOpHashMap reconstructedCqNameToOpHashMap = + new ClientUpdateMessageImpl.CqNameToOpHashMap(cqNamesSize); + + for (int j = 0; j < cqNamesSize; j++) { + String cqNamesKey = DataSerializer.<String>readObject(inputStream); + Integer cqNamesValue = DataSerializer.<Integer>readObject(inputStream); + reconstructedCqNameToOpHashMap.add(cqNamesKey, cqNamesValue); + } + + // The reconstructed map should have some subset of the entries in the cqNameToOpHashMap, + // but the 
specific contents will depend on timing with the removeFromHashMap task. + MapDifference<String, Integer> reconstructedVersusOriginalHashMapDifference = + Maps.difference(reconstructedCqNameToOpHashMap, originalCqNameToOpHashMap); + assertThat(reconstructedVersusOriginalHashMapDifference.entriesInCommon().size() >= 0) + .isTrue(); + assertThat(reconstructedVersusOriginalHashMapDifference.entriesDiffering().size() == 0) + .isTrue(); + } catch (Exception ex) { + throw new RuntimeException("Failed to serialize/deserialize the CqNameToOpHashMap", ex); + } + }); + + CompletableFuture.allOf(removeFromHashMapTask, serializeReconstructHashMapTask).get(1, + TimeUnit.MINUTES); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java index 64c180b..b6dbac0 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java @@ -1557,9 +1557,11 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N } } /** - * Basically just a HashMap<String, Integer> but limits itself to the CqNameToOp interface. + * Basically just a ConcurrentHashMap<String, Integer> but limits itself to the CqNameToOp + * interface. 
*/ - public static class CqNameToOpHashMap extends HashMap<String, Integer> implements CqNameToOp { + public static class CqNameToOpHashMap extends ConcurrentHashMap<String, Integer> + implements CqNameToOp { public CqNameToOpHashMap(int initialCapacity) { super(initialCapacity, 1.0f); } @@ -1573,7 +1575,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N public void sendTo(DataOutput out) throws IOException { // When serialized it needs to look just as if writeObject was called on a HASH_MAP out.writeByte(DSCODE.HASH_MAP.toByte()); - DataSerializer.writeHashMap(this, out); + DataSerializer.writeConcurrentHashMap(this, out); } @Override