This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new dd98bf36b6 [MINOR] Federated retry on connection error
dd98bf36b6 is described below
commit dd98bf36b6040c8754b7dc8508c6f5d7268f2b45
Author: baunsgaard <[email protected]>
AuthorDate: Mon Jul 17 10:55:45 2023 +0200
[MINOR] Federated retry on connection error
This commit adds a small retry block for federated requests that
fail due to connection issues. In practice, if a request does not get
through (a ConnectException is caught), we retry it with an increasing
wait time: 200 ms before the first retry, growing by 200 ms per attempt
up to 800 ms, and we give up after the fifth failed attempt.
In general, this made the tests run more smoothly for me: instead of
failing because the workers had not been started yet, we now retry long
enough for the workers to start.
---
.../controlprogram/federated/FederatedData.java | 54 +++++++++++++++++-----
1 file changed, 42 insertions(+), 12 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
index 0e0f837301..3b332d596f 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java
@@ -20,6 +20,7 @@
package org.apache.sysds.runtime.controlprogram.federated;
import java.io.Serializable;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
@@ -143,9 +144,8 @@ public class FederatedData {
if(!_dataType.isMatrix() && !_dataType.isFrame())
throw new DMLRuntimeException("Federated datatype \"" +
_dataType.toString() + "\" is not supported.");
_varID = id;
- FederatedRequest request = (mtd != null) ?
- new FederatedRequest(RequestType.READ_VAR, id, mtd) :
- new FederatedRequest(RequestType.READ_VAR, id);
+ FederatedRequest request = (mtd != null) ? new
FederatedRequest(RequestType.READ_VAR, id,
+ mtd) : new FederatedRequest(RequestType.READ_VAR, id);
request.appendParam(_filepath);
request.appendParam(_dataType.name());
return executeFederatedOperation(request);
@@ -175,7 +175,20 @@ public class FederatedData {
* @param request the requested operation
* @return the response
*/
- public synchronized static Future<FederatedResponse>
executeFederatedOperation(InetSocketAddress address,
+ public static Future<FederatedResponse>
executeFederatedOperation(InetSocketAddress address,
+ FederatedRequest... request) {
+ return executeFederatedOperation(address, 1, request);
+ }
+
+ /**
+ * Executes an federated operation on a federated worker.
+ *
+ * @param address socket address (incl host and port)
+ * @param retry the retry count
+ * @param request the requested operation
+ * @return the response
+ */
+ public synchronized static Future<FederatedResponse>
executeFederatedOperation(InetSocketAddress address, int retry,
FederatedRequest... request) {
try {
final Bootstrap b = new Bootstrap();
@@ -196,11 +209,28 @@ public class FederatedData {
return handler.getProm();
}
catch(Exception e) {
+ if(e instanceof ConnectException) {
+
+ if(retry < 5) {
+ try {
+ // Increasing retry timeout
+ Thread.sleep(200 * retry);
+ }
+ catch(Exception e2) {
+ throw new
DMLRuntimeException(e);
+ }
+ return
executeFederatedOperation(address, retry + 1, request);
+ }
+ else {
+ throw new DMLRuntimeException(e);
+ }
+ }
throw new DMLRuntimeException("Failed sending federated
operation", e);
}
}
- private static ChannelInitializer<SocketChannel>
createChannel(InetSocketAddress address, DataRequestHandler handler){
+ private static ChannelInitializer<SocketChannel>
createChannel(InetSocketAddress address,
+ DataRequestHandler handler) {
final int timeout = ConfigurationManager.getFederatedTimeout();
final boolean ssl = ConfigurationManager.isFederatedSSL();
@@ -240,9 +270,8 @@ public class FederatedData {
}
}
- private static SslHandler createSSLHandler(SocketChannel ch,
InetSocketAddress address){
- return SslConstructor().context.newHandler(ch.alloc(),
address.getAddress().getHostAddress(),
- address.getPort());
+ private static SslHandler createSSLHandler(SocketChannel ch,
InetSocketAddress address) {
+ return SslConstructor().context.newHandler(ch.alloc(),
address.getAddress().getHostAddress(), address.getPort());
}
public static void resetFederatedSites() {
@@ -313,19 +342,20 @@ public class FederatedData {
public static class FederatedRequestEncoder extends ObjectEncoder {
@Override
- protected ByteBuf allocateBuffer(ChannelHandlerContext ctx,
Serializable msg,
- boolean preferDirect) throws Exception {
+ protected ByteBuf allocateBuffer(ChannelHandlerContext ctx,
Serializable msg, boolean preferDirect)
+ throws Exception {
int initCapacity = 256; // default initial capacity
if(msg instanceof FederatedRequest[]) {
initCapacity = 0;
try {
- for(FederatedRequest fr :
(FederatedRequest[])msg) {
+ for(FederatedRequest fr :
(FederatedRequest[]) msg) {
int frSize =
Math.toIntExact(fr.estimateSerializationBufferSize());
if(Integer.MAX_VALUE -
initCapacity < frSize) // summed sizes exceed integer limits
throw new
ArithmeticException("Overflow.");
initCapacity += frSize;
}
- } catch(ArithmeticException ae) { // size of
federated request exceeds integer limits
+ }
+ catch(ArithmeticException ae) { // size of
federated request exceeds integer limits
initCapacity = Integer.MAX_VALUE;
}
}