This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 552d887 IGNITE-14110 Networking module basic API and implementation - #53 552d887 is described below commit 552d887bee5faad3fb26644fd3c75b3bb57b00b2 Author: Anton Kalashnikov <kaa....@yandex.ru> AuthorDate: Thu Feb 25 18:20:24 2021 +0300 IGNITE-14110 Networking module basic API and implementation - #53 Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- modules/network/pom.xml | 99 +++++++++++++++++++ .../ITScaleCubeNetworkClusterMessagingTest.java | 97 ++++++++++++++++++ .../ignite/network/scalecube/TestMessage.java | 55 +++++++++++ .../scalecube/TestNetworkHandlersProvider.java | 63 ++++++++++++ .../ignite/network/MessageHandlerHolder.java | 59 +++++++++++ .../org/apache/ignite/network/NetworkCluster.java | 70 +++++++++++++ .../ignite/network/NetworkClusterEventHandler.java | 37 +++++++ .../ignite/network/NetworkClusterFactory.java | 75 ++++++++++++++ .../ignite/network/NetworkHandlersProvider.java | 36 +++++++ .../org/apache/ignite/network/NetworkMember.java | 63 ++++++++++++ .../org/apache/ignite/network/NetworkMessage.java | 59 +++++++++++ .../ignite/network/NetworkMessageHandler.java | 27 +++++ .../network/scalecube/ScaleCubeMemberResolver.java | 64 ++++++++++++ .../network/scalecube/ScaleCubeMessageHandler.java | 100 +++++++++++++++++++ .../network/scalecube/ScaleCubeNetworkCluster.java | 110 +++++++++++++++++++++ parent/pom.xml | 7 ++ pom.xml | 1 + 17 files changed, 1022 insertions(+) diff --git a/modules/network/pom.xml b/modules/network/pom.xml new file mode 100644 index 0000000..bd8407e --- /dev/null +++ b/modules/network/pom.xml @@ -0,0 +1,99 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + POM file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-parent</artifactId> + <version>1</version> + <relativePath>../../parent/pom.xml</relativePath> + </parent> + + <artifactId>ignite-network</artifactId> + <version>3.0.0-SNAPSHOT</version> + + <dependencies> + <!-- Internal module dependencies. --> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-configuration</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- 3-rd party dependencies. --> + <dependency> + <groupId>io.scalecube</groupId> + <artifactId>scalecube-cluster</artifactId> + </dependency> + + <!-- Test dependencies. 
--> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-library</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-configuration-annotation-processor</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + </dependency> + </dependencies> + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + <filtering>true</filtering> + </resource> + </resources> + + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.8.1</version> + </plugin> + </plugins> + </build> +</project> diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java new file mode 100644 index 0000000..1f571d0 --- /dev/null +++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkClusterMessagingTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.network.scalecube; + +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.ignite.network.NetworkCluster; +import org.apache.ignite.network.NetworkMember; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.MessageHandlerHolder; +import org.apache.ignite.network.NetworkClusterFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +/** */ +class ITScaleCubeNetworkClusterMessagingTest { + /** */ + private final Queue<NetworkCluster> startedMembers = new ConcurrentLinkedQueue<>(); + + /** */ + @AfterEach + public void afterEach() throws Exception { + Iterator<NetworkCluster> iterator = startedMembers.iterator(); + + while (iterator.hasNext()) { + iterator.next().shutdown(); + + iterator.remove(); + } + } + + /** */ + @Test + public void messageWasSentToAllMembersSuccessfully() { + //Given: Three started member which are gathered to cluster. + List<String> addresses = List.of("localhost:3344", "localhost:3345", "localhost:3346"); + + NetworkCluster alice = startMember("Alice", 3344, addresses); + NetworkCluster bob = startMember("Bob", 3345, addresses); + NetworkCluster carol = startMember("Carol", 3346, addresses); + + TestMessage sentMessage = new TestMessage("Message from Alice"); + + //When: Send one message to all members in cluster. 
+ for (NetworkMember member : alice.allMembers()) { + System.out.println("SEND : " + member); + + alice.weakSend(member, sentMessage); + } + + //Then: All members successfully received message. + assertThat(getLastMessage(alice).data(), is(sentMessage)); + assertThat(getLastMessage(bob).data(), is(sentMessage)); + assertThat(getLastMessage(carol).data(), is(sentMessage)); + } + + /** */ + private NetworkMessage getLastMessage(NetworkCluster alice) { + return TestNetworkHandlersProvider.MESSAGE_STORAGE.get(alice.localMember().name()); + } + + /** + * @return Started member. + */ + private NetworkCluster startMember(String name, int port, List<String> addresses) { + NetworkCluster member = new NetworkClusterFactory(name, port, addresses) + .startScaleCubeBasedCluster(new ScaleCubeMemberResolver(), new MessageHandlerHolder()); + + member.addHandlersProvider(new TestNetworkHandlersProvider(name)); + + System.out.println("-----" + name + " started"); + + startedMembers.add(member); + + return member; + } + +} \ No newline at end of file diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java new file mode 100644 index 0000000..8420fea --- /dev/null +++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.ignite.network.scalecube; + +import java.io.Serializable; +import java.util.Objects; + +/** */ +class TestMessage implements Serializable { + /** */ + private final String msg; + + /** */ + public TestMessage(String msg) { + this.msg = msg; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + TestMessage message = (TestMessage)o; + return Objects.equals(msg, message.msg); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Objects.hash(msg); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "TestMessage{" + + "msg='" + msg + '\'' + + '}'; + } +} diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java new file mode 100644 index 0000000..6f5718a --- /dev/null +++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestNetworkHandlersProvider.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.ignite.network.scalecube; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.ignite.network.NetworkClusterEventHandler; +import org.apache.ignite.network.NetworkHandlersProvider; +import org.apache.ignite.network.NetworkMember; +import org.apache.ignite.network.NetworkMessage; +import org.apache.ignite.network.NetworkMessageHandler; + +/** */ +class TestNetworkHandlersProvider implements NetworkHandlersProvider { + /** */ + public static Map<String, NetworkMessage> MESSAGE_STORAGE = new ConcurrentHashMap<>(); + + /** */ + private final String localName; + + /** */ + public TestNetworkHandlersProvider(String name) { + localName = name; + } + + /** {@inheritDoc} */ + @Override public NetworkMessageHandler messageHandler() { + return event -> { + MESSAGE_STORAGE.put(localName, event); + + System.out.println(localName + " handled messages : " + event); + }; + } + + /** {@inheritDoc} */ + @Override public NetworkClusterEventHandler clusterEventHandler() { + return new NetworkClusterEventHandler() { + @Override public void onAppeared(NetworkMember member) { + System.out.println(localName + " found member : " + member); + } + + @Override public void onDisappeared(NetworkMember member) { + System.out.println(localName + " lost member : " + member); + } + }; + } +} diff --git a/modules/network/src/main/java/org/apache/ignite/network/MessageHandlerHolder.java b/modules/network/src/main/java/org/apache/ignite/network/MessageHandlerHolder.java new file mode 100644 index 0000000..dbe0e27 --- 
/dev/null +++ b/modules/network/src/main/java/org/apache/ignite/network/MessageHandlerHolder.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.network; + +import java.util.Collection; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * Encapsulation of all cluster handlers for centralized management. + */ +public class MessageHandlerHolder { + /** Handler for processing incoming messages. */ + private final Collection<NetworkMessageHandler> messageHandlers = new CopyOnWriteArrayList<>(); + + /** Handler for processing all cluster events. */ + private final Collection<NetworkClusterEventHandler> clusterEventHandlers = new CopyOnWriteArrayList<>(); + + /** + * @param handler Handler for processing incoming messages. + */ + public void addmessageHandlers(NetworkMessageHandler handler) { + messageHandlers.add(handler); + } + + /** + * @param handler Handler for processing all cluster events. + */ + public void addClusterEventHandlers(NetworkClusterEventHandler handler) { + clusterEventHandlers.add(handler); + } + + /** + * @return All handlers for processing incoming messages. 
+ */ + public Collection<NetworkMessageHandler> messageHandlers() { + return messageHandlers; + } + + /** + * @return All handlers for processing all cluster events. + */ + public Collection<NetworkClusterEventHandler> clusterEventHandlers() { + return clusterEventHandlers; + } +} diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java new file mode 100644 index 0000000..5bdd576 --- /dev/null +++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.network; + +import java.util.Collection; +import java.util.concurrent.Future; + +/** + * Main interface for interaction with network. It allows getting information about network members and sending messages to + * them. + */ +public interface NetworkCluster { + /** + * Stop the processing of network connection immediately. Sending and receiving messages or obtaining network + * members information after this method has successfully finished will be impossible. + * + * @throws Exception If something went wrong.
+ */ + void shutdown() throws Exception; + + /** + * @return Information about local network member. + */ + NetworkMember localMember(); + + /** + * @return Information about all members which have been seen by the local member (including the local member itself). + */ + Collection<NetworkMember> allMembers(); + + /** + * Try to send the message asynchronously to the specific member without any guarantees that this message would be + * delivered. + * + * @param member Network member which should receive the message. + * @param msg Message which should be delivered. + */ + void weakSend(NetworkMember member, Object msg); + + /** + * Try to send the message asynchronously to the specific member with the following guarantees: + * * Messages which were sent from one thread to one member will be delivered in the same order as they were sent. + * * If message N was successfully delivered to the member, that means all messages preceding N were also successfully delivered. + * + * @param member Network member which should receive the message. + * @param msg Message which should be delivered. + */ + Future<?> guaranteedSend(NetworkMember member, Object msg); + + /** + * Add provider which allows obtaining configured handlers for different cluster events (e.g. a received message). + * + * @param networkHandlersProvider Provider for obtaining cluster event handlers. + */ + void addHandlersProvider(NetworkHandlersProvider networkHandlersProvider); +} diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterEventHandler.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterEventHandler.java new file mode 100644 index 0000000..3a11e7e --- /dev/null +++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterEventHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.network; + +/** + * Interface for handling events related to cluster changes. + */ +public interface NetworkClusterEventHandler { + /** + * Event which happens when a new member is detected in the cluster. + * + * @param member New network member. + */ + void onAppeared(NetworkMember member); + + /** + * Event which happens when a member leaves the cluster. It means the member leaves the cluster permanently. If + * the connection is lost but it is possible to reestablish it, nothing happens here. + * + * @param member The network member which leaves the cluster. + */ + void onDisappeared(NetworkMember member); +} diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterFactory.java new file mode 100644 index 0000000..6ed50b5 --- /dev/null +++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkClusterFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.network; + +import java.util.List; +import java.util.stream.Collectors; +import io.scalecube.cluster.Cluster; +import io.scalecube.cluster.ClusterImpl; +import io.scalecube.net.Address; +import org.apache.ignite.network.scalecube.ScaleCubeMemberResolver; +import org.apache.ignite.network.scalecube.ScaleCubeMessageHandler; +import org.apache.ignite.network.scalecube.ScaleCubeNetworkCluster; + +/** + * Factory of different implementation of {@link NetworkCluster}. + */ +public class NetworkClusterFactory { + /** Unique name of network member. */ + private final String localMemberName; + + /** Local port. */ + private final int localPort; + + /** Network addresses to find another members in cluster. */ + private final List<String> addresses; + + /** + * @param localMemberName Unique name of network member. + * @param port Local port. + * @param addresses Network addresses to find another members in cluster. + */ + public NetworkClusterFactory(String localMemberName, int port, List<String> addresses) { + this.localMemberName = localMemberName; + localPort = port; + this.addresses = addresses; + } + + /** + * Implementation of {@link NetworkCluster} based on ScaleCube. + * + * @param memberResolver Member resolve which allows convert {@link org.apache.ignite.network.NetworkMember} to + * inner ScaleCube type and otherwise. 
+ * @param messageHandlerHolder Holder of all cluster message handlers. + * @return {@link NetworkCluster} instance. + */ + public NetworkCluster startScaleCubeBasedCluster( + ScaleCubeMemberResolver memberResolver, + MessageHandlerHolder messageHandlerHolder + ) { + Cluster cluster = new ClusterImpl() + .handler(cl -> new ScaleCubeMessageHandler(cl, memberResolver, messageHandlerHolder)) + .config(opts -> opts + .memberAlias(localMemberName) + .transport(trans -> trans.port(localPort)) + ) + .membership(opts -> opts.seedMembers(addresses.stream().map(Address::from).collect(Collectors.toList()))) + .startAwait(); + + return new ScaleCubeNetworkCluster(cluster, memberResolver, messageHandlerHolder); + } +} diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkHandlersProvider.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkHandlersProvider.java new file mode 100644 index 0000000..91b5b75 --- /dev/null +++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkHandlersProvider.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.network; + +/** + * Provider of handlers of different cluster events. 
+ */ +public interface NetworkHandlersProvider { + /** + * @return Handler for processing the received messages from the cluster. + */ + default NetworkMessageHandler messageHandler() { + return null; + } + + /** + * @return Handler for processing the different cluster events. + */ + default NetworkClusterEventHandler clusterEventHandler() { + return null; + } +} diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java new file mode 100644 index 0000000..9c5e2bc --- /dev/null +++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkMember.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.network; + +import java.util.Objects; + +/** + * Representation of the network member. + */ +public class NetworkMember { + /** Unique name of member in cluster. */ + private final String name; + + /** + * @param name Unique name of member in cluster. + */ + public NetworkMember(String name) { + this.name = name; + } + + /** + * @return Unique name of member in cluster. 
+ */ + public String name() { + return name; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + NetworkMember member = (NetworkMember)o; + return Objects.equals(name, member.name); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Objects.hash(name); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "NetworkMember{" + + "name='" + name + '\'' + + '}'; + } +} diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkMessage.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessage.java new file mode 100644 index 0000000..c545fc7 --- /dev/null +++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessage.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.network; + +/** + * Message for exchange information in cluster. + */ +public class NetworkMessage { + /** Custom data. */ + private final Object data; + + /** Network member who sent this message. */ + private final NetworkMember senderMember; + + /** + * @param data Custom data. 
+ * @param senderMember Network member who sent this message. + */ + public NetworkMessage(Object data, NetworkMember senderMember) { + this.data = data; + this.senderMember = senderMember; + } + + /** + * @param <T> Type of message. + * @return Custom data. + */ + public <T> T data() { + return (T)data; + } + + /** + * @return Network member who sent this message. + */ + public NetworkMember sender() { + return senderMember; + } + + @Override public String toString() { + return "NetworkMessage{" + + "data=" + data + + ", senderMember=" + senderMember + + '}'; + } +} diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java new file mode 100644 index 0000000..65de5b3 --- /dev/null +++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.network; + +/** + * Handler of incoming messages. + */ +public interface NetworkMessageHandler { + /** + * @param message Message which was received from cluster. 
+     */
+    void onReceived(NetworkMessage message);
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMemberResolver.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMemberResolver.java
new file mode 100644
index 0000000..361e2f8
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMemberResolver.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network.scalecube;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import io.scalecube.cluster.Member;
+import org.apache.ignite.network.NetworkMember;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Resolver between scalecube specific {@link Member}s and public {@link NetworkMember}s.
+ * Both lookup directions are kept in {@link ConcurrentHashMap}s.
+ */
+public class ScaleCubeMemberResolver {
+    /** Map of public network member by its unique name. */
+    private final Map<String, NetworkMember> directMemberMap = new ConcurrentHashMap<>();
+
+    /** Map of scalecube member by its public member. */
+    private final Map<NetworkMember, Member> reverseMemberMap = new ConcurrentHashMap<>();
+
+    /**
+     * Returns the public member registered under the alias of the given scalecube member, creating and
+     * registering a new one if the alias has not been seen yet.
+     *
+     * @param member ScaleCube specific member.
+     * @return Public network member instance.
+     */
+    public NetworkMember resolveNetworkMember(Member member) {
+        return directMemberMap.computeIfAbsent(member.alias(), alias -> {
+            NetworkMember networkMember = new NetworkMember(alias);
+
+            // Register the reverse mapping before the forward one becomes visible to other threads, so that
+            // a member returned by this method can always be passed to resolveMember().
+            reverseMemberMap.put(networkMember, member);
+
+            return networkMember;
+        });
+    }
+
+    /**
+     * @param member Public network member.
+     * @return ScaleCube specific member.
+     * @throws NullPointerException If no scalecube member is registered for the given public member.
+     */
+    public Member resolveMember(NetworkMember member) {
+        return requireNonNull(reverseMemberMap.get(member), "Unknown network member: " + member);
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java
new file mode 100644
index 0000000..9a04e66
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageHandler.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network.scalecube;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import io.scalecube.cluster.Cluster;
+import io.scalecube.cluster.ClusterMessageHandler;
+import io.scalecube.cluster.membership.MembershipEvent;
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.net.Address;
+import org.apache.ignite.network.NetworkClusterEventHandler;
+import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.MessageHandlerHolder;
+
+/**
+ * Integration class for adapting {@link NetworkMessageHandler} and {@link NetworkClusterEventHandler} in terms of
+ * ScaleCube.
+ */
+public class ScaleCubeMessageHandler implements ClusterMessageHandler {
+    /** Instance of scalecube cluster. */
+    private final Cluster cluster;
+
+    /** Resolver from/to inner member to/from public one. */
+    private final ScaleCubeMemberResolver scaleCubeMemberResolver;
+
+    /** Storage of all handlers for execution. */
+    private final MessageHandlerHolder messageHandlerHolder;
+
+    /** Utility map for recognizing member for its address(scalecube doesn't provide such information in input message). */
+    private final Map<Address, NetworkMember> addressMemberMap = new ConcurrentHashMap<>();
+
+    /**
+     * @param cluster Instance of scalecube cluster.
+     * @param resolver Resolver from/to inner member to/from public one.
+     * @param holder Storage of all handlers for execution.
+     */
+    public ScaleCubeMessageHandler(Cluster cluster, ScaleCubeMemberResolver resolver, MessageHandlerHolder holder) {
+        this.cluster = cluster;
+        scaleCubeMemberResolver = resolver;
+        messageHandlerHolder = holder;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMessage(Message message) {
+        for (NetworkMessageHandler handler : messageHandlerHolder.messageHandlers())
+            handler.onReceived(new NetworkMessage(message.data(), memberForAddress(message.sender())));
+    }
+
+    /**
+     * @param address Inet address.
+     * @return Network member corresponded to input address, or {@code null} if no cluster member has this address.
+     */
+    private NetworkMember memberForAddress(Address address) {
+        // When no member matches, the function yields null and nothing is cached, so the lookup is retried later.
+        return addressMemberMap.computeIfAbsent(address,
+            addr -> cluster.members().stream()
+                .filter(mem -> mem.address().equals(addr))
+                .map(scaleCubeMemberResolver::resolveNetworkMember)
+                .findFirst()
+                .orElse(null)
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMembershipEvent(MembershipEvent event) {
+        MembershipEvent.Type type = event.type();
+
+        boolean disappeared = type == MembershipEvent.Type.LEAVING || type == MembershipEvent.Type.REMOVED;
+
+        // Evict the departed member from the address cache: its address may later be reused by another member.
+        if (disappeared)
+            addressMemberMap.remove(event.member().address());
+
+        for (NetworkClusterEventHandler lsnr : messageHandlerHolder.clusterEventHandlers()) {
+            if (type == MembershipEvent.Type.ADDED)
+                lsnr.onAppeared(scaleCubeMemberResolver.resolveNetworkMember(event.member()));
+            else if (disappeared)
+                lsnr.onDisappeared(scaleCubeMemberResolver.resolveNetworkMember(event.member()));
+            else if (type != MembershipEvent.Type.UPDATED) // UPDATED events are deliberately ignored.
+                throw new RuntimeException("This event is not supported: event = " + event);
+        }
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
new file mode 100644
index 0000000..0dc1087
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkCluster.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network.scalecube;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import io.scalecube.cluster.Cluster;
+import org.apache.ignite.network.NetworkCluster;
+import org.apache.ignite.network.NetworkClusterEventHandler;
+import org.apache.ignite.network.NetworkHandlersProvider;
+import org.apache.ignite.network.NetworkMember;
+import org.apache.ignite.network.NetworkMessageHandler;
+import org.apache.ignite.network.MessageHandlerHolder;
+
+import static io.scalecube.cluster.transport.api.Message.fromData;
+
+/**
+ * {@link NetworkCluster} implementation that delegates to a ScaleCube {@link Cluster}.
+ */
+public class ScaleCubeNetworkCluster implements NetworkCluster {
+    /** Underlying ScaleCube cluster. */
+    private final Cluster cluster;
+
+    /** Translates between ScaleCube members and public network members. */
+    private final ScaleCubeMemberResolver memberResolver;
+
+    /** Registry that receives the handlers passed to {@link #addHandlersProvider}. */
+    private final MessageHandlerHolder messageHandlerHolder;
+
+    /**
+     * @param cluster Underlying ScaleCube cluster.
+     * @param memberResolver Translator between ScaleCube members and public network members.
+     * @param messageHandlerHolder Registry of cluster handlers.
+     */
+    public ScaleCubeNetworkCluster(
+        Cluster cluster, ScaleCubeMemberResolver memberResolver, MessageHandlerHolder messageHandlerHolder
+    ) {
+        this.cluster = cluster;
+        this.memberResolver = memberResolver;
+        this.messageHandlerHolder = messageHandlerHolder;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void shutdown() throws Exception {
+        // Request the shutdown first, then block on the cluster's shutdown signal.
+        cluster.shutdown();
+        cluster.onShutdown().block();
+    }
+
+    /** {@inheritDoc} */
+    @Override public NetworkMember localMember() {
+        return memberResolver.resolveNetworkMember(cluster.member());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<NetworkMember> allMembers() {
+        return cluster.members().stream()
+            .map(memberResolver::resolveNetworkMember)
+            .collect(Collectors.toList());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void weakSend(NetworkMember member, Object msg) {
+        sendAndAwait(member, msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Future<?> guaranteedSend(NetworkMember member, Object msg) {
+        sendAndAwait(member, msg);
+
+        // The blocking send above has already returned, so the future handed back is completed from the start.
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addHandlersProvider(NetworkHandlersProvider networkHandlersProvider) {
+        NetworkClusterEventHandler eventHandler = networkHandlersProvider.clusterEventHandler();
+        if (eventHandler != null)
+            messageHandlerHolder.addClusterEventHandlers(eventHandler);
+
+        NetworkMessageHandler msgHandler = networkHandlersProvider.messageHandler();
+        if (msgHandler != null)
+            messageHandlerHolder.addmessageHandlers(msgHandler);
+    }
+
+    /**
+     * Sends the message to the given member and blocks on the result of the ScaleCube send.
+     * @param member Recipient of the message.
+     * @param msg Payload to wrap into a ScaleCube message.
+     */
+    private void sendAndAwait(NetworkMember member, Object msg) {
+        cluster.send(memberResolver.resolveMember(member), fromData(msg)).block();
+    }
+}
diff --git a/parent/pom.xml b/parent/pom.xml index be18342..573cff7 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -71,6 +71,7 @@ <spoon.framework.version>8.3.0</spoon.framework.version> <typesafe.version>1.4.1</typesafe.version> <hamcrest.version>2.2</hamcrest.version> +
<scalecube.version>2.6.6</scalecube.version> <!-- Plugins versions --> <apache.rat.plugin.version>0.13</apache.rat.plugin.version> @@ -241,6 +242,12 @@ <artifactId>hamcrest-library</artifactId> <version>${hamcrest.version}</version> </dependency> + + <dependency> + <groupId>io.scalecube</groupId> + <artifactId>scalecube-cluster</artifactId> + <version>${scalecube.version}</version> + </dependency> </dependencies> </dependencyManagement> diff --git a/pom.xml b/pom.xml index abf5ce6..ade1018 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,7 @@ <module>modules/configuration-annotation-processor</module> <module>modules/rest</module> <module>modules/runner</module> + <module>modules/network</module> </modules> <build>