http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java new file mode 100644 index 0000000..04abc89 --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java @@ -0,0 +1,218 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.distributed.internal.InternalLocator; +import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; +import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; +import com.gemstone.gemfire.internal.tcp.ConnectionException; +import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener; + +/** + * This class represent a runnable task which exchange the locator information + * with local locators(within the site) as well as remote locators (across the + * site) + * + * @author kbachhav + * @since 7.0 + */ +public class LocatorDiscovery{ + + private static final Logger logger = LogService.getLogger(); + + private DistributionLocatorId locatorId; + + private LocatorMembershipListener locatorListener; + + RemoteLocatorJoinRequest request; + + public static final int WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT = Integer + .getInteger("WANLocator.CONNECTION_RETRY_ATTEMPT", 50000).intValue(); + + public static final int WAN_LOCATOR_CONNECTION_INTERVAL = Integer.getInteger( + "WANLocator.CONNECTION_INTERVAL", 10000).intValue(); + + public static final int WAN_LOCATOR_PING_INTERVAL = Integer.getInteger( + "WANLocator.PING_INTERVAL", 10000).intValue(); + + public LocatorDiscovery(DistributionLocatorId locotor,RemoteLocatorJoinRequest request, + LocatorMembershipListener locatorListener) { + this.locatorId = locotor; + this.request = request; + this.locatorListener = locatorListener; + } + + /** + * When a batch fails, then this keeps the last time when a failure was logged + * . We don't want to swamp the logs in retries due to same batch failures. + */ + private final ConcurrentHashMap<DistributionLocatorId, long[]> failureLogInterval = new ConcurrentHashMap<DistributionLocatorId, long[]>(); + + /** + * The maximum size of {@link #failureLogInterval} beyond which it will start + * logging all failure instances. Hopefully this should never happen in + * practice. + */ + private static final int FAILURE_MAP_MAXSIZE = Integer.getInteger( + "gemfire.GatewaySender.FAILURE_MAP_MAXSIZE", 1000000); + + /** + * The maximum interval for logging failures of the same event in millis. + */ + private static final int FAILURE_LOG_MAX_INTERVAL = Integer.getInteger( + "gemfire.LocatorDiscovery.FAILURE_LOG_MAX_INTERVAL", 300000); + + public final boolean skipFailureLogging(DistributionLocatorId locatorId) { + boolean skipLogging = false; + if (this.failureLogInterval.size() < FAILURE_MAP_MAXSIZE) { + long[] logInterval = this.failureLogInterval.get(locatorId); + if (logInterval == null) { + logInterval = this.failureLogInterval.putIfAbsent(locatorId, + new long[] { System.currentTimeMillis(), 1000 }); + } + if (logInterval != null) { + long currentTime = System.currentTimeMillis(); + if ((currentTime - logInterval[0]) < logInterval[1]) { + skipLogging = true; + } else { + logInterval[0] = currentTime; + if (logInterval[1] <= (FAILURE_LOG_MAX_INTERVAL / 2)) { + logInterval[1] *= 2; + } + } + } + } + return skipLogging; + } + + + public class LocalLocatorDiscovery implements Runnable { + public void run() { + exchangeLocalLocators(); + } + } + + public class RemoteLocatorDiscovery implements Runnable { + public void run() { + exchangeRemoteLocators(); + } + } + + + private void exchangeLocalLocators() { + int retryAttempt = 1; + while (true) { + try { + RemoteLocatorJoinResponse response = (RemoteLocatorJoinResponse)TcpClient + .requestToServer(locatorId.getHost(), locatorId.getPort(), request, + WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT); + if (response != null) { + LocatorHelper.addExchnagedLocators(response.getLocators(), + this.locatorListener); + logger.info(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_EXCHANGED_LOCATOR_INFORMATION_0_WITH_1, + new Object[] { request.getLocator(), locatorId })); + break; + } + } + catch (IOException ioe) { + if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) { + ConnectionException coe = new ConnectionException( + "Not able to connect to local locator after " + + WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT + " retry attempts", + ioe); + logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2, + new Object[] { request.getLocator(),locatorId, retryAttempt }), coe); + break; + } + if (skipFailureLogging(locatorId)) { + logger.warn(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2_RETRYING_IN_3_MS, + new Object[] { request.getLocator(), locatorId, retryAttempt, WAN_LOCATOR_CONNECTION_INTERVAL })); + } + try { + Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + retryAttempt++; + continue; + } + catch (ClassNotFoundException cnfe) { + logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_ENCOUNTERED_UNEXPECTED_EXCEPTION), cnfe); + break; + } + } + } + + public void exchangeRemoteLocators() { + int retryAttempt = 1; + DistributionLocatorId remoteLocator = this.locatorId; + while (true) { + RemoteLocatorJoinResponse response; + try { + response = (RemoteLocatorJoinResponse)TcpClient + .requestToServer(remoteLocator.getHost(), remoteLocator.getPort(), + request, WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT); + if (response != null) { + LocatorHelper.addExchnagedLocators(response.getLocators(), this.locatorListener); + logger.info(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_EXCHANGED_LOCATOR_INFORMATION_0_WITH_1, + new Object[] { request.getLocator(), locatorId })); + RemoteLocatorPingRequest pingRequest = new RemoteLocatorPingRequest( + ""); + while (true) { + Thread.sleep(WAN_LOCATOR_PING_INTERVAL); + RemoteLocatorPingResponse pingResponse = (RemoteLocatorPingResponse) TcpClient + .requestToServer(remoteLocator.getHost(), + remoteLocator.getPort(), pingRequest, + WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT); + if (pingResponse != null) { + continue; + } + break; + } + } + } + catch (IOException ioe) { + if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) { + logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2, + new Object[] { request.getLocator(), remoteLocator, retryAttempt}), ioe); + break; + } + if (skipFailureLogging(remoteLocator)) { + logger.warn(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_WITH_1_AFTER_2_RETRYING_IN_3_MS, + new Object[] { request.getLocator(), remoteLocator, retryAttempt, WAN_LOCATOR_CONNECTION_INTERVAL })); + } + try { + Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + retryAttempt++; + continue; + } + catch (ClassNotFoundException cnfe) { + logger.fatal(LocalizedMessage.create(LocalizedStrings.LOCATOR_DISCOVERY_TASK_ENCOUNTERED_UNEXPECTED_EXCEPTION), cnfe); + break; + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorHelper.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorHelper.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorHelper.java new file mode 100644 index 0000000..a5b57c8 --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorHelper.java @@ -0,0 +1,134 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.gemstone.gemfire.distributed.internal.InternalLocator; +import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener; +import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; +import com.gemstone.gemfire.internal.CopyOnWriteHashSet; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; +/** + * This is a helper class which helps to add the locator information to the allLocatorInfoMap. + * + * @author Kishor Bachhav + * + */ +public class LocatorHelper { + + public final static Object locatorObject = new Object(); + /** + * + * This methods add the given locator to allLocatorInfoMap. + * It also invokes a locatorlistener to inform other locators in allLocatorInfoMap about this newly added locator. + * @param distributedSystemId + * @param locator + * @param locatorListener + * @param sourceLocator + */ + public static boolean addLocator(int distributedSystemId, + DistributionLocatorId locator, LocatorMembershipListener locatorListener, + DistributionLocatorId sourceLocator) { + ConcurrentHashMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = (ConcurrentHashMap<Integer, Set<DistributionLocatorId>>)locatorListener + .getAllLocatorsInfo(); + Set<DistributionLocatorId> locatorsSet = new CopyOnWriteHashSet<DistributionLocatorId>(); + locatorsSet.add(locator); + Set<DistributionLocatorId> existingValue = allLocatorsInfo.putIfAbsent(distributedSystemId, locatorsSet); + if(existingValue != null){ + if (!existingValue.contains(locator)) { + existingValue.add(locator); + addServerLocator(distributedSystemId, locatorListener, locator); + locatorListener.locatorJoined(distributedSystemId, locator, + sourceLocator); + } + else { + return false; + } + }else{ + addServerLocator(distributedSystemId, locatorListener, locator); + locatorListener.locatorJoined(distributedSystemId, locator, + sourceLocator); + } + return true; + } + + /** + * This methods decides whether the given locator is server locator, if so + * then add this locator in allServerLocatorsInfo map. + * + * @param distributedSystemId + * @param locatorListener + * @param locator + */ + private static void addServerLocator(Integer distributedSystemId, + LocatorMembershipListener locatorListener, DistributionLocatorId locator) { + if (!locator.isServerLocator()) { + return; + } + ConcurrentHashMap<Integer, Set<String>> allServerLocatorsInfo = (ConcurrentHashMap<Integer, Set<String>>)locatorListener + .getAllServerLocatorsInfo(); + + Set<String> locatorsSet = new CopyOnWriteHashSet<String>(); + locatorsSet.add(locator.toString()); + Set<String> existingValue = allServerLocatorsInfo.putIfAbsent(distributedSystemId, locatorsSet); + if(existingValue != null){ + if (!existingValue.contains(locator.toString())) { + existingValue.add(locator.toString()); + } + } + } + + /** + * This method adds the map of locatorsinfo sent by other locator to this locator's allLocatorInfo + * + * @param locators + * @param locatorListener + */ + public static boolean addExchnagedLocators(Map<Integer, Set<DistributionLocatorId>> locators, + LocatorMembershipListener locatorListener) { + + ConcurrentHashMap<Integer, Set<DistributionLocatorId>> allLocators = (ConcurrentHashMap<Integer, Set<DistributionLocatorId>>)locatorListener + .getAllLocatorsInfo(); + if (!allLocators.equals(locators)) { + for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : locators + .entrySet()) { + Set<DistributionLocatorId> existingValue = allLocators.putIfAbsent( + entry.getKey(), new CopyOnWriteHashSet<DistributionLocatorId>(entry + .getValue())); + + if (existingValue != null) { + Set<DistributionLocatorId> localLocators = allLocators.get(entry + .getKey()); + if (!localLocators.equals(entry.getValue())) { + entry.getValue().removeAll(localLocators); + for (DistributionLocatorId locator : entry.getValue()) { + localLocators.add(locator); + addServerLocator(entry.getKey(), locatorListener, locator); + locatorListener.locatorJoined(entry.getKey(), locator, null); + } + } + + } + else { + for (DistributionLocatorId locator : entry.getValue()) { + addServerLocator(entry.getKey(), locatorListener, locator); + locatorListener.locatorJoined(entry.getKey(), locator, null); + } + } + } + return true; + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorJoinMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorJoinMessage.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorJoinMessage.java new file mode 100644 index 0000000..124ca2e --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorJoinMessage.java @@ -0,0 +1,96 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.cache.client.internal.locator.ServerLocationRequest; +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; + +public class LocatorJoinMessage extends ServerLocationRequest { + + private DistributionLocatorId locator; + + private int distributedSystemId; + + private DistributionLocatorId sourceLocator; + + public LocatorJoinMessage() { + super(); + } + + public LocatorJoinMessage(int distributedSystemId, DistributionLocatorId locator, + DistributionLocatorId sourceLocator, String serverGroup) { + super(serverGroup); + this.locator = locator; + this.distributedSystemId = distributedSystemId; + this.sourceLocator = sourceLocator; + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + super.fromData(in); + this.locator = DataSerializer.readObject(in); + this.distributedSystemId = in.readInt(); + this.sourceLocator = DataSerializer.readObject(in); + } + + public void toData(DataOutput out) throws IOException { + super.toData(out); + DataSerializer.writeObject(locator, out); + out.writeInt(this.distributedSystemId); + DataSerializer.writeObject(sourceLocator, out); + } + + public DistributionLocatorId getLocator() { + return this.locator; + } + + public int getDistributedSystemId() { + return distributedSystemId; + } + + public DistributionLocatorId getSourceLocator() { + return sourceLocator; + } + + public int getDSFID() { + return DataSerializableFixedID.LOCATOR_JOIN_MESSAGE; + } + + @Override + public String toString() { + return "LocatorJoinMessage{distributedSystemId="+ distributedSystemId +" locators=" + locator + " Source Locator : " + sourceLocator +"}"; + } + + @Override + public boolean equals(Object obj){ + if ( this == obj ) return true; + if ( !(obj instanceof LocatorJoinMessage) ) return false; + LocatorJoinMessage myObject = (LocatorJoinMessage)obj; + if((this.distributedSystemId == myObject.getDistributedSystemId()) && this.locator.equals(myObject.getLocator())){ + return true; + } + return false; + } + + @Override + public int hashCode() { + // it is sufficient for all messages having the same locator to hash to the same bucket + if (this.locator == null) { + return 0; + } else { + return this.locator.hashCode(); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java new file mode 100644 index 0000000..67049e6 --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java @@ -0,0 +1,221 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.distributed.Locator; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.InternalLocator; +import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener; +import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; +import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorJoinMessage; +import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorJoinRequest; +import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorJoinResponse; +import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorPingRequest; +import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorPingResponse; +import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorRequest; +import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorResponse; +import com.gemstone.gemfire.internal.CopyOnWriteHashSet; +import com.gemstone.gemfire.internal.DSFIDFactory; +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; +import com.gemstone.gemfire.internal.cache.tier.Command; +import com.gemstone.gemfire.internal.cache.tier.MessageType; +import com.gemstone.gemfire.internal.cache.tier.sockets.CommandInitializer; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; +import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient; + +/** + * An implementation of + * {@link com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener} + * + * @author kbachhav + * + */ +public class LocatorMembershipListenerImpl implements LocatorMembershipListener { + + private ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new ConcurrentHashMap<Integer, Set<DistributionLocatorId>>(); + + private ConcurrentMap<Integer, Set<String>> allServerLocatorsInfo = new ConcurrentHashMap<Integer, Set<String>>(); + + private static final Logger logger = LogService.getLogger(); + + private DistributionConfig config; + + private int port; + + public LocatorMembershipListenerImpl() { + } + + public void setPort(int port){ + this.port = port; + } + + public void setConfig(DistributionConfig config) { + this.config = config; + } + + /** + * When the new locator is added to remote locator metadata, inform all other + * locators in remote locator metadata about the new locator so that they can + * update their remote locator metadata. + * + * @param locator + */ + + public void locatorJoined(final int distributedSystemId, + final DistributionLocatorId locator, + final DistributionLocatorId sourceLocator) { + Thread distributeLocator = new Thread(new Runnable() { + public void run() { + ConcurrentMap<Integer, Set<DistributionLocatorId>> remoteLocators = getAllLocatorsInfo(); + ArrayList<DistributionLocatorId> locatorsToRemove = new ArrayList<DistributionLocatorId>(); + + String localLocator = config.getStartLocator(); + DistributionLocatorId localLocatorId = null; + if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) { + localLocatorId = new DistributionLocatorId(port, config + .getBindAddress()); + } + else { + localLocatorId = new DistributionLocatorId(localLocator); + } + locatorsToRemove.add(localLocatorId); + locatorsToRemove.add(locator); + locatorsToRemove.add(sourceLocator); + + Map<Integer, Set<DistributionLocatorId>> localCopy = new HashMap<Integer, Set<DistributionLocatorId>>(); + for(Map.Entry<Integer, Set<DistributionLocatorId>> entry : remoteLocators.entrySet()){ + Set<DistributionLocatorId> value = new CopyOnWriteHashSet<DistributionLocatorId>(entry.getValue()); + localCopy.put(entry.getKey(), value); + } + for(Map.Entry<Integer, Set<DistributionLocatorId>> entry : localCopy.entrySet()){ + for(DistributionLocatorId removeLocId : locatorsToRemove){ + if(entry.getValue().contains(removeLocId)){ + entry.getValue().remove(removeLocId); + } + } + for (DistributionLocatorId value : entry.getValue()) { + try { + TcpClient.requestToServer(value.getHost(), value.getPort(), + new LocatorJoinMessage(distributedSystemId, locator, localLocatorId, ""), 1000, false); + } + catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(LocalizedMessage.create(LocalizedStrings.LOCATOR_MEMBERSHIP_LISTENER_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_1_WIHT_2_3, + new Object[] { locator.getHost(), locator.getPort(), value.getHost(), value.getPort() })); + } + } + try { + TcpClient.requestToServer(locator.getHost(), locator.getPort(), + new LocatorJoinMessage(entry.getKey(), value, localLocatorId, ""), 1000, false); + } + catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug(LocalizedMessage.create(LocalizedStrings.LOCATOR_MEMBERSHIP_LISTENER_COULD_NOT_EXCHANGE_LOCATOR_INFORMATION_0_1_WIHT_2_3, + new Object[] { value.getHost(), value.getPort(), locator.getHost(), locator.getPort() })); + } + } + } + } + } + }); + distributeLocator.setDaemon(true); + distributeLocator.start(); + } + + public Object handleRequest(Object request) { + Object response = null; + if (request instanceof RemoteLocatorJoinRequest) { + response = updateAllLocatorInfo((RemoteLocatorJoinRequest)request); + } + else if (request instanceof LocatorJoinMessage) { + response = informAboutRemoteLocators((LocatorJoinMessage)request); + } + else if (request instanceof RemoteLocatorPingRequest) { + response = getPingResponse((RemoteLocatorPingRequest)request); + } + else if (request instanceof RemoteLocatorRequest) { + response = getRemoteLocators((RemoteLocatorRequest)request); + } + return response; + } + + /** + * A locator from the request is checked against the existing remote locator + * metadata. If it is not available then added to existing remote locator + * metadata and LocatorMembershipListener is invoked to inform about the + * this newly added locator to all other locators available in remote locator + * metadata. As a response, remote locator metadata is sent. + * + * @param request + */ + private synchronized Object updateAllLocatorInfo(RemoteLocatorJoinRequest request) { + int distributedSystemId = request.getDistributedSystemId(); + DistributionLocatorId locator = request.getLocator(); + + LocatorHelper.addLocator(distributedSystemId, locator, this, null); + return new RemoteLocatorJoinResponse(this.getAllLocatorsInfo()); + } + + private Object getPingResponse(RemoteLocatorPingRequest request) { + return new RemoteLocatorPingResponse(); + } + + private Object informAboutRemoteLocators(LocatorJoinMessage request){ + // TODO: FInd out the importance of list locatorJoinMessages. During + // refactoring I could not understand its significance +// synchronized (locatorJoinObject) { +// if (locatorJoinMessages.contains(request)) { +// return null; +// } +// locatorJoinMessages.add(request); +// } + int distributedSystemId = request.getDistributedSystemId(); + DistributionLocatorId locator = request.getLocator(); + DistributionLocatorId sourceLocatorId = request.getSourceLocator(); + + LocatorHelper.addLocator(distributedSystemId, locator, this, sourceLocatorId); + return null; + } + + private Object getRemoteLocators(RemoteLocatorRequest request) { + int dsId = request.getDsId(); + Set<String> locators = this.getRemoteLocatorInfo(dsId); + return new RemoteLocatorResponse(locators); + } + + public Set<String> getRemoteLocatorInfo(int dsId) { + return this.allServerLocatorsInfo.get(dsId); + } + + public ConcurrentMap<Integer,Set<DistributionLocatorId>> getAllLocatorsInfo() { + return this.allLocatorsInfo; + } + + public ConcurrentMap<Integer,Set<String>> getAllServerLocatorsInfo() { + return this.allServerLocatorsInfo; + } + + public void clearLocatorInfo(){ + allLocatorsInfo.clear(); + allServerLocatorsInfo.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java new file mode 100644 index 0000000..b31a6bd --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinRequest.java @@ -0,0 +1,78 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.cache.client.internal.locator.ServerLocationRequest; +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; + +/** + * Requests remote locators of a remote WAN site + * + * @author Suranjan Kumar + * @author Yogesh Mahajan + * @author Kishor Bachhav + * + * @since 6.6 + * + */ +public class RemoteLocatorJoinRequest implements DataSerializableFixedID { + + private DistributionLocatorId locator = null; + + private int distributedSystemId = -1; + + public RemoteLocatorJoinRequest() { + super(); + } + + public RemoteLocatorJoinRequest(int distributedSystemId, DistributionLocatorId locator, + String serverGroup) { + this.distributedSystemId = distributedSystemId; + this.locator = locator; + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + this.locator = DataSerializer.readObject(in); + this.distributedSystemId = in.readInt(); + } + + public void toData(DataOutput out) throws IOException { + DataSerializer.writeObject(locator, out); + out.writeInt(this.distributedSystemId); + } + + public DistributionLocatorId getLocator() { + return this.locator; + } + + public int getDistributedSystemId() { + return distributedSystemId; + } + + public int getDSFID() { + return DataSerializableFixedID.REMOTE_LOCATOR_JOIN_REQUEST; + } + + @Override + public String toString() { + return "RemoteLocatorJoinRequest{locator=" + locator + "}"; + } + + @Override + public Version[] getSerializationVersions() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java new file mode 100644 index 0000000..4bd478d --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorJoinResponse.java @@ -0,0 +1,80 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.internal.CopyOnWriteHashSet; +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; + +/** + * List of remote locators as a response + * + * @author Suranjan Kumar + * @author Yogesh Mahajan + * @author Kishor Bachhav + * + * + */ +public class RemoteLocatorJoinResponse implements DataSerializableFixedID{ + + private HashMap<Integer, Set<DistributionLocatorId>> locators = new HashMap<Integer, Set<DistributionLocatorId>>(); + + /** Used by DataSerializer */ + public RemoteLocatorJoinResponse() { + super(); + } + + public RemoteLocatorJoinResponse( + Map<Integer, Set<DistributionLocatorId>> locators) { + super(); + this.locators = new HashMap<Integer, Set<DistributionLocatorId>>(); + for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : locators + .entrySet()) { + this.locators.put(entry.getKey(), new CopyOnWriteHashSet<DistributionLocatorId>( + entry.getValue())); + } + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + this.locators = DataSerializer.readHashMap(in); + + } + + public void toData(DataOutput out) throws IOException { + DataSerializer.writeHashMap(locators, out); + } + + public Map<Integer, Set<DistributionLocatorId>> getLocators() { + return this.locators; + } + + @Override + public String toString() { + return "RemoteLocatorJoinResponse{locators=" + locators + "}"; + } + + public int getDSFID() { + return DataSerializableFixedID.REMOTE_LOCATOR_JOIN_RESPONSE; + } + + @Override + public Version[] getSerializationVersions() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java new file mode 100644 index 0000000..937b676 --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingRequest.java @@ -0,0 +1,47 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; + +/** + * + * @author Kishor Bachhav + * + */ + +public class RemoteLocatorPingRequest implements DataSerializableFixedID{ + + public RemoteLocatorPingRequest() { + super(); + } + + public RemoteLocatorPingRequest(String serverGroup) { + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + } + + public void toData(DataOutput out) throws IOException { + } + + public int getDSFID() { + return DataSerializableFixedID.REMOTE_LOCATOR_PING_REQUEST; + } + + @Override + public Version[] getSerializationVersions() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java new file mode 100644 index 0000000..3978e81 --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorPingResponse.java @@ -0,0 +1,46 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; + +/** + * + * @author Kishor Bachhav + */ +public class RemoteLocatorPingResponse implements DataSerializableFixedID { + + + /** Used by DataSerializer */ + public RemoteLocatorPingResponse() { + super(); + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + } + + public void toData(DataOutput out) throws IOException { + } + + + + public int getDSFID() { + return DataSerializableFixedID.REMOTE_LOCATOR_PING_RESPONSE; + } + + @Override + public Version[] getSerializationVersions() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorRequest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorRequest.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorRequest.java new file mode 100644 index 0000000..8187531 --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorRequest.java @@ -0,0 +1,57 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; +/** + * + * @author Suranjan Kumar + * @author Yogesh Mahajan + * @author Kishor Bachhav + * + */ +public class RemoteLocatorRequest implements DataSerializableFixedID{ + private int distributedSystemId ; + + public RemoteLocatorRequest() { + super(); + } + public RemoteLocatorRequest(int dsId, String serverGroup) { + this.distributedSystemId = dsId; + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + this.distributedSystemId = in.readInt(); + } + + public void toData(DataOutput out) throws IOException { + out.writeInt(this.distributedSystemId); + } + + public int getDsId() { + return this.distributedSystemId; + } + + public int getDSFID() { + return DataSerializableFixedID.REMOTE_LOCATOR_REQUEST; + } + + @Override + public String toString() { + return "RemoteLocatorRequest{dsName=" + distributedSystemId + "}"; + } + @Override + public Version[] getSerializationVersions() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorResponse.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorResponse.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorResponse.java new file mode 100644 index 0000000..5c61bdf --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/RemoteLocatorResponse.java @@ -0,0 +1,65 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Set; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.Version; + +/** + * + * @author Suranjan Kumar + * @author Yogesh Mahajan + * @author Kishor Bachhav + * + */ +public class RemoteLocatorResponse implements DataSerializableFixedID{ + + private Set<String> locators ; + + /** Used by DataSerializer */ + public RemoteLocatorResponse() { + super(); + } + + public RemoteLocatorResponse(Set<String> locators) { + this.locators = locators; + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + this.locators = DataSerializer.readObject(in); + } + + public void toData(DataOutput out) throws IOException { + DataSerializer.writeObject(this.locators, out); + } + + public Set<String> getLocators() { + return this.locators; + } + + @Override + public String toString() { + return "RemoteLocatorResponse{locators=" + locators +"}"; + } + + public int getDSFID() { + return DataSerializableFixedID.REMOTE_LOCATOR_RESPONSE; + } + + @Override + public Version[] getSerializationVersions() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WANFactoryImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WANFactoryImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WANFactoryImpl.java new file mode 100644 index 0000000..a44a449 --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WANFactoryImpl.java @@ -0,0 +1,58 @@ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; +import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; +import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; +import com.gemstone.gemfire.internal.DSFIDFactory; +import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.cache.wan.GatewayReceiverFactoryImpl; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderFactoryImpl; +import com.gemstone.gemfire.internal.cache.wan.spi.WANFactory; + +public class WANFactoryImpl implements WANFactory { + + @Override + public void initialize() { + DSFIDFactory.registerDSFID( + DataSerializableFixedID.REMOTE_LOCATOR_JOIN_REQUEST, + RemoteLocatorJoinRequest.class); + DSFIDFactory.registerDSFID( + DataSerializableFixedID.REMOTE_LOCATOR_JOIN_RESPONSE, + RemoteLocatorJoinResponse.class); + DSFIDFactory.registerDSFID(DataSerializableFixedID.REMOTE_LOCATOR_REQUEST, + RemoteLocatorRequest.class); + DSFIDFactory.registerDSFID(DataSerializableFixedID.LOCATOR_JOIN_MESSAGE, + LocatorJoinMessage.class); + DSFIDFactory.registerDSFID( + DataSerializableFixedID.REMOTE_LOCATOR_PING_REQUEST, + RemoteLocatorPingRequest.class); + DSFIDFactory.registerDSFID( + DataSerializableFixedID.REMOTE_LOCATOR_PING_RESPONSE, + RemoteLocatorPingResponse.class); + DSFIDFactory.registerDSFID(DataSerializableFixedID.REMOTE_LOCATOR_RESPONSE, + RemoteLocatorResponse.class); + } + + @Override + public GatewaySenderFactory createGatewaySenderFactory(Cache cache) { + return new GatewaySenderFactoryImpl(cache); + } + + @Override + public GatewayReceiverFactory createGatewayReceiverFactory(Cache cache) { + return new GatewayReceiverFactoryImpl(cache); + } + + @Override + public WanLocatorDiscoverer createLocatorDiscoverer() { + return new WanLocatorDiscovererImpl(); + } + + @Override + public LocatorMembershipListener createLocatorMembershipListener() { + return new LocatorMembershipListenerImpl(); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java new file mode 100644 index 0000000..e6ef6e0 --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java @@ -0,0 +1,129 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.client.internal.locator.wan; + +import java.util.Set; +import java.util.StringTokenizer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl; +import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorDiscovery; +import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener; +import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorJoinRequest; +import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.LoggingThreadGroup; + +public class WanLocatorDiscovererImpl implements WanLocatorDiscoverer{ + + private static final Logger logger = LogService.getLogger(); + + private ExecutorService _executor; + + public WanLocatorDiscovererImpl() { + + } + + @Override + public void discover(int port, DistributionConfigImpl config, LocatorMembershipListener locatorListener) { + final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup + .createThreadGroup("WAN Locator Discovery Logger Group", logger); + + final ThreadFactory threadFactory = new ThreadFactory() { + public Thread newThread(final Runnable task) { + final Thread thread = new Thread(loggingThreadGroup, task, + "WAN Locator Discovery Thread"); + thread.setDaemon(true); + return thread; + } + }; + + this._executor = Executors.newCachedThreadPool(threadFactory); + exchangeLocalLocators(port, config, locatorListener); + exchangeRemoteLocators(port, config, locatorListener); + this._executor.shutdown(); + } + + + /** + * For WAN 70 Exchange the locator information within the distributed system + * + * @param config + */ + private void exchangeLocalLocators(int port, DistributionConfigImpl config, LocatorMembershipListener locatorListener) { + String localLocator = config.getStartLocator(); + DistributionLocatorId locatorId = null; + if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) { + locatorId = new DistributionLocatorId(port, config.getBindAddress()); + } + else { + locatorId = new DistributionLocatorId(localLocator); + } + LocatorHelper.addLocator(config.getDistributedSystemId(), locatorId, locatorListener, null); + + RemoteLocatorJoinRequest request = buildRemoteDSJoinRequest(port, config); + StringTokenizer locatorsOnThisVM = new StringTokenizer( + config.getLocators(), ","); + while (locatorsOnThisVM.hasMoreTokens()) { + DistributionLocatorId localLocatorId = new DistributionLocatorId( + locatorsOnThisVM.nextToken()); + if (!locatorId.equals(localLocatorId)) { + LocatorDiscovery localDiscovery = new LocatorDiscovery(localLocatorId, request, locatorListener); + LocatorDiscovery.LocalLocatorDiscovery localLocatorDiscovery = localDiscovery.new LocalLocatorDiscovery(); + this._executor.execute(localLocatorDiscovery); + } + } + } + + /** + * For WAN 70 Exchange the locator information across the distributed systems + * (sites) + * + * @param config + */ + private void exchangeRemoteLocators(int port, DistributionConfigImpl config, LocatorMembershipListener locatorListener) { + RemoteLocatorJoinRequest request = buildRemoteDSJoinRequest(port, config); + String remoteDustributedSystems = config.getRemoteLocators(); + if (remoteDustributedSystems.length() > 0) { + StringTokenizer remoteLocators = new StringTokenizer( + remoteDustributedSystems, ","); + while (remoteLocators.hasMoreTokens()) { + DistributionLocatorId remoteLocatorId = new DistributionLocatorId( + remoteLocators.nextToken()); + LocatorDiscovery localDiscovery = new LocatorDiscovery(remoteLocatorId, + request, locatorListener); + LocatorDiscovery.RemoteLocatorDiscovery remoteLocatorDiscovery = localDiscovery.new RemoteLocatorDiscovery(); + this._executor.execute(remoteLocatorDiscovery); + } + } + } + + private RemoteLocatorJoinRequest buildRemoteDSJoinRequest(int port, + DistributionConfigImpl config) { + String localLocator = config.getStartLocator(); + DistributionLocatorId locatorId = null; + if (localLocator.equals(DistributionConfig.DEFAULT_START_LOCATOR)) { + locatorId = new DistributionLocatorId(port, config.getBindAddress()); + } + else { + locatorId = new DistributionLocatorId(localLocator); + } + RemoteLocatorJoinRequest request = new RemoteLocatorJoinRequest( + config.getDistributedSystemId(), locatorId, ""); + return request; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractRemoteGatewaySender.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractRemoteGatewaySender.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractRemoteGatewaySender.java new file mode 100644 index 0000000..e9a0451 --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractRemoteGatewaySender.java @@ -0,0 +1,153 @@ +package com.gemstone.gemfire.internal.cache.wan; + +import java.io.IOException; +import java.net.ConnectException; +import java.util.Iterator; +import java.util.Properties; +import java.util.StringTokenizer; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.client.PoolManager; +import com.gemstone.gemfire.cache.client.internal.PoolImpl; +import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorRequest; +import com.gemstone.gemfire.cache.client.internal.locator.wan.RemoteLocatorResponse; +import com.gemstone.gemfire.cache.wan.GatewayReceiver; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.WanLocatorDiscoverer; +import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.PoolFactoryImpl; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; +import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient; + +public abstract class AbstractRemoteGatewaySender extends AbstractGatewaySender { + private static final Logger logger = LogService.getLogger(); + + public AbstractRemoteGatewaySender() { + + } + public AbstractRemoteGatewaySender(Cache cache, GatewaySenderAttributes attrs){ + super(cache, attrs); + } + + /** used to reduce warning logs in case remote locator is down (#47634) */ + protected int proxyFailureTries = 0; + + public synchronized void initProxy() { + // return if it is being used for WBCL or proxy is already created + if (this.remoteDSId == DEFAULT_DISTRIBUTED_SYSTEM_ID || this.proxy != null + && !this.proxy.isDestroyed()) { + return; + } + + int locatorCount = 0; + PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory(); + pf.setPRSingleHopEnabled(false); + if (this.locatorDiscoveryCallback != null) { + pf.setLocatorDiscoveryCallback(locatorDiscoveryCallback); + } + pf.setReadTimeout(this.socketReadTimeout); + pf.setIdleTimeout(connectionIdleTimeOut); + pf.setSocketBufferSize(socketBufferSize); + pf.setServerGroup(GatewayReceiver.RECEIVER_GROUP); + RemoteLocatorRequest request = new RemoteLocatorRequest(this.remoteDSId, pf + .getPoolAttributes().getServerGroup()); + String locators = ((GemFireCacheImpl) this.cache).getDistributedSystem() + .getConfig().getLocators(); + if (logger.isDebugEnabled()) { + logger.debug("Gateway Sender is attempting to configure pool with remote locator information"); + } + StringTokenizer locatorsOnThisVM = new StringTokenizer(locators, ","); + while (locatorsOnThisVM.hasMoreTokens()) { + String localLocator = locatorsOnThisVM.nextToken(); + DistributionLocatorId locatorID = new DistributionLocatorId(localLocator); + try { + RemoteLocatorResponse response = (RemoteLocatorResponse) TcpClient + .requestToServer(locatorID.getHost(), locatorID.getPort(), request, + WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT); + + if (response != null) { + if (response.getLocators() == null) { + if (logProxyFailure()) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.AbstractGatewaySender_REMOTE_LOCATOR_FOR_REMOTE_SITE_0_IS_NOT_AVAILABLE_IN_LOCAL_LOCATOR_1, + new Object[] { remoteDSId, localLocator })); + } + continue; + } + if (logger.isDebugEnabled()) { + logger.debug("Received the remote site {} location information:", this.remoteDSId, response.getLocators()); + } + StringBuffer strBuffer = new StringBuffer(); + Iterator<String> itr = response.getLocators().iterator(); + while (itr.hasNext()) { + DistributionLocatorId locatorId = new DistributionLocatorId(itr.next()); + pf.addLocator(locatorId.getHost().getHostName(), locatorId.getPort()); + locatorCount++; + } + break; + } + } catch (IOException ioe) { + if (logProxyFailure()) { + // don't print stack trace for connection failures + String ioeStr = ""; + if (!logger.isDebugEnabled() && ioe instanceof ConnectException) { + ioeStr = ": " + ioe.toString(); + ioe = null; + } + logger.warn(LocalizedMessage.create( + LocalizedStrings.AbstractGatewaySender_SENDER_0_IS_NOT_ABLE_TO_CONNECT_TO_LOCAL_LOCATOR_1, + new Object[] { this.id, localLocator + ioeStr }), ioe); + } + continue; + } catch (ClassNotFoundException e) { + if (logProxyFailure()) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.AbstractGatewaySender_SENDER_0_IS_NOT_ABLE_TO_CONNECT_TO_LOCAL_LOCATOR_1, + new Object[] { this.id, localLocator }), e); + } + continue; + } + } + + if (locatorCount == 0) { + if (logProxyFailure()) { + logger.fatal(LocalizedMessage.create( + LocalizedStrings.AbstractGatewaySender_SENDER_0_COULD_NOT_GET_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1, + new Object[] { this.id, this.remoteDSId })); + } + this.proxyFailureTries++; + throw new GatewaySenderConfigurationException( + LocalizedStrings.AbstractGatewaySender_SENDER_0_COULD_NOT_GET_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1 + .toLocalizedString(new Object[] { this.id, this.remoteDSId})); + } + pf.init(this); + this.proxy = ((PoolImpl) pf.create(this.getId())); + if (this.proxyFailureTries > 0) { + logger.info(LocalizedMessage.create(LocalizedStrings.AbstractGatewaySender_SENDER_0_GOT_REMOTE_LOCATOR_INFORMATION_FOR_SITE_1, + new Object[] { this.id, this.remoteDSId, this.proxyFailureTries })); + this.proxyFailureTries = 0; + } + } + + protected boolean logProxyFailure() { + assert Thread.holdsLock(this); + // always log the first failure + if (logger.isDebugEnabled() || this.proxyFailureTries == 0) { + return true; + } else { + // subsequent failures will be logged on 30th, 300th, 3000th try + // each try is at 100millis from higher layer so this accounts for logging + // after 3s, 30s and then every 5mins + if (this.proxyFailureTries >= 3000) { + return (this.proxyFailureTries % 3000) == 0; + } else { + return (this.proxyFailureTries == 30 || this.proxyFailureTries == 300); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java new file mode 100644 index 0000000..f3e03a8 --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverFactoryImpl.java @@ -0,0 +1,140 @@ +/* + * ========================================================================= + * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * more patents listed at http://www.pivotal.io/patents. + * ======================================================================== + */ + +package com.gemstone.gemfire.internal.cache.wan; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.wan.GatewayReceiver; +import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; +import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.distributed.internal.ResourceEvent; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation; +import com.gemstone.gemfire.internal.cache.xmlcache.GatewayReceiverCreation; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; + +/** + * @author Suranjan Kumar + * @author Yogesh Mahajan + * + * @since 7.0 + */ +public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory { + + private int startPort = GatewayReceiver.DEFAULT_START_PORT; + + private int endPort = GatewayReceiver.DEFAULT_END_PORT; + + private int timeBetPings = GatewayReceiver.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS; + + private int socketBuffSize = GatewayReceiver.DEFAULT_SOCKET_BUFFER_SIZE; + + private String bindAdd= GatewayReceiver.DEFAULT_BIND_ADDRESS; + + private String hostnameForSenders = GatewayReceiver.DEFAULT_HOSTNAME_FOR_SENDERS; + + private boolean manualStart = GatewayReceiver.DEFAULT_MANUAL_START; + + private List<GatewayTransportFilter> filters = new ArrayList<GatewayTransportFilter>(); + + private Cache cache; + + public GatewayReceiverFactoryImpl() { + + } + public GatewayReceiverFactoryImpl(Cache cache) { + this.cache = cache; + } + + public GatewayReceiverFactory addGatewayTransportFilter( + GatewayTransportFilter filter) { + this.filters.add(filter); + return this; + } + + public GatewayReceiverFactory removeGatewayTransportFilter( + GatewayTransportFilter filter) { + this.filters.remove(filter); + return this; + } + + public GatewayReceiverFactory setMaximumTimeBetweenPings(int time) { + this.timeBetPings = time; + return this; + } + + public GatewayReceiverFactory setStartPort(int port) { + this.startPort = port; + return this; + } + + public GatewayReceiverFactory setEndPort(int port) { + this.endPort = port; + return this; + } + + public GatewayReceiverFactory setSocketBufferSize(int size) { + this.socketBuffSize = size; + return this; + } + + public GatewayReceiverFactory setBindAddress(String address) { + this.bindAdd = address; + return this; + } + + public GatewayReceiverFactory setHostnameForSenders(String address) { + this.hostnameForSenders = address; + return this; + } + + public GatewayReceiverFactory setManualStart(boolean start) { + this.manualStart = start; + return this; + } + + public GatewayReceiver create() { + if (this.startPort > this.endPort) { + throw new IllegalStateException( + "Please specify either start port a value which is less than end port."); + } + GatewayReceiver recv = null; + if (this.cache instanceof GemFireCacheImpl) { + recv = new GatewayReceiverImpl(this.cache, this.startPort, this.endPort, + this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters, + this.hostnameForSenders, this.manualStart); + ((GemFireCacheImpl)cache).addGatewayReceiver(recv); + InternalDistributedSystem system = (InternalDistributedSystem) this.cache + .getDistributedSystem(); + system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_CREATE, recv); + if (!this.manualStart) { + try { + recv.start(); + } + catch (IOException ioe) { + throw new GatewayReceiverException( + LocalizedStrings.GatewayReceiver_EXCEPTION_WHILE_STARTING_GATEWAY_RECEIVER + .toLocalizedString(), ioe); + } + } + } else if (this.cache instanceof CacheCreation) { + recv = new GatewayReceiverCreation(this.cache, this.startPort, this.endPort, + this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters, + this.hostnameForSenders, this.manualStart); + ((CacheCreation)cache).addGatewayReceiver(recv); + } + return recv; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java new file mode 100644 index 0000000..0a14e2e --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewayReceiverImpl.java @@ -0,0 +1,245 @@ +/* + * ========================================================================= + * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * more patents listed at http://www.pivotal.io/patents. + * ======================================================================== + */ +package com.gemstone.gemfire.internal.cache.wan; + +import java.io.IOException; +import java.net.BindException; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.List; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.cache.wan.GatewayReceiver; +import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.distributed.internal.ResourceEvent; +import com.gemstone.gemfire.internal.AvailablePort; +import com.gemstone.gemfire.internal.SocketCreator; +import com.gemstone.gemfire.internal.cache.CacheServerImpl; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; + +/** + * @author Suranjan Kumar + * @author Yogesh Mahajan + * + * @since 7.0 + */ +public class GatewayReceiverImpl implements GatewayReceiver { + + private static final Logger logger = LogService.getLogger(); + + private String host; + + private int startPort; + + private int endPort; + + private int port; + + private int timeBetPings; + + private int socketBufferSize; + + private boolean manualStart; + + private final List<GatewayTransportFilter> filters; + + private String bindAdd; + + private CacheServer receiver; + + private final GemFireCacheImpl cache; + + public GatewayReceiverImpl(Cache cache, int startPort, + int endPort, int timeBetPings, int buffSize, String bindAdd, + List<GatewayTransportFilter> filters, String hostnameForSenders, boolean manualStart) { + this.cache = (GemFireCacheImpl)cache; + + /* + * If user has set hostNameForSenders then it should take precedence over + * bindAddress. If user hasn't set either hostNameForSenders or bindAddress + * then getLocalHost().getHostName() should be used. + */ + if (hostnameForSenders == null || hostnameForSenders.isEmpty()) { + if (bindAdd == null || bindAdd.isEmpty()) { + try { + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiverImpl_USING_LOCAL_HOST)); + this.host = SocketCreator.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new IllegalStateException( + LocalizedStrings.GatewayReceiverImpl_COULD_NOT_GET_HOST_NAME + .toLocalizedString(), + e); + } + } else { + this.host = bindAdd; + } + } else { + this.host = hostnameForSenders; + } + + this.startPort = startPort; + this.endPort = endPort; + this.timeBetPings = timeBetPings; + this.socketBufferSize = buffSize; + this.bindAdd = bindAdd; + this.filters = filters; + this.manualStart = manualStart; + } + + public List<GatewayTransportFilter> getGatewayTransportFilters() { + return this.filters; + } + + public int getMaximumTimeBetweenPings() { + return this.timeBetPings; + } + + public int getPort() { + return this.port; + } + + public int getStartPort() { + return this.startPort; + } + + public int getEndPort() { + return this.endPort; + } + + public int getSocketBufferSize() { + return this.socketBufferSize; + } + + public boolean isManualStart() { + return this.manualStart; + } + + public CacheServer getServer() { + return receiver; + } + + public void start() throws IOException { + if (receiver == null) { + receiver = this.cache.addCacheServer(true); + } + if (receiver.isRunning()) { + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_IS_ALREADY_RUNNING)); + return; + } + boolean started = false; + this.port = getPortToStart(); + while (!started && this.port != -1) { + receiver.setPort(this.port); + receiver.setSocketBufferSize(socketBufferSize); + receiver.setMaximumTimeBetweenPings(timeBetPings); + receiver.setHostnameForClients(host); + receiver.setBindAddress(bindAdd); + receiver.setGroups(new String[] { GatewayReceiverImpl.RECEIVER_GROUP }); + ((CacheServerImpl)receiver).setGatewayTransportFilter(this.filters); + try { + ((CacheServerImpl)receiver).start(); + started = true; + } catch (BindException be) { + if (be.getCause() != null + && be.getCause().getMessage() + .contains("assign requested address")) { + throw new GatewayReceiverException( + LocalizedStrings.SocketCreator_FAILED_TO_CREATE_SERVER_SOCKET_ON_0_1 + .toLocalizedString(new Object[] { bindAdd, + Integer.valueOf(this.port) })); + } + // ignore as this port might have been used by other threads. + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port)); + this.port = getPortToStart(); + } catch (SocketException se) { + if (se.getMessage().contains("Address already in use")) { + logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port)); + this.port = getPortToStart(); + + } else { + throw se; + } + } + + } + if (!started) { + throw new IllegalStateException( + "No available free port found in the given range."); + } + logger.info(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_STARTED_ON_PORT, this.port)); + + InternalDistributedSystem system = this.cache.getDistributedSystem(); + system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_START, this); + + } + + private int getPortToStart(){ + // choose a random port from the given port range + int rPort; + if (this.startPort == this.endPort) { + rPort = this.startPort; + } else { + rPort = AvailablePort.getRandomAvailablePortInRange(this.startPort, + this.endPort, AvailablePort.SOCKET); + } + return rPort; + } + + public void stop() { + if(!isRunning()){ + throw new GatewayReceiverException(LocalizedStrings.GatewayReceiver_IS_NOT_RUNNING.toLocalizedString()); + } + receiver.stop(); + +// InternalDistributedSystem system = ((GemFireCacheImpl) this.cache) +// .getDistributedSystem(); +// system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_STOP, this); + + } + + public String getHost() { + return this.host; + } + + public String getBindAddress() { + return this.bindAdd; + } + + public boolean isRunning() { + if (this.receiver != null) { + return this.receiver.isRunning(); + } + return false; + } + + public String toString() { + return new StringBuffer() + .append("Gateway Receiver") + .append("@").append(Integer.toHexString(hashCode())) + .append(" [") + .append("host='").append(getHost()) + .append("'; port=").append(getPort()) + .append("; bindAddress=").append(getBindAddress()) + .append("; maximumTimeBetweenPings=").append(getMaximumTimeBetweenPings()) + .append("; socketBufferSize=").append(getSocketBufferSize()) + .append("; isManualStart=").append(isManualStart()) + .append("; group=").append(Arrays.toString(new String[]{GatewayReceiverImpl.RECEIVER_GROUP})) + .append("]") + .toString(); + } + +}