[
https://issues.apache.org/jira/browse/GEODE-8202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17260792#comment-17260792
]
ASF GitHub Bot commented on GEODE-8202:
---------------------------------------
boglesby commented on a change in pull request #5600:
URL: https://github.com/apache/geode/pull/5600#discussion_r553566356
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
##########
@@ -146,6 +146,10 @@
*/
private int batchSize;
+ private String expectedReceiverUniqueId = "";
Review comment:
I think the expectedReceiverUniqueId field should be defined on
AbstractGatewaySender (like serverLocation).
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
##########
@@ -146,6 +146,10 @@
*/
private int batchSize;
+ private String expectedReceiverUniqueId = "";
+
+ private boolean enforceThreadsConnectSameReceiver = false;
+
Review comment:
AbstractGatewaySenderEventProcessor defines the
enforceThreadsConnectSameReceiver attribute, but it doesn't need to, since
AbstractGatewaySender already defines it and the processor has a reference to
the sender. Removing this attribute will simplify some of this code.
The changes to RemoteConcurrentSerialGatewaySenderEventProcessor and
SerialGatewaySenderImpl would be eliminated.
ConcurrentSerialGatewaySenderEventProcessor can be changed to reference the
value in the sender (sender.getEnforceThreadsConnectSameReceiver()).
Here is a diff with those changes:
```
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 1fc160ebb0..3a20e3020b 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -148,8 +148,6 @@ public abstract class
AbstractGatewaySenderEventProcessor extends LoggingThread
private String expectedReceiverUniqueId = "";
- private boolean enforceThreadsConnectSameReceiver = false;
-
public AbstractGatewaySenderEventProcessor(String string,
GatewaySender sender, ThreadsMonitoring tMonitoring) {
super(string);
@@ -158,13 +156,6 @@ public abstract class
AbstractGatewaySenderEventProcessor extends LoggingThread
this.threadMonitoring = tMonitoring;
}
- public AbstractGatewaySenderEventProcessor(String string,
- GatewaySender sender, ThreadsMonitoring tMonitoring,
- boolean enforceThreadsConnectSameReceiver) {
- this(string, sender, tMonitoring);
- this.enforceThreadsConnectSameReceiver =
enforceThreadsConnectSameReceiver;
- }
-
public void setExpectedReceiverUniqueId(String uniqueId) {
this.expectedReceiverUniqueId = uniqueId;
}
@@ -173,14 +164,6 @@ public abstract class
AbstractGatewaySenderEventProcessor extends LoggingThread
return this.expectedReceiverUniqueId;
}
- public void setEnforceThreadsConnectSameReceiver(boolean value) {
- this.enforceThreadsConnectSameReceiver = value;
- }
-
- public boolean getEnforceThreadsConnectSameReceiver() {
- return this.enforceThreadsConnectSameReceiver;
- }
-
public Object getRunningStateLock() {
return runningStateLock;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index 9cf7487a40..7adf99640f 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -77,20 +77,6 @@ public class ConcurrentSerialGatewaySenderEventProcessor
}
}
- public ConcurrentSerialGatewaySenderEventProcessor(AbstractGatewaySender
sender,
- ThreadsMonitoring tMonitoring, boolean cleanQueues,
- boolean enforceThreadsConnectSameReceiver) {
- super("Event Processor for GatewaySender_" + sender.getId(), sender,
tMonitoring,
- enforceThreadsConnectSameReceiver);
- this.sender = sender;
-
- initializeMessageQueue(sender.getId(), cleanQueues);
- queues = new HashSet<RegionQueue>();
- for (SerialGatewaySenderEventProcessor processor : processors) {
- queues.add(processor.getQueue());
- }
- }
-
@Override
public int getTotalQueueSize() {
int totalSize = 0;
@@ -194,7 +180,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor
@Override
public void run() {
boolean isDebugEnabled = logger.isDebugEnabled();
- if (getEnforceThreadsConnectSameReceiver()) {
+ if (this.sender.getEnforceThreadsConnectSameReceiver()) {
this.processors.get(0).start();
waitForRunningStatus(this.processors.get(0));
String receiverUniqueId =
this.processors.get(0).getExpectedReceiverUniqueId();
@@ -206,7 +192,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor
}
}
- for (int i = getEnforceThreadsConnectSameReceiver() ? 1 : 0; i <
this.processors.size(); i++) {
+ for (int i = this.sender.getEnforceThreadsConnectSameReceiver() ? 1 :
0; i < this.processors
+ .size(); i++) {
if (isDebugEnabled) {
logger.debug("Starting the serialProcessor {}", i);
}
diff --git
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
index 306a2a4937..7139307d7f 100644
---
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
+++
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
@@ -30,20 +30,11 @@ public class
RemoteConcurrentSerialGatewaySenderEventProcessor
super(sender, tMonitoring, cleanQueues);
}
- public
RemoteConcurrentSerialGatewaySenderEventProcessor(AbstractGatewaySender sender,
- ThreadsMonitoring tMonitoring, boolean cleanQueues,
- boolean enforceThreadsConnectSameReceiver) {
- super(sender, tMonitoring, cleanQueues,
enforceThreadsConnectSameReceiver);
- }
-
@Override
protected void initializeMessageQueue(String id, boolean cleanQueues) {
for (int i = 0; i < sender.getDispatcherThreads(); i++) {
- SerialGatewaySenderEventProcessor processor =
- new RemoteSerialGatewaySenderEventProcessor(this.sender, id + "."
+ i,
- getThreadMonitorObj(), cleanQueues);
-
processor.setEnforceThreadsConnectSameReceiver(getEnforceThreadsConnectSameReceiver());
- processors.add(processor);
+ processors.add(new
RemoteSerialGatewaySenderEventProcessor(this.sender, id + "." + i,
+ getThreadMonitorObj(), cleanQueues));
if (logger.isDebugEnabled()) {
logger.debug("Created the
RemoteSerialGatewayEventProcessor_{}->{}", i, processors.get(i));
}
diff --git
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index 7c93836667..3474b4a3c5 100644
---
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -120,9 +120,7 @@ public class SerialGatewaySenderImpl extends
AbstractRemoteGatewaySender {
AbstractGatewaySenderEventProcessor eventProcessor;
if (getDispatcherThreads() > 1) {
eventProcessor = new
RemoteConcurrentSerialGatewaySenderEventProcessor(
- SerialGatewaySenderImpl.this, getThreadMonitorObj(), cleanQueues,
- enforceThreadsConnectSameReceiver);
- //
eventProcessor.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectSameReceiver);
+ SerialGatewaySenderImpl.this, getThreadMonitorObj(), cleanQueues);
} else {
eventProcessor = new
RemoteSerialGatewaySenderEventProcessor(SerialGatewaySenderImpl.this,
getId(), getThreadMonitorObj(), cleanQueues);
```
##########
File path:
geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
##########
@@ -362,11 +366,71 @@ public void destroyConnection() {
}
}
+ Connection retryInitializeConnection(Connection con) {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ ServerLocation server = this.sender.getServerLocation();
+ String connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+ String expectedServerId = this.processor.getExpectedReceiverUniqueId();
+
+ if (expectedServerId.equals("")) {
+ if (isDebugEnabled) {
+ logger.debug("First dispatcher connected to server " +
connectedServerId);
+ }
+ this.processor.setExpectedReceiverUniqueId(connectedServerId);
+ return con;
+ }
+
+ int attempt = 0;
+ final int attemptsPerServer = 5;
Review comment:
Should attemptsPerServer be configurable?
##########
File path:
geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
##########
@@ -362,11 +366,71 @@ public void destroyConnection() {
}
}
+ Connection retryInitializeConnection(Connection con) {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ ServerLocation server = this.sender.getServerLocation();
Review comment:
The local variable server is unused.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> New option for serial gw sender threads start when receivers share ip and port
> ------------------------------------------------------------------------------
>
> Key: GEODE-8202
> URL: https://issues.apache.org/jira/browse/GEODE-8202
> Project: Geode
> Issue Type: Improvement
> Reporter: Alberto Bustamante Reyes
> Assignee: Alberto Bustamante Reyes
> Priority: Major
> Labels: pull-request-available
>
> RFC:
> [https://cwiki.apache.org/confluence/display/GEODE/New+option+for+serial+gw+sender+dispatcher+threads+start|https://cwiki.apache.org/confluence/display/GEODE/New+option+for+serial+gw+sender+dispatcher+threads+start]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)