Repository: incubator-geode Updated Branches: refs/heads/develop 96c67d4af -> 6dd3a580f
GEODE-1375 When using multicast a new member needs to receive the multicast message digest This reinstates the sending of JoinResponseMessages so that the new member can get the jgroups multicast digest. The JoinResponseMessages are sent after installing the new membership view, so JGroupsMessenger has been changed to use MERGE_VIEW instead of SET_VIEW to install the digest since it may have already received multicast messages from some members. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6dd3a580 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6dd3a580 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6dd3a580 Branch: refs/heads/develop Commit: 6dd3a580ffb68331c9c45b3e311d7f31c4cfca05 Parents: 96c67d4 Author: Bruce Schuchardt <bschucha...@pivotal.io> Authored: Wed May 11 15:26:08 2016 -0700 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Wed May 11 15:27:41 2016 -0700 ---------------------------------------------------------------------- .../membership/gms/membership/GMSJoinLeave.java | 17 +- .../gms/messenger/JGroupsMessenger.java | 10 +- .../membership/gms/messenger/jgroups-mcast.xml | 194 +++++++++---------- .../gms/membership/GMSJoinLeaveJUnitTest.java | 2 +- 4 files changed, 118 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6dd3a580/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java index 88e4d49..41ec1ea 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java @@ -750,6 +750,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { return newView; } + private void sendJoinResponses(NetView newView, List<InternalDistributedMember> newMbrs) { + for (InternalDistributedMember mbr : newMbrs) { + JoinResponseMessage response = new JoinResponseMessage(mbr, newView); + services.getMessenger().send(response); + } + } + private void sendRemoveMessages(List<InternalDistributedMember> removals, List<String> reasons, NetView newView, Set<InternalDistributedMember> oldIds) { Iterator<String> reason = reasons.iterator(); for (InternalDistributedMember mbr : removals) { @@ -1128,8 +1135,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { */ private void processJoinResponse(JoinResponseMessage rsp) { synchronized (joinResponse) { - joinResponse[0] = rsp; - joinResponse.notifyAll(); + if (!this.isJoined) { + joinResponse[0] = rsp; + joinResponse.notifyAll(); + } } } @@ -2258,6 +2267,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { sendView(newView, joinReqs); + // we also send a join response so that information like the multicast message digest + // can be transmitted to the new members w/o including it in the view message + sendJoinResponses(newView, joinReqs); + if (markViewCreatorForShutdown && getViewCreator() != null) { shutdown = true; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6dd3a580/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java index 4a54e84..0460964 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java @@ -140,7 +140,7 @@ public class JGroupsMessenger implements Messenger { * reduces the amount of suspect processing initiated by IOExceptions and the * amount of exceptions logged */ - private Set<Address> addressesWithioExceptionsProcessed = Collections.synchronizedSet(new HashSet<Address>()); + private Set<Address> addressesWithIoExceptionsProcessed = Collections.synchronizedSet(new HashSet<Address>()); static { // register classes that we've added to jgroups that are put on the wire @@ -376,7 +376,7 @@ public class JGroupsMessenger implements Messenger { logger.trace("installing JGroups view: {}", jgv); this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv)); - addressesWithioExceptionsProcessed.clear(); + addressesWithIoExceptionsProcessed.clear(); } @@ -390,10 +390,10 @@ public class JGroupsMessenger implements Messenger { if (services.getManager().shutdownInProgress()) { // GEODE-634 - don't log IOExceptions during shutdown return; } - if (addressesWithioExceptionsProcessed.contains(dest)) { + if (addressesWithIoExceptionsProcessed.contains(dest)) { return; } - addressesWithioExceptionsProcessed.add(dest); + addressesWithIoExceptionsProcessed.add(dest); NetView v = this.view; JGAddress jgMbr = (JGAddress)dest; if (jgMbr != null && v != null) { @@ -952,7 +952,7 @@ public class JGroupsMessenger implements Messenger { if (digest != null) { logger.trace("installing JGroups message digest {}", digest); this.myChannel.getProtocolStack() - .getTopProtocol().down(new Event(Event.SET_DIGEST, digest)); + .getTopProtocol().down(new Event(Event.MERGE_DIGEST, digest)); jrsp.setMessengerData(null); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6dd3a580/geode-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml ---------------------------------------------------------------------- diff --git a/geode-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml b/geode-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml index c71f972..69048d9 100755 --- a/geode-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml +++ b/geode-core/src/main/resources/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-mcast.xml @@ -1,98 +1,98 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> -<config xmlns="urn:org:jgroups" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd"> -<com.gemstone.gemfire.distributed.internal.membership.gms.messenger.Transport - BIND_ADDR_SETTING - bind_port="MEMBERSHIP_PORT_RANGE_START" - port_range="MEMBERSHIP_PORT_RANGE" - - mcast_addr="MCAST_ADDRESS" - mcast_port="MCAST_PORT" - tos="16" - ip_mcast="true" - ip_ttl="MCAST_TTL" - - ucast_recv_buf_size="UDP_RECV_BUFFER_SIZE" - ucast_send_buf_size="UDP_SEND_BUFFER_SIZE" - mcast_recv_buf_size="MCAST_RECV_BUFFER_SIZE" - mcast_send_buf_size="MCAST_SEND_BUFFER_SIZE" - - enable_batching="false" - ignore_dont_bundle="false" - max_bundle_timeout="30" - - thread_naming_pattern="UDP Message Handler" - enable_diagnostics="false" - disable_loopback="false" - - timer_type="new3" - timer.min_threads="1" - timer.max_threads="4" - timer.keep_alive_time="3000" - timer.queue_max_size="500" - - thread_pool.enabled="false" - thread_pool.min_threads="1" - thread_pool.max_threads="4" - thread_pool.keep_alive_time="5000" - thread_pool.queue_enabled="true" - thread_pool.queue_max_size="10000" - thread_pool.rejection_policy="discard" - - oob_thread_pool.enabled="false" - oob_thread_pool.min_threads="1" - oob_thread_pool.max_threads="4" - oob_thread_pool.keep_alive_time="5000" - oob_thread_pool.queue_enabled="false" - oob_thread_pool.queue_max_size="100" - oob_thread_pool.rejection_policy="discard"/> - -<com.gemstone.gemfire.distributed.internal.membership.gms.messenger.AddressManager/> -<com.gemstone.gemfire.distributed.internal.membership.gms.messenger.StatRecorder/> - -<pbcast.NAKACK2 - xmit_interval="MCAST_RETRANSMIT_INTERVAL" - xmit_table_num_rows="100" - xmit_table_msgs_per_row="2000" - xmit_table_max_compaction_time="30000" - max_msg_batch_size="500" - use_mcast_xmit="false" - discard_delivered_msgs="true"/> -<UNICAST3 - xmit_interval="500" - xmit_table_num_rows="100" - xmit_table_msgs_per_row="2000" - xmit_table_max_compaction_time="60000" - conn_expiry_timeout="0" - max_msg_batch_size="500"/> -<pbcast.STABLE - stability_delay="50" - desired_avg_gossip="2000" - max_bytes="400000"/> -<UFC - max_credits="FC_MAX_CREDITS" - min_threshold="FC_THRESHOLD" - max_block_time="FC_MAX_BLOCK"/> -<MFC - max_credits="FC_MAX_CREDITS" - min_threshold="FC_THRESHOLD" - max_block_time="FC_MAX_BLOCK"/> -<FRAG2 - frag_size="UDP_FRAGMENT_SIZE"/> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<config xmlns="urn:org:jgroups" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd"> +<com.gemstone.gemfire.distributed.internal.membership.gms.messenger.Transport + BIND_ADDR_SETTING + bind_port="MEMBERSHIP_PORT_RANGE_START" + port_range="MEMBERSHIP_PORT_RANGE" + + mcast_addr="MCAST_ADDRESS" + mcast_port="MCAST_PORT" + tos="16" + ip_mcast="true" + ip_ttl="MCAST_TTL" + + ucast_recv_buf_size="UDP_RECV_BUFFER_SIZE" + ucast_send_buf_size="UDP_SEND_BUFFER_SIZE" + mcast_recv_buf_size="MCAST_RECV_BUFFER_SIZE" + mcast_send_buf_size="MCAST_SEND_BUFFER_SIZE" + + enable_batching="false" + ignore_dont_bundle="false" + max_bundle_timeout="30" + + thread_naming_pattern="UDP Message Handler" + enable_diagnostics="false" + disable_loopback="false" + + timer_type="new3" + timer.min_threads="1" + timer.max_threads="4" + timer.keep_alive_time="3000" + timer.queue_max_size="500" + + thread_pool.enabled="false" + thread_pool.min_threads="1" + thread_pool.max_threads="4" + thread_pool.keep_alive_time="5000" + thread_pool.queue_enabled="true" + thread_pool.queue_max_size="10000" + thread_pool.rejection_policy="discard" + + oob_thread_pool.enabled="false" + oob_thread_pool.min_threads="1" + oob_thread_pool.max_threads="4" + oob_thread_pool.keep_alive_time="5000" + oob_thread_pool.queue_enabled="false" + oob_thread_pool.queue_max_size="100" + oob_thread_pool.rejection_policy="discard"/> + +<com.gemstone.gemfire.distributed.internal.membership.gms.messenger.AddressManager/> +<com.gemstone.gemfire.distributed.internal.membership.gms.messenger.StatRecorder/> + +<pbcast.NAKACK2 + xmit_interval="MCAST_RETRANSMIT_INTERVAL" + xmit_table_num_rows="100" + xmit_table_msgs_per_row="2000" + xmit_table_max_compaction_time="30000" + max_msg_batch_size="500" + use_mcast_xmit="false" + discard_delivered_msgs="true"/> +<UNICAST3 + xmit_interval="500" + xmit_table_num_rows="100" + xmit_table_msgs_per_row="2000" + xmit_table_max_compaction_time="60000" + conn_expiry_timeout="0" + max_msg_batch_size="500"/> +<pbcast.STABLE + stability_delay="50" + desired_avg_gossip="2000" + max_bytes="400000"/> +<UFC + max_credits="FC_MAX_CREDITS" + min_threshold="FC_THRESHOLD" + max_block_time="FC_MAX_BLOCK"/> +<MFC + max_credits="FC_MAX_CREDITS" + min_threshold="FC_THRESHOLD" + max_block_time="FC_MAX_BLOCK"/> +<FRAG2 + frag_size="UDP_FRAGMENT_SIZE"/> </config> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6dd3a580/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java old mode 100644 new mode 100755 index 50bed13..1e1724d --- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java @@ -1128,7 +1128,7 @@ public class GMSJoinLeaveJUnitTest { } Thread.sleep(1000); - System.out.println("Empty sleeps " + sleeps + " stoppping: " + gmsJoinLeave.isStopping() ); + System.out.println("Empty sleeps " + sleeps + " stopping: " + gmsJoinLeave.isStopping() ); } }