This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ac493ebc882ee18a7c7e75d15ddcad695de1f51b Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Wed Jan 13 17:24:03 2021 +0100 [FLINK-20978] Implement HeapSavepointRestoreOperation This commit implements the logic of restoring a heap keyed state backend from a savepoint in a unified binary format. It eagerly deserializes all states and populates the in memory structures. --- .../state/heap/HeapMetaInfoRestoreOperation.java | 133 +++++++++++ .../runtime/state/heap/HeapRestoreOperation.java | 94 ++------ .../state/heap/HeapSavepointRestoreOperation.java | 259 +++++++++++++++++++++ 3 files changed, 405 insertions(+), 81 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java new file mode 100644 index 0000000..aecc44b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMetaInfoRestoreOperation.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.runtime.state.KeyExtractorFunction; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.Keyed; +import org.apache.flink.runtime.state.PriorityComparable; +import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; +import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo; +import org.apache.flink.runtime.state.StateSerializerProvider; +import org.apache.flink.runtime.state.StateSnapshotRestore; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A helper class shared between the {@link HeapRestoreOperation} and {@link + * HeapSavepointRestoreOperation} for restoring {@link StateMetaInfoSnapshot + * StateMetaInfoSnapshots}. + * + * @param <K> The key by which state is keyed. + */ +class HeapMetaInfoRestoreOperation<K> { + private final StateSerializerProvider<K> keySerializerProvider; + private final HeapPriorityQueueSetFactory priorityQueueSetFactory; + @Nonnull private final KeyGroupRange keyGroupRange; + @Nonnegative private final int numberOfKeyGroups; + private final StateTableFactory<K> stateTableFactory; + private final InternalKeyContext<K> keyContext; + + HeapMetaInfoRestoreOperation( + StateSerializerProvider<K> keySerializerProvider, + HeapPriorityQueueSetFactory priorityQueueSetFactory, + @Nonnull KeyGroupRange keyGroupRange, + int numberOfKeyGroups, + StateTableFactory<K> stateTableFactory, + InternalKeyContext<K> keyContext) { + this.keySerializerProvider = keySerializerProvider; + this.priorityQueueSetFactory = priorityQueueSetFactory; + this.keyGroupRange = keyGroupRange; + this.numberOfKeyGroups = numberOfKeyGroups; + this.stateTableFactory = stateTableFactory; + this.keyContext = keyContext; + } + + Map<Integer, StateMetaInfoSnapshot> createOrCheckStateForMetaInfo( + List<StateMetaInfoSnapshot> restoredMetaInfo, + Map<String, StateTable<K, ?, ?>> registeredKVStates, + Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates) { + + final Map<Integer, StateMetaInfoSnapshot> kvStatesById = new HashMap<>(); + for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) { + final StateSnapshotRestore registeredState; + + switch (metaInfoSnapshot.getBackendStateType()) { + case KEY_VALUE: + registeredState = registeredKVStates.get(metaInfoSnapshot.getName()); + if (registeredState == null) { + RegisteredKeyValueStateBackendMetaInfo<?, ?> + registeredKeyedBackendStateMetaInfo = + new RegisteredKeyValueStateBackendMetaInfo<>( + metaInfoSnapshot); + registeredKVStates.put( + metaInfoSnapshot.getName(), + stateTableFactory.newStateTable( + keyContext, + registeredKeyedBackendStateMetaInfo, + keySerializerProvider.currentSchemaSerializer())); + } + break; + case PRIORITY_QUEUE: + registeredState = registeredPQStates.get(metaInfoSnapshot.getName()); + if (registeredState == null) { + registeredPQStates.put( + metaInfoSnapshot.getName(), + createInternal( + new RegisteredPriorityQueueStateBackendMetaInfo<>( + metaInfoSnapshot))); + } + break; + default: + throw new IllegalStateException( + "Unexpected state type: " + + metaInfoSnapshot.getBackendStateType() + + "."); + } + + // always put metaInfo into kvStatesById, because kvStatesById is KeyGroupsStateHandle + // related + kvStatesById.put(kvStatesById.size(), metaInfoSnapshot); + } + + return kvStatesById; + } + + private <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> + HeapPriorityQueueSnapshotRestoreWrapper<T> createInternal( + RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) { + + final String stateName = metaInfo.getName(); + final HeapPriorityQueueSet<T> priorityQueue = + priorityQueueSetFactory.create(stateName, metaInfo.getElementSerializer()); + + return new HeapPriorityQueueSnapshotRestoreWrapper<>( + priorityQueue, + metaInfo, + KeyExtractorFunction.forKeyedObjects(), + keyGroupRange, + numberOfKeyGroups); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java index 86a7491..e78751b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java @@ -24,16 +24,11 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.runtime.state.KeyExtractorFunction; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.Keyed; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.PriorityComparable; -import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; -import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo; import org.apache.flink.runtime.state.RestoreOperation; import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; import org.apache.flink.runtime.state.StateSerializerProvider; @@ -49,13 +44,11 @@ import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import java.io.IOException; import java.io.InputStream; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -75,11 +68,8 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> { private final Map<String, StateTable<K, ?, ?>> registeredKVStates; private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates; private final CloseableRegistry cancelStreamRegistry; - private final HeapPriorityQueueSetFactory priorityQueueSetFactory; @Nonnull private final KeyGroupRange keyGroupRange; - @Nonnegative private final int numberOfKeyGroups; - private final StateTableFactory<K> stateTableFactory; - private final InternalKeyContext<K> keyContext; + private final HeapMetaInfoRestoreOperation<K> heapMetaInfoRestoreOperation; HeapRestoreOperation( @Nonnull Collection<KeyedStateHandle> restoreStateHandles, @@ -91,7 +81,7 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> { HeapPriorityQueueSetFactory priorityQueueSetFactory, @Nonnull KeyGroupRange keyGroupRange, int numberOfKeyGroups, - StateTableFactory<K> snapshotStrategy, + StateTableFactory<K> stateTableFactory, InternalKeyContext<K> keyContext) { this.restoreStateHandles = restoreStateHandles; this.keySerializerProvider = keySerializerProvider; @@ -99,11 +89,15 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> { this.registeredKVStates = registeredKVStates; this.registeredPQStates = registeredPQStates; this.cancelStreamRegistry = cancelStreamRegistry; - this.priorityQueueSetFactory = priorityQueueSetFactory; this.keyGroupRange = keyGroupRange; - this.numberOfKeyGroups = numberOfKeyGroups; - this.stateTableFactory = snapshotStrategy; - this.keyContext = keyContext; + this.heapMetaInfoRestoreOperation = + new HeapMetaInfoRestoreOperation<>( + keySerializerProvider, + priorityQueueSetFactory, + keyGroupRange, + numberOfKeyGroups, + stateTableFactory, + keyContext); } @Override @@ -165,9 +159,9 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> { List<StateMetaInfoSnapshot> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots(); - final Map<Integer, StateMetaInfoSnapshot> kvStatesById = new HashMap<>(); - - createOrCheckStateForMetaInfo(restoredMetaInfos, kvStatesById); + final Map<Integer, StateMetaInfoSnapshot> kvStatesById = + this.heapMetaInfoRestoreOperation.createOrCheckStateForMetaInfo( + restoredMetaInfos, registeredKVStates, registeredPQStates); readStateHandleStateData( fsDataInputStream, @@ -187,68 +181,6 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> { return null; } - private void createOrCheckStateForMetaInfo( - List<StateMetaInfoSnapshot> restoredMetaInfo, - Map<Integer, StateMetaInfoSnapshot> kvStatesById) { - - for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) { - final StateSnapshotRestore registeredState; - - switch (metaInfoSnapshot.getBackendStateType()) { - case KEY_VALUE: - registeredState = registeredKVStates.get(metaInfoSnapshot.getName()); - if (registeredState == null) { - RegisteredKeyValueStateBackendMetaInfo<?, ?> - registeredKeyedBackendStateMetaInfo = - new RegisteredKeyValueStateBackendMetaInfo<>( - metaInfoSnapshot); - registeredKVStates.put( - metaInfoSnapshot.getName(), - stateTableFactory.newStateTable( - keyContext, - registeredKeyedBackendStateMetaInfo, - keySerializerProvider.currentSchemaSerializer())); - } - break; - case PRIORITY_QUEUE: - registeredState = registeredPQStates.get(metaInfoSnapshot.getName()); - if (registeredState == null) { - createInternal( - new RegisteredPriorityQueueStateBackendMetaInfo<>( - metaInfoSnapshot)); - } - break; - default: - throw new IllegalStateException( - "Unexpected state type: " - + metaInfoSnapshot.getBackendStateType() - + "."); - } - - // always put metaInfo into kvStatesById, because kvStatesById is KeyGroupsStateHandle - // related - kvStatesById.put(kvStatesById.size(), metaInfoSnapshot); - } - } - - private <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> void createInternal( - RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) { - - final String stateName = metaInfo.getName(); - final HeapPriorityQueueSet<T> priorityQueue = - priorityQueueSetFactory.create(stateName, metaInfo.getElementSerializer()); - - HeapPriorityQueueSnapshotRestoreWrapper<T> wrapper = - new HeapPriorityQueueSnapshotRestoreWrapper<>( - priorityQueue, - metaInfo, - KeyExtractorFunction.forKeyedObjects(), - keyGroupRange, - numberOfKeyGroups); - - registeredPQStates.put(stateName, wrapper); - } - private void readStateHandleStateData( FSDataInputStream fsDataInputStream, DataInputViewStreamWrapper inView, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java new file mode 100644 index 0000000..13fdf8f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSavepointRestoreOperation.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.ListDelimitedSerializer; +import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; +import org.apache.flink.runtime.state.RestoreOperation; +import org.apache.flink.runtime.state.StateSerializerProvider; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation; +import org.apache.flink.runtime.state.restore.KeyGroup; +import org.apache.flink.runtime.state.restore.KeyGroupEntry; +import org.apache.flink.runtime.state.restore.SavepointRestoreResult; +import org.apache.flink.runtime.state.restore.ThrowingIterator; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.runtime.state.CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix; +import static org.apache.flink.runtime.state.CompositeKeySerializationUtils.readKey; +import static org.apache.flink.runtime.state.CompositeKeySerializationUtils.readKeyGroup; +import static org.apache.flink.runtime.state.CompositeKeySerializationUtils.readNamespace; + +/** + * Implementation of heap savepoint restore operation. Savepoint shares a common unified binary + * format across all state backends. + * + * @param <K> The data type that the serializer serializes. + */ +public class HeapSavepointRestoreOperation<K> implements RestoreOperation<Void> { + private final int keyGroupPrefixBytes; + private final StateSerializerProvider<K> keySerializerProvider; + private final Map<String, StateTable<K, ?, ?>> registeredKVStates; + private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates; + private final FullSnapshotRestoreOperation<K> savepointRestoreOperation; + private final HeapMetaInfoRestoreOperation<K> heapMetaInfoRestoreOperation; + /* + Shared wrappers for deserializing an entry in the state handle. An optimization + to reduce the number of objects created. + */ + private final DataInputDeserializer entryKeyDeserializer; + private final DataInputDeserializer entryValueDeserializer; + private final ListDelimitedSerializer listDelimitedSerializer; + + HeapSavepointRestoreOperation( + @Nonnull Collection<KeyedStateHandle> restoreStateHandles, + StateSerializerProvider<K> keySerializerProvider, + ClassLoader userCodeClassLoader, + Map<String, StateTable<K, ?, ?>> registeredKVStates, + Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, + HeapPriorityQueueSetFactory priorityQueueSetFactory, + @Nonnull KeyGroupRange keyGroupRange, + int numberOfKeyGroups, + StateTableFactory<K> stateTableFactory, + InternalKeyContext<K> keyContext) { + this.keySerializerProvider = keySerializerProvider; + this.registeredKVStates = registeredKVStates; + this.registeredPQStates = registeredPQStates; + this.savepointRestoreOperation = + new FullSnapshotRestoreOperation<>( + keyGroupRange, + userCodeClassLoader, + restoreStateHandles, + keySerializerProvider); + this.keyGroupPrefixBytes = computeRequiredBytesInKeyGroupPrefix(numberOfKeyGroups); + this.heapMetaInfoRestoreOperation = + new HeapMetaInfoRestoreOperation<>( + keySerializerProvider, + priorityQueueSetFactory, + keyGroupRange, + numberOfKeyGroups, + stateTableFactory, + keyContext); + this.entryKeyDeserializer = new DataInputDeserializer(); + this.entryValueDeserializer = new DataInputDeserializer(); + this.listDelimitedSerializer = new ListDelimitedSerializer(); + } + + @Override + public Void restore() throws Exception { + + registeredKVStates.clear(); + registeredPQStates.clear(); + + try (ThrowingIterator<SavepointRestoreResult> restore = + this.savepointRestoreOperation.restore()) { + while (restore.hasNext()) { + SavepointRestoreResult restoreResult = restore.next(); + List<StateMetaInfoSnapshot> restoredMetaInfos = + restoreResult.getStateMetaInfoSnapshots(); + + final Map<Integer, StateMetaInfoSnapshot> kvStatesById = + this.heapMetaInfoRestoreOperation.createOrCheckStateForMetaInfo( + restoredMetaInfos, registeredKVStates, registeredPQStates); + + try (ThrowingIterator<KeyGroup> keyGroups = restoreResult.getRestoredKeyGroups()) { + while (keyGroups.hasNext()) { + readKeyGroupStateData( + keyGroups.next(), + keySerializerProvider.previousSchemaSerializer(), + kvStatesById); + } + } + } + } + + return null; + } + + private void readKeyGroupStateData( + KeyGroup keyGroup, + TypeSerializer<K> keySerializer, + Map<Integer, StateMetaInfoSnapshot> kvStatesById) + throws Exception { + + try (ThrowingIterator<KeyGroupEntry> entries = keyGroup.getKeyGroupEntries()) { + while (entries.hasNext()) { + KeyGroupEntry groupEntry = entries.next(); + StateMetaInfoSnapshot infoSnapshot = kvStatesById.get(groupEntry.getKvStateId()); + switch (infoSnapshot.getBackendStateType()) { + case KEY_VALUE: + readKVStateData(keySerializer, groupEntry, infoSnapshot); + break; + case PRIORITY_QUEUE: + readPriorityQueue(groupEntry, infoSnapshot); + break; + case OPERATOR: + case BROADCAST: + throw new IllegalStateException( + "Expected only keyed state. Received: " + + infoSnapshot.getBackendStateType()); + } + } + } + } + + @SuppressWarnings("unchecked") + private void readPriorityQueue(KeyGroupEntry groupEntry, StateMetaInfoSnapshot infoSnapshot) + throws IOException { + DataInputDeserializer keyDeserializer = new DataInputDeserializer(groupEntry.getKey()); + keyDeserializer.skipBytesToRead(keyGroupPrefixBytes); + HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement> + priorityQueueSnapshotRestoreWrapper = + (HeapPriorityQueueSnapshotRestoreWrapper<HeapPriorityQueueElement>) + registeredPQStates.get(infoSnapshot.getName()); + HeapPriorityQueueElement timer = + priorityQueueSnapshotRestoreWrapper + .getMetaInfo() + .getElementSerializer() + .deserialize(keyDeserializer); + HeapPriorityQueueSet<HeapPriorityQueueElement> priorityQueue = + priorityQueueSnapshotRestoreWrapper.getPriorityQueue(); + priorityQueue.add(timer); + } + + @SuppressWarnings("unchecked") + private void readKVStateData( + TypeSerializer<K> keySerializer, + KeyGroupEntry groupEntry, + StateMetaInfoSnapshot infoSnapshot) + throws IOException { + StateTable<K, Object, Object> stateTable = + (StateTable<K, Object, Object>) registeredKVStates.get(infoSnapshot.getName()); + RegisteredKeyValueStateBackendMetaInfo<?, ?> metaInfo = stateTable.getMetaInfo(); + TypeSerializer<?> namespaceSerializer = metaInfo.getPreviousNamespaceSerializer(); + TypeSerializer<?> stateSerializer = metaInfo.getPreviousStateSerializer(); + boolean isAmbigousKey = + keySerializer.getLength() < 0 && namespaceSerializer.getLength() < 0; + entryKeyDeserializer.setBuffer(groupEntry.getKey()); + entryValueDeserializer.setBuffer(groupEntry.getValue()); + int keyGroup = readKeyGroup(keyGroupPrefixBytes, entryKeyDeserializer); + K key = readKey(keySerializer, entryKeyDeserializer, isAmbigousKey); + Object namespace = readNamespace(namespaceSerializer, entryKeyDeserializer, isAmbigousKey); + switch (metaInfo.getStateType()) { + case LIST: + stateTable.put( + key, + keyGroup, + namespace, + listDelimitedSerializer.deserializeList( + groupEntry.getValue(), + ((ListSerializer<?>) stateSerializer).getElementSerializer())); + break; + case VALUE: + case REDUCING: + case FOLDING: + case AGGREGATING: + stateTable.put( + key, + keyGroup, + namespace, + stateSerializer.deserialize(entryValueDeserializer)); + break; + case MAP: + deserializeMapStateEntry( + (StateTable<K, Object, Map<Object, Object>>) + (StateTable<K, ?, ?>) stateTable, + keyGroup, + key, + namespace, + (MapSerializer<Object, Object>) stateSerializer); + break; + default: + throw new IllegalStateException("Unknown state type: " + metaInfo.getStateType()); + } + } + + private void deserializeMapStateEntry( + StateTable<K, Object, Map<Object, Object>> stateTable, + int keyGroup, + K key, + Object namespace, + MapSerializer<Object, Object> stateSerializer) + throws IOException { + Object mapEntryKey = stateSerializer.getKeySerializer().deserialize(entryKeyDeserializer); + boolean isNull = entryValueDeserializer.readBoolean(); + final Object mapEntryValue; + if (isNull) { + mapEntryValue = null; + } else { + mapEntryValue = + stateSerializer.getValueSerializer().deserialize(entryValueDeserializer); + } + + Map<Object, Object> userMap = stateTable.get(key, namespace); + if (userMap == null) { + userMap = new HashMap<>(); + stateTable.put(key, keyGroup, namespace, userMap); + } + userMap.put(mapEntryKey, mapEntryValue); + } +}