Repository: ignite Updated Branches: refs/heads/master 848bb6631 -> e2ac66678
IGNITE-4439 - Attribute based node filter Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2591c160 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2591c160 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2591c160 Branch: refs/heads/master Commit: 2591c160efc4251cb33854955970c93ec20d6b24 Parents: 7094c0f Author: Valentin Kulichenko <valentin.luliche...@gmail.com> Authored: Thu Dec 22 13:05:35 2016 -0800 Committer: Valentin Kulichenko <valentin.luliche...@gmail.com> Committed: Thu Dec 22 13:05:35 2016 -0800 ---------------------------------------------------------------------- .../apache/ignite/util/AttributeNodeFilter.java | 105 +++++++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 3 + .../util/AttributeNodeFilterSelfTest.java | 184 +++++++++++++++++++ 3 files changed, 292 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2591c160/modules/core/src/main/java/org/apache/ignite/util/AttributeNodeFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/util/AttributeNodeFilter.java b/modules/core/src/main/java/org/apache/ignite/util/AttributeNodeFilter.java new file mode 100644 index 0000000..e2b972b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/util/AttributeNodeFilter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.util; + +import java.util.Collections; +import java.util.Map; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.services.ServiceConfiguration; +import org.jetbrains.annotations.Nullable; + +/** + * Implementation of {@code IgnitePredicate<ClusterNode>} based on + * {@link IgniteConfiguration#getUserAttributes() user attributes}. + * This filter can be used in methods like {@link ClusterGroup#forPredicate(IgnitePredicate)}, + * {@link CacheConfiguration#setNodeFilter(IgnitePredicate)}, + * {@link ServiceConfiguration#setNodeFilter(IgnitePredicate)}, etc. + * <p> + * The filter will evaluate to true if a node has <b>all</b> provided attributes set to + * corresponding values. 
Here is an example of how you can configure node filter for a + * cache or a service so that it's deployed only on nodes that have {@code group} + * attribute set to value {@code data}: + * <pre name="code" class="xml"> + * <property name="nodeFilter"> + * <bean class="org.apache.ignite.util.AttributeNodeFilter"> + * <constructor-arg value="group"/> + * <constructor-arg value="data"/> + * </bean> + * </property> + * </pre> + * You can also specify multiple attributes for the filter: + * <pre name="code" class="xml"> + * <property name="nodeFilter"> + * <bean class="org.apache.ignite.util.AttributeNodeFilter"> + * <constructor-arg> + * <map> + * <entry key="cpu-group" value="high"/> + * <entry key="memory-group" value="high"/> + * </map> + * </constructor-arg> + * </bean> + * </property> + * </pre> + * With this configuration a cache or a service will deploy only on nodes that have both + * {@code cpu-group} and {@code memory-group} attributes set to value {@code high}. + */ +public class AttributeNodeFilter implements IgnitePredicate<ClusterNode> { + /** Attributes. */ + private final Map<String, Object> attrs; + + /** + * Creates new node filter with a single attribute value. + * + * @param attrName Attribute name. + * @param attrVal Attribute value. + */ + public AttributeNodeFilter(String attrName, @Nullable Object attrVal) { + A.notNull(attrName, "attrName"); + + attrs = Collections.singletonMap(attrName, attrVal); + } + + /** + * Creates new node filter with a set of attributes. + * + * @param attrs Attributes.
+ */ + public AttributeNodeFilter(Map<String, Object> attrs) { + A.notNull(attrs, "attrs"); + + this.attrs = attrs; + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + Map<String, Object> nodeAttrs = node.attributes(); + + for (Map.Entry<String, Object> attr : attrs.entrySet()) { + if (!F.eq(nodeAttrs.get(attr.getKey()), attr.getValue())) + return false; + } + + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2591c160/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index c6281df..8ccec34 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -20,6 +20,7 @@ package org.apache.ignite.testsuites; import java.util.Set; import junit.framework.TestSuite; import org.apache.ignite.GridSuppressedExceptionSelfTest; +import org.apache.ignite.util.AttributeNodeFilterSelfTest; import org.apache.ignite.internal.ClusterGroupHostsSelfTest; import org.apache.ignite.internal.ClusterGroupSelfTest; import org.apache.ignite.internal.GridFailFastNodeFailureDetectionSelfTest; @@ -149,6 +150,8 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTestSuite(SecurityPermissionSetBuilderTest.class); + suite.addTestSuite(AttributeNodeFilterSelfTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2591c160/modules/core/src/test/java/org/apache/ignite/util/AttributeNodeFilterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/util/AttributeNodeFilterSelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/util/AttributeNodeFilterSelfTest.java new file mode 100644 index 0000000..ac3800f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/util/AttributeNodeFilterSelfTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.util; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Collections; +import java.util.Map; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests for {@link AttributeNodeFilter}. 
+ */ +public class AttributeNodeFilterSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private Map<String, ?> attrs; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + if (attrs != null) + cfg.setUserAttributes(attrs); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + attrs = null; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testSingleAttribute() throws Exception { + IgnitePredicate<ClusterNode> filter = new AttributeNodeFilter("attr", "value"); + + assertTrue(filter.apply(nodeProxy(F.asMap("attr", "value")))); + assertFalse(filter.apply(nodeProxy(F.asMap("attr", "wrong")))); + assertFalse(filter.apply(nodeProxy(F.asMap("attr", null)))); + assertFalse(filter.apply(nodeProxy(Collections.<String, Object>emptyMap()))); + assertFalse(filter.apply(nodeProxy(F.asMap("wrong", "value")))); + assertFalse(filter.apply(nodeProxy(F.asMap("null", "value")))); + } + + /** + * @throws Exception If failed. + */ + public void testSingleAttributeNullValue() throws Exception { + IgnitePredicate<ClusterNode> filter = new AttributeNodeFilter("attr", null); + + assertTrue(filter.apply(nodeProxy(F.asMap("attr", null)))); + assertTrue(filter.apply(nodeProxy(Collections.<String, Object>emptyMap()))); + assertTrue(filter.apply(nodeProxy(F.asMap("wrong", "value")))); + assertTrue(filter.apply(nodeProxy(F.asMap("wrong", null)))); + assertFalse(filter.apply(nodeProxy(F.asMap("attr", "value")))); + } + + /** + * @throws Exception If failed. 
+ */ + public void testMultipleAttributes() throws Exception { + IgnitePredicate<ClusterNode> filter = + new AttributeNodeFilter(F.<String, Object>asMap("attr1", "value1", "attr2", "value2")); + + assertTrue(filter.apply(nodeProxy(F.asMap("attr1", "value1", "attr2", "value2")))); + assertFalse(filter.apply(nodeProxy(F.asMap("attr1", "wrong", "attr2", "value2")))); + assertFalse(filter.apply(nodeProxy(F.asMap("attr1", "value1", "attr2", "wrong")))); + assertFalse(filter.apply(nodeProxy(F.asMap("attr1", "wrong", "attr2", "wrong")))); + assertFalse(filter.apply(nodeProxy(F.asMap("attr1", "value1")))); + assertFalse(filter.apply(nodeProxy(F.asMap("attr2", "value2")))); + assertFalse(filter.apply(nodeProxy(Collections.<String, Object>emptyMap()))); + } + + /** + * @throws Exception If failed. + */ + public void testMultipleAttributesNullValues() throws Exception { + IgnitePredicate<ClusterNode> filter = new AttributeNodeFilter(F.asMap("attr1", null, "attr2", null)); + + assertTrue(filter.apply(nodeProxy(F.asMap("attr1", null, "attr2", null)))); + assertTrue(filter.apply(nodeProxy(F.asMap("attr1", null)))); + assertTrue(filter.apply(nodeProxy(F.asMap("attr2", null)))); + assertTrue(filter.apply(nodeProxy(Collections.<String, Object>emptyMap()))); + assertFalse(filter.apply(nodeProxy(F.asMap("attr1", "value1")))); + assertFalse(filter.apply(nodeProxy(F.asMap("attr2", "value2")))); + assertFalse(filter.apply(nodeProxy(F.asMap("attr1", "value1", "attr2", "value2")))); + } + + /** + * @throws Exception If failed. 
+ */ + public void testClusterGroup() throws Exception { + Ignite group1 = startGridsMultiThreaded(3); + + attrs = F.asMap("group", "data"); + + Ignite group2 = startGridsMultiThreaded(3, 2); + + assertEquals(2, group1.cluster().forPredicate(new AttributeNodeFilter("group", "data")).nodes().size()); + assertEquals(2, group2.cluster().forPredicate(new AttributeNodeFilter("group", "data")).nodes().size()); + + assertEquals(3, group1.cluster().forPredicate(new AttributeNodeFilter("group", null)).nodes().size()); + assertEquals(3, group2.cluster().forPredicate(new AttributeNodeFilter("group", null)).nodes().size()); + + assertEquals(0, group1.cluster().forPredicate(new AttributeNodeFilter("group", "wrong")).nodes().size()); + assertEquals(0, group2.cluster().forPredicate(new AttributeNodeFilter("group", "wrong")).nodes().size()); + } + + /** + * @throws Exception If failed. + */ + public void testCacheFilter() throws Exception { + Ignite group1 = startGridsMultiThreaded(3); + + attrs = F.asMap("group", "data"); + + Ignite group2 = startGridsMultiThreaded(3, 2); + + group1.createCache(new CacheConfiguration<>("test-cache"). + setNodeFilter(new AttributeNodeFilter("group", "data"))); + + assertEquals(2, group1.cluster().forDataNodes("test-cache").nodes().size()); + assertEquals(2, group2.cluster().forDataNodes("test-cache").nodes().size()); + + assertEquals(0, group1.cluster().forDataNodes("wrong").nodes().size()); + assertEquals(0, group2.cluster().forDataNodes("wrong").nodes().size()); + } + + /** + * @param attrs Attributes. + * @return Node proxy. 
+ */ + private static ClusterNode nodeProxy(final Map<String, ?> attrs) { + return (ClusterNode)Proxy.newProxyInstance( + ClusterNode.class.getClassLoader(), + new Class[] { ClusterNode.class }, + new InvocationHandler() { + @SuppressWarnings("SuspiciousMethodCalls") + @Override public Object invoke(Object proxy, Method mtd, Object[] args) throws Throwable { + if ("attributes".equals(mtd.getName())) + return attrs; + + throw new UnsupportedOperationException(); + } + }); + } +}