IGNITE-1309: Moved platform affinity to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/536af49b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/536af49b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/536af49b Branch: refs/heads/ignite-884 Commit: 536af49ba0a407fdbc5682f73a48aa07fa3daae0 Parents: d9a1397 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Wed Aug 26 17:03:39 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Wed Aug 26 17:03:39 2015 +0300 ---------------------------------------------------------------------- .../cache/affinity/PlatformAffinity.java | 293 +++++++++++++++++++ 1 file changed, 293 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/536af49b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java new file mode 100644 index 0000000..d6dfcdb --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.affinity; + +import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.discovery.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Native cache wrapper implementation. + */ +@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"}) +public class PlatformAffinity extends PlatformAbstractTarget { + /** */ + public static final int OP_AFFINITY_KEY = 1; + + /** */ + public static final int OP_ALL_PARTITIONS = 2; + + /** */ + public static final int OP_BACKUP_PARTITIONS = 3; + + /** */ + public static final int OP_IS_BACKUP = 4; + + /** */ + public static final int OP_IS_PRIMARY = 5; + + /** */ + public static final int OP_IS_PRIMARY_OR_BACKUP = 6; + + /** */ + public static final int OP_MAP_KEY_TO_NODE = 7; + + /** */ + public static final int OP_MAP_KEY_TO_PRIMARY_AND_BACKUPS = 8; + + /** */ + public static final int OP_MAP_KEYS_TO_NODES = 9; + + /** */ + public static final int OP_MAP_PARTITION_TO_NODE = 10; + + /** */ + public static final int OP_MAP_PARTITION_TO_PRIMARY_AND_BACKUPS = 11; + + /** */ + public static final int OP_MAP_PARTITIONS_TO_NODES = 12; + + /** */ + public static final int OP_PARTITION = 13; + + /** */ + public static final int OP_PRIMARY_PARTITIONS = 14; + + /** */ + private static final C1<ClusterNode, UUID> TO_NODE_ID = new C1<ClusterNode, UUID>() { + @Nullable @Override public UUID apply(ClusterNode node) { + return node != null ? node.id() : null; + } + }; + + /** Underlying cache affinity. */ + private final Affinity<Object> aff; + + /** Discovery manager */ + private final GridDiscoveryManager discovery; + + /** + * Constructor. + * + * @param platformCtx Context. + * @param igniteCtx Ignite context. + * @param name Cache name. + */ + public PlatformAffinity(PlatformContext platformCtx, GridKernalContext igniteCtx, @Nullable String name) + throws IgniteCheckedException { + super(platformCtx); + + this.aff = igniteCtx.grid().affinity(name); + + if (aff == null) + throw new IgniteCheckedException("Cache with the given name doesn't exist: " + name); + + discovery = igniteCtx.discovery(); + } + + /** {@inheritDoc} */ + @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException { + switch (type) { + case OP_PARTITION: + return aff.partition(reader.readObjectDetached()); + + case OP_IS_PRIMARY: { + UUID nodeId = reader.readUuid(); + + Object key = reader.readObjectDetached(); + + ClusterNode node = discovery.node(nodeId); + + if (node == null) + return FALSE; + + return aff.isPrimary(node, key) ? TRUE : FALSE; + } + + case OP_IS_BACKUP: { + UUID nodeId = reader.readUuid(); + + Object key = reader.readObjectDetached(); + + ClusterNode node = discovery.node(nodeId); + + if (node == null) + return FALSE; + + return aff.isBackup(node, key) ? TRUE : FALSE; + } + + case OP_IS_PRIMARY_OR_BACKUP: { + UUID nodeId = reader.readUuid(); + + Object key = reader.readObjectDetached(); + + ClusterNode node = discovery.node(nodeId); + + if (node == null) + return FALSE; + + return aff.isPrimaryOrBackup(node, key) ? TRUE : FALSE; + } + + default: + return throwUnsupported(type); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"}) + @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer, + Object arg) throws IgniteCheckedException { + switch (type) { + case OP_PRIMARY_PARTITIONS: { + UUID nodeId = reader.readObject(); + + ClusterNode node = discovery.node(nodeId); + + int[] parts = node != null ? aff.primaryPartitions(node) : U.EMPTY_INTS; + + writer.writeIntArray(parts); + + break; + } + + case OP_BACKUP_PARTITIONS: { + UUID nodeId = reader.readObject(); + + ClusterNode node = discovery.node(nodeId); + + int[] parts = node != null ? aff.backupPartitions(node) : U.EMPTY_INTS; + + writer.writeIntArray(parts); + + break; + } + + case OP_ALL_PARTITIONS: { + UUID nodeId = reader.readObject(); + + ClusterNode node = discovery.node(nodeId); + + int[] parts = node != null ? aff.allPartitions(node) : U.EMPTY_INTS; + + writer.writeIntArray(parts); + + break; + } + + case OP_AFFINITY_KEY: { + Object key = reader.readObjectDetached(); + + writer.writeObject(aff.affinityKey(key)); + + break; + } + + case OP_MAP_KEY_TO_NODE: { + Object key = reader.readObjectDetached(); + + ClusterNode node = aff.mapKeyToNode(key); + + platformCtx.writeNode(writer, node); + + break; + } + + case OP_MAP_PARTITION_TO_NODE: { + int part = reader.readObject(); + + ClusterNode node = aff.mapPartitionToNode(part); + + platformCtx.writeNode(writer, node); + + break; + } + + case OP_MAP_KEY_TO_PRIMARY_AND_BACKUPS: { + Object key = reader.readObjectDetached(); + + platformCtx.writeNodes(writer, aff.mapKeyToPrimaryAndBackups(key)); + + break; + } + + case OP_MAP_PARTITION_TO_PRIMARY_AND_BACKUPS: { + int part = reader.readObject(); + + platformCtx.writeNodes(writer, aff.mapPartitionToPrimaryAndBackups(part)); + + break; + } + + case OP_MAP_KEYS_TO_NODES: { + Collection<Object> keys = reader.readCollection(); + + Map<ClusterNode, Collection<Object>> map = aff.mapKeysToNodes(keys); + + writer.writeInt(map.size()); + + for (Map.Entry<ClusterNode, Collection<Object>> e : map.entrySet()) { + platformCtx.addNode(e.getKey()); + + writer.writeUuid(e.getKey().id()); + writer.writeObject(e.getValue()); + } + + break; + } + + case OP_MAP_PARTITIONS_TO_NODES: { + Collection<Integer> parts = reader.readCollection(); + + Map<Integer, ClusterNode> map = aff.mapPartitionsToNodes(parts); + + writer.writeInt(map.size()); + + for (Map.Entry<Integer, ClusterNode> e : map.entrySet()) { + platformCtx.addNode(e.getValue()); + + writer.writeInt(e.getKey()); + + writer.writeUuid(e.getValue().id()); + } + + break; + } + + default: + throwUnsupported(type); + } + } + + /** + * @return Gets number of partitions in cache. + */ + public int partitions() { + return aff.partitions(); + } +}