This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 477e8e5480c [IOTDB-6248] Fix the connection will close when 
WebSocketConnectorServer doesn't response in Flink-CDC-Connector & Fix multiple 
cdc tasks cannot run properly at the same time (#11598)
477e8e5480c is described below

commit 477e8e5480cbffd9479c7113627288a052e935d2
Author: Xuan Ronaldo <[email protected]>
AuthorDate: Fri Dec 1 17:09:18 2023 +0800

    [IOTDB-6248] Fix the connection will close when WebSocketConnectorServer 
doesn't response in Flink-CDC-Connector & Fix multiple cdc tasks cannot run 
properly at the same time (#11598)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../flink/sql/client/IoTDBWebSocketClient.java     |  20 +-
 .../flink/sql/function/IoTDBCDCSourceFunction.java |  84 ++--
 .../protocol/websocket/WebSocketConnector.java     |  95 ++---
 .../websocket/WebSocketConnectorServer.java        | 467 ++++++++++++++++-----
 4 files changed, 452 insertions(+), 214 deletions(-)

diff --git 
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java
 
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java
index eb57749bc39..768ba986d73 100644
--- 
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java
+++ 
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java
@@ -33,12 +33,23 @@ import java.nio.ByteBuffer;
 public class IoTDBWebSocketClient extends WebSocketClient {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBWebSocketClient.class);
   private final IoTDBCDCSourceFunction function;
+  private transient Status status = Status.WAITING;
 
   public IoTDBWebSocketClient(URI uri, IoTDBCDCSourceFunction function) {
     super(uri);
     this.function = function;
   }
 
+  public Status getStatus() {
+    return status;
+  }
+
+  public enum Status {
+    WAITING,
+    READY,
+    ERROR
+  }
+
   @Override
   public void onOpen(ServerHandshake serverHandshake) {
     String log =
@@ -48,7 +59,11 @@ public class IoTDBWebSocketClient extends WebSocketClient {
 
   @Override
   public void onMessage(String s) {
-    // Do nothing
+    if ("READY".equals(s)) {
+      status = Status.READY;
+    } else if ("ERROR".equals(s)) {
+      status = Status.ERROR;
+    }
   }
 
   @Override
@@ -61,7 +76,8 @@ public class IoTDBWebSocketClient extends WebSocketClient {
 
   @Override
   public void onClose(int i, String s, boolean b) {
-    LOGGER.info("The connection to {}:{} has been closed.", uri.getHost(), 
uri.getPort());
+    LOGGER.info(
+        "The connection to {}:{} has been closed. Because {}", uri.getHost(), 
uri.getPort(), s);
   }
 
   @Override
diff --git 
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
 
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
index 2865170a08b..d9b73f37534 100644
--- 
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
+++ 
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
@@ -69,6 +69,7 @@ public class IoTDBCDCSourceFunction extends 
RichSourceFunction<RowData> {
   private final List<String> timeseriesList;
   private final BlockingQueue<TabletWrapper> tabletWrappers;
   private final List<Tuple2<String, DataType>> tableSchema;
+  private final String pipeName;
   private transient ExecutorService consumeExecutor;
 
   public IoTDBCDCSourceFunction(ReadableConfig options, SchemaWrapper 
schemaWrapper) {
@@ -77,12 +78,13 @@ public class IoTDBCDCSourceFunction extends 
RichSourceFunction<RowData> {
     cdcPort = options.get(Options.CDC_PORT);
     nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(","));
     taskName = options.get(Options.CDC_TASK_NAME);
+    pipeName = String.format("flink_cdc_%s", taskName);
     user = options.get(Options.USER);
     password = options.get(Options.PASSWORD);
     timeseriesList =
         tableSchema.stream().map(field -> 
String.valueOf(field.f0)).collect(Collectors.toList());
 
-    tabletWrappers = new ArrayBlockingQueue<>(nodeUrls.size());
+    tabletWrappers = new ArrayBlockingQueue<>(nodeUrls.size() * 10);
   }
 
   @Override
@@ -91,53 +93,38 @@ public class IoTDBCDCSourceFunction extends 
RichSourceFunction<RowData> {
     Session session =
         new 
Session.Builder().username(user).password(password).nodeUrls(nodeUrls).build();
     session.open(false);
-    String pipeName =
-        String.format("`flink_cdc_%s_%s_%d`", taskName, pattern.replace("`", 
"``"), cdcPort);
-    boolean hasCreatedPipeTask = false;
-    try (SessionDataSet pipes = session.executeQueryStatement("show pipes"); ) 
{
-      while (pipes.hasNext()) {
-        RowRecord pipe = pipes.next();
-        if (pipeName.equals(pipe.getFields().get(0).getStringValue())) {
-          hasCreatedPipeTask = true;
-          break;
-        }
-      }
-    }
-    if (!hasCreatedPipeTask) {
-      for (String nodeUrl : nodeUrls) {
-        URI uri = new URI(String.format("ws://%s:%d", nodeUrl.split(":")[0], 
cdcPort));
-        if (Utils.isURIAvailable(uri)) {
+    try (SessionDataSet dataSet =
+        session.executeQueryStatement(String.format("show pipe %s", 
pipeName))) {
+      if (!dataSet.hasNext()) {
+        String createPipeCommand =
+            String.format(
+                "CREATE PIPE %s\n"
+                    + "WITH EXTRACTOR (\n"
+                    + "'extractor' = 'iotdb-extractor',\n"
+                    + "'extractor.pattern' = '%s',\n"
+                    + ") WITH CONNECTOR (\n"
+                    + "'connector' = 'websocket-connector',\n"
+                    + "'connector.websocket.port' = '%d',\n"
+                    // avoid to reuse the pipe's connector
+                    + "'connector.websocket.id' = '%d'"
+                    + ")",
+                pipeName, pattern, cdcPort, System.currentTimeMillis());
+        session.executeNonQueryStatement(createPipeCommand);
+        session.executeNonQueryStatement(String.format("start pipe %s", 
pipeName));
+      } else {
+        RowRecord pipe = dataSet.next();
+        String pipePattern = 
pipe.getFields().get(3).getStringValue().split(",")[0].split("=")[1];
+        if (!pipePattern.equals(this.pattern)) {
           throw new IllegalOptionException(
               String.format(
-                  "The port `%d` has been bound. Please use another one by 
option `cdc.port`.",
-                  cdcPort));
+                  "The CDC task `%s` has been created by pattern `%s`.",
+                  this.taskName, this.pattern));
         }
-      }
-      String createPipeCommand =
-          String.format(
-              "CREATE PIPE %s\n"
-                  + "WITH EXTRACTOR (\n"
-                  + "'extractor' = 'iotdb-extractor',\n"
-                  + "'extractor.pattern' = '%s',\n"
-                  + ") WITH CONNECTOR (\n"
-                  + "'connector' = 'websocket-connector',\n"
-                  + "'connector.websocket.port' = '%d'"
-                  + ")",
-              pipeName, pattern, cdcPort);
-      session.executeNonQueryStatement(createPipeCommand);
-    }
-    try (SessionDataSet pipes = session.executeQueryStatement("show pipes"); ) 
{
-      String status = null;
-      while (pipes.hasNext()) {
-        RowRecord pipe = pipes.next();
-        if (pipeName.equals(pipe.getFields().get(0).getStringValue())) {
-          status = pipe.getFields().get(2).getStringValue();
-          break;
+        String status = pipe.getFields().get(2).getStringValue();
+        if ("STOPPED".equals(status)) {
+          session.executeNonQueryStatement(String.format("start pipe %s", 
pipeName));
         }
       }
-      if ("STOPPED".equals(status)) {
-        session.executeNonQueryStatement(String.format("start pipe %s", 
pipeName));
-      }
     }
     session.close();
 
@@ -170,7 +157,7 @@ public class IoTDBCDCSourceFunction extends 
RichSourceFunction<RowData> {
           while (!socketClient.getReadyState().equals(ReadyState.OPEN)) {
             Thread.sleep(1000);
           }
-          socketClient.send("START");
+          socketClient.send(String.format("BIND:%s", pipeName));
         } else {
           Thread.sleep(1000);
         }
@@ -211,7 +198,14 @@ public class IoTDBCDCSourceFunction extends 
RichSourceFunction<RowData> {
     while (!client.getReadyState().equals(ReadyState.OPEN)) {
       Thread.sleep(1000);
     }
-    client.send("START");
+    client.send(String.format("BIND:%s", pipeName));
+    while (IoTDBWebSocketClient.Status.WAITING == client.getStatus()) {
+      Thread.sleep(1000);
+    }
+    if (IoTDBWebSocketClient.Status.ERROR == client.getStatus()) {
+      throw new IllegalOptionException(
+          "An exception occurred during binding. The CDC task is running. 
Please stop it first.");
+    }
     return client;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 75bd571f0b6..c93c3956c61 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -30,66 +30,67 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
-import java.net.InetSocketAddress;
 import java.util.Arrays;
-import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class WebSocketConnector implements PipeConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WebSocketConnector.class);
 
-  private static final Map<Integer, Pair<AtomicInteger, 
WebSocketConnectorServer>>
-      PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>();
-
   private Integer port;
   private WebSocketConnectorServer server;
+  private String pipeName;
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
-    // Do nothing
-  }
+    final PipeParameters parameters = validator.getParameters();
 
-  @Override
-  public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
-      throws Exception {
     port =
         parameters.getIntOrDefault(
             Arrays.asList(
                 PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_KEY,
                 PipeConnectorConstant.SINK_WEBSOCKET_PORT_KEY),
             PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE);
+
+    server = WebSocketConnectorServer.getOrCreateInstance(port);
+    if (server.getPort() != port) {
+      throw new PipeException(
+          String.format(
+              "The websocket server has already been created with port = %d. "
+                  + "Please set the option cdc.port = %d.",
+              server.getPort(), server.getPort()));
+    }
   }
 
   @Override
-  public void handshake() throws Exception {
-    synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
-      server =
-          PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP
-              .computeIfAbsent(
-                  port,
-                  key -> {
-                    final WebSocketConnectorServer newServer =
-                        new WebSocketConnectorServer(new 
InetSocketAddress(port), this);
-                    newServer.start();
-                    return new Pair<>(new AtomicInteger(0), newServer);
-                  })
-              .getRight();
-      
PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port).getLeft().incrementAndGet();
+  public void customize(
+      PipeParameters parameters, PipeConnectorRuntimeConfiguration 
configuration) {
+    pipeName = configuration.getRuntimeEnvironment().getPipeName();
+  }
+
+  @Override
+  public void handshake() {
+    server = WebSocketConnectorServer.getOrCreateInstance(port);
+    server.register(this);
+
+    if (!server.isStarted()) {
+      synchronized (WebSocketConnectorServer.class) {
+        if (!server.isStarted()) {
+          server.start();
+        }
+      }
     }
   }
 
   @Override
-  public void heartbeat() throws Exception {}
+  public void heartbeat() throws Exception {
+    // Do nothing
+  }
 
   @Override
   public void transfer(TabletInsertionEvent tabletInsertionEvent) {
@@ -105,7 +106,7 @@ public class WebSocketConnector implements PipeConnector {
     ((EnrichedEvent) tabletInsertionEvent)
         .increaseReferenceCount(WebSocketConnector.class.getName());
 
-    server.addEvent(tabletInsertionEvent);
+    server.addEvent(tabletInsertionEvent, this);
   }
 
   @Override
@@ -121,7 +122,7 @@ public class WebSocketConnector implements PipeConnector {
       for (TabletInsertionEvent event : 
tsFileInsertionEvent.toTabletInsertionEvents()) {
         ((EnrichedEvent) 
event).increaseReferenceCount(WebSocketConnector.class.getName());
 
-        server.addEvent(event);
+        server.addEvent(event, this);
       }
     } finally {
       tsFileInsertionEvent.close();
@@ -129,33 +130,23 @@ public class WebSocketConnector implements PipeConnector {
   }
 
   @Override
-  public void transfer(Event event) throws Exception {}
+  public void transfer(Event event) throws Exception {
+    // Do nothing
+  }
 
   @Override
   public void close() throws Exception {
-    if (port == null) {
-      return;
-    }
-
-    synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
-      final Pair<AtomicInteger, WebSocketConnectorServer> pair =
-          PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port);
-      if (pair == null) {
-        return;
-      }
-
-      if (pair.getLeft().decrementAndGet() <= 0) {
-        try {
-          pair.getRight().stop();
-        } finally {
-          PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(port);
-        }
-      }
+    if (server != null) {
+      server.unregister(this);
     }
   }
 
-  public synchronized void commit(@Nullable EnrichedEvent enrichedEvent) {
+  public void commit(EnrichedEvent enrichedEvent) {
     Optional.ofNullable(enrichedEvent)
         .ifPresent(event -> 
event.decreaseReferenceCount(WebSocketConnector.class.getName(), true));
   }
+
+  public String getPipeName() {
+    return pipeName;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
index 3ba0f7feb89..50e075665b5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
@@ -25,8 +25,9 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.utils.Pair;
 
+import org.apache.commons.collections4.BidiMap;
+import org.apache.commons.collections4.bidimap.DualTreeBidiMap;
 import org.java_websocket.WebSocket;
 import org.java_websocket.handshake.ClientHandshake;
 import org.java_websocket.server.WebSocketServer;
@@ -38,186 +39,422 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class WebSocketConnectorServer extends WebSocketServer {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WebSocketConnectorServer.class);
 
   private final AtomicLong eventIdGenerator = new AtomicLong(0);
-  private final PriorityBlockingQueue<Pair<Long, Event>> 
eventsWaitingForTransfer =
-      new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
-  private final ConcurrentMap<Long, Event> eventsWaitingForAck = new 
ConcurrentHashMap<>();
+  // Map<pipeName, Queue<Tuple<eventId, connector, event>>>
+  private final ConcurrentHashMap<String, 
PriorityBlockingQueue<EventWaitingForTransfer>>
+      eventsWaitingForTransfer = new ConcurrentHashMap<>();
+  // Map<pipeName, Map<eventId, Tuple<connector, event>>>
+  private final ConcurrentHashMap<String, ConcurrentHashMap<Long, 
EventWaitingForAck>>
+      eventsWaitingForAck = new ConcurrentHashMap<>();
 
-  private final WebSocketConnector websocketConnector;
+  private final BidiMap<String, WebSocket> router =
+      new DualTreeBidiMap<String, WebSocket>(null, 
Comparator.comparing(Object::hashCode)) {};
 
-  public WebSocketConnectorServer(
-      InetSocketAddress address, WebSocketConnector websocketConnector) {
-    super(address);
-    this.websocketConnector = websocketConnector;
+  private static final AtomicReference<WebSocketConnectorServer> instance = 
new AtomicReference<>();
+  private static final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+  private WebSocketConnectorServer(int port) {
+    super(new InetSocketAddress(port));
+    new TransferThread(this).start();
+  }
+
+  public static synchronized WebSocketConnectorServer getOrCreateInstance(int 
port) {
+    if (null == instance.get()) {
+      instance.set(new WebSocketConnectorServer(port));
+    }
+    return instance.get();
+  }
+
+  public synchronized void register(WebSocketConnector connector) {
+    eventsWaitingForTransfer.putIfAbsent(
+        connector.getPipeName(),
+        new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.eventId)));
+    eventsWaitingForAck.putIfAbsent(connector.getPipeName(), new 
ConcurrentHashMap<>());
+  }
+
+  public synchronized void unregister(WebSocketConnector connector) {
+    final String pipeName = connector.getPipeName();
+    // close invoked in validation stage
+    if (pipeName == null) {
+      return;
+    }
+    if (eventsWaitingForTransfer.containsKey(pipeName)) {
+      final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
+          eventsWaitingForTransfer.remove(pipeName);
+      while (!eventTransferQueue.isEmpty()) {
+        eventTransferQueue.forEach(
+            (eventWrapper) -> {
+              if (eventWrapper.event instanceof EnrichedEvent) {
+                ((EnrichedEvent) eventWrapper.event)
+                    
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
+              }
+            });
+        eventTransferQueue.clear();
+        synchronized (eventTransferQueue) {
+          eventTransferQueue.notifyAll();
+        }
+      }
+    }
+
+    if (eventsWaitingForAck.containsKey(pipeName)) {
+      eventsWaitingForAck
+          .remove(pipeName)
+          .forEach(
+              (eventId, eventWrapper) -> {
+                if (eventWrapper.event instanceof EnrichedEvent) {
+                  ((EnrichedEvent) eventWrapper.event)
+                      
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
+                }
+              });
+    }
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    isStarted.set(true);
+  }
+
+  @Override
+  public void onStart() {
+    LOGGER.info(
+        "The websocket server {}:{} has been started!", 
getAddress().getHostName(), getPort());
+  }
+
+  public boolean isStarted() {
+    return isStarted.get();
   }
 
   @Override
   public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
-    String log =
-        String.format(
-            "The connection from client %s:%d has been opened!",
-            webSocket.getRemoteSocketAddress().getHostName(),
-            webSocket.getRemoteSocketAddress().getPort());
-    LOGGER.info(log);
+    LOGGER.info(
+        "The websocket connection from client {}:{} has been opened!",
+        webSocket.getRemoteSocketAddress().getHostName(),
+        webSocket.getRemoteSocketAddress().getPort());
   }
 
   @Override
-  public void onClose(WebSocket webSocket, int i, String s, boolean b) {
-    String log =
-        String.format(
-            "The client from %s:%d has been closed!",
-            webSocket.getRemoteSocketAddress().getAddress(),
-            webSocket.getRemoteSocketAddress().getPort());
-    LOGGER.info(log);
+  public void onClose(WebSocket webSocket, int code, String reason, boolean 
remote) {
+    if (webSocket.getRemoteSocketAddress() != null) {
+      LOGGER.info(
+          "The websocket connection from client {}:{} has been closed! "
+              + "The code is {}. The reason is {}. Is it closed by remote? {}",
+          webSocket.getRemoteSocketAddress().getHostName(),
+          webSocket.getRemoteSocketAddress().getPort(),
+          code,
+          reason,
+          remote);
+    } else {
+      LOGGER.warn(
+          "The websocket connection from client has been closed!"
+              + "The code is {}. The reason is {}. Is it closed by remote? {}",
+          code,
+          reason,
+          remote);
+    }
+    router.remove(router.getKey(webSocket));
   }
 
   @Override
   public void onMessage(WebSocket webSocket, String s) {
-    if (s.startsWith("START")) {
+    if (s.startsWith("BIND")) {
       LOGGER.info(
-          "Received a start message from {}:{}",
+          "Received a bind message from {}:{}",
           webSocket.getRemoteSocketAddress().getHostName(),
           webSocket.getRemoteSocketAddress().getPort());
-      handleStart(webSocket);
+
+      handleBind(webSocket, s.replace("BIND:", ""));
     } else if (s.startsWith("ACK")) {
-      handleAck(webSocket, s);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Received a ack message from {}:{}",
+            webSocket.getRemoteSocketAddress().getHostName(),
+            webSocket.getRemoteSocketAddress().getPort());
+      }
+
+      handleAck(webSocket, Long.parseLong(s.replace("ACK:", "")));
     } else if (s.startsWith("ERROR")) {
-      LOGGER.error(
+      LOGGER.warn(
           "Received an error message {} from {}:{}",
           s,
           webSocket.getRemoteSocketAddress().getHostName(),
           webSocket.getRemoteSocketAddress().getPort());
-      handleError(webSocket, s);
+
+      handleError(webSocket, Long.parseLong(s.replace("ERROR:", "")));
+    } else {
+      LOGGER.warn(
+          "Received an unknown message {} from {}:{}",
+          s,
+          webSocket.getRemoteSocketAddress().getHostName(),
+          webSocket.getRemoteSocketAddress().getPort());
+    }
+  }
+
+  private void handleBind(WebSocket webSocket, String pipeName) {
+    if (router.containsKey(pipeName)) {
+      broadcast("ERROR", Collections.singletonList(webSocket));
+      webSocket.close(4000, "Too many connections.");
+      return;
+    }
+
+    broadcast("READY", Collections.singletonList(webSocket));
+    router.put(pipeName, webSocket);
+  }
+
+  private void handleAck(WebSocket webSocket, long eventId) {
+    final String pipeName = router.getKey(webSocket);
+    if (pipeName == null) {
+      LOGGER.warn(
+          "The websocket connection from {}:{} has been closed, "
+              + "but the ack message of commitId: {} is received.",
+          webSocket.getRemoteSocketAddress().getHostName(),
+          webSocket.getRemoteSocketAddress().getPort(),
+          eventId);
+      return;
+    }
+
+    final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
+        eventsWaitingForAck.get(pipeName);
+    if (eventId2EventMap == null) {
+      LOGGER.warn(
+          "The pipe {} was dropped so the event ack {} will be ignored.", 
pipeName, eventId);
+      return;
+    }
+
+    final EventWaitingForAck eventWrapper = eventId2EventMap.remove(eventId);
+    if (eventWrapper == null) {
+      LOGGER.warn("The event ack {} is not found.", eventId);
+      return;
     }
+
+    eventWrapper.connector.commit(
+        eventWrapper.event instanceof EnrichedEvent ? (EnrichedEvent) 
eventWrapper.event : null);
+  }
+
+  // synchronized with register and unregister to avoid resource leak
+  private synchronized void handleError(WebSocket webSocket, long eventId) {
+    final String pipeName = router.getKey(webSocket);
+    if (pipeName == null) {
+      LOGGER.warn(
+          "The websocket connection from {}:{} has been closed, "
+              + "but the error message of commitId: {} is received.",
+          webSocket.getRemoteSocketAddress().getHostName(),
+          webSocket.getRemoteSocketAddress().getPort(),
+          eventId);
+      return;
+    }
+
+    final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
+        eventsWaitingForAck.get(pipeName);
+    final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
+        eventsWaitingForTransfer.get(pipeName);
+    if (eventId2EventMap == null || eventTransferQueue == null) {
+      LOGGER.warn(
+          "The pipe {} was dropped so the event in error {} will be ignored.", 
pipeName, eventId);
+      return;
+    }
+
+    final EventWaitingForAck eventWrapper = eventId2EventMap.remove(eventId);
+    if (eventWrapper == null) {
+      LOGGER.warn("The event in error {} is not found.", eventId);
+      return;
+    }
+
+    LOGGER.warn(
+        "The tablet of commitId: {} can't be parsed by client, it will be 
retried later.", eventId);
+    eventTransferQueue.put(
+        new EventWaitingForTransfer(eventId, eventWrapper.connector, 
eventWrapper.event));
   }
 
   @Override
   public void onError(WebSocket webSocket, Exception e) {
-    String log;
     if (webSocket.getRemoteSocketAddress() != null) {
-      log =
-          String.format(
-              "Got an error `%s` from %s:%d",
-              e.getMessage(),
-              webSocket.getLocalSocketAddress().getHostName(),
-              webSocket.getLocalSocketAddress().getPort());
+      LOGGER.warn(
+          "Got an error \"{}\" from {}:{}.",
+          e.getMessage(),
+          webSocket.getLocalSocketAddress().getHostName(),
+          webSocket.getLocalSocketAddress().getPort(),
+          e);
     } else {
-      log = String.format("Got an error `%s` from client", e.getMessage());
+      LOGGER.warn("Got an error \"{}\" from an unknown client.", 
e.getMessage(), e);
+      // if the remote socket address is null, it means the connection is not 
established yet.
+      // we should close the connection manually.
+      router.remove(router.getKey(webSocket));
     }
-    LOGGER.error(log);
   }
 
-  @Override
-  public void onStart() {
-    String log =
-        String.format(
-            "The webSocket server %s:%d has been started!",
-            this.getAddress().getHostName(), this.getPort());
-    LOGGER.info(log);
-  }
+  public void addEvent(Event event, WebSocketConnector connector) {
+    final PriorityBlockingQueue<EventWaitingForTransfer> queue =
+        eventsWaitingForTransfer.get(connector.getPipeName());
+
+    if (queue == null) {
+      LOGGER.warn("The pipe {} was dropped so the event {} will be dropped.", 
connector, event);
+      if (event instanceof EnrichedEvent) {
+        ((EnrichedEvent) event)
+            .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), 
false);
+      }
+      return;
+    }
 
-  public void addEvent(Event event) {
-    if (eventsWaitingForTransfer.size() >= 5) {
-      synchronized (eventsWaitingForTransfer) {
-        while (eventsWaitingForTransfer.size() >= 5) {
+    if (queue.size() >= 5) {
+      synchronized (queue) {
+        while (queue.size() >= 5) {
           try {
-            eventsWaitingForTransfer.wait();
+            queue.wait();
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PipeException(e.getMessage());
           }
         }
+
+        queue.put(
+            new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), 
connector, event));
+        return;
       }
     }
 
-    eventsWaitingForTransfer.put(new 
Pair<>(eventIdGenerator.incrementAndGet(), event));
+    queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), 
connector, event));
   }
 
-  private void handleStart(WebSocket webSocket) {
-    try {
+  private class TransferThread extends Thread {
+
+    private final WebSocketConnectorServer server;
+
+    public TransferThread(WebSocketConnectorServer server) {
+      this.server = server;
+    }
+
+    @Override
+    public void run() {
       while (true) {
-        Pair<Long, Event> eventPair = eventsWaitingForTransfer.take();
-        synchronized (eventsWaitingForTransfer) {
-          eventsWaitingForTransfer.notifyAll();
+        if (sleepIfNecessary()) {
+          continue;
         }
-        boolean transferred = transfer(eventPair, webSocket);
-        if (transferred) {
-          break;
-        } else {
-          websocketConnector.commit(
-              eventPair.getRight() instanceof EnrichedEvent
-                  ? (EnrichedEvent) eventPair.getRight()
-                  : null);
+
+        for (final String pipeName : eventsWaitingForTransfer.keySet()) {
+          final PriorityBlockingQueue<EventWaitingForTransfer> queue =
+              eventsWaitingForTransfer.getOrDefault(pipeName, null);
+          if (queue == null || queue.isEmpty() || 
!router.containsKey(pipeName)) {
+            continue;
+          }
+
+          try {
+            final EventWaitingForTransfer queueElement = queue.take();
+            synchronized (queue) {
+              queue.notifyAll();
+            }
+            transfer(pipeName, queueElement);
+          } catch (InterruptedException e) {
+            LOGGER.warn("The transfer thread is interrupted.", e);
+            Thread.currentThread().interrupt();
+          }
         }
       }
-    } catch (InterruptedException e) {
-      String log = String.format("The event can't be taken, because: %s", 
e.getMessage());
-      LOGGER.warn(log);
-      Thread.currentThread().interrupt();
-      throw new PipeException(e.getMessage());
-    }
-  }
-
-  private boolean transfer(Pair<Long, Event> eventPair, WebSocket webSocket) {
-    Long commitId = eventPair.getLeft();
-    Event event = eventPair.getRight();
-    try {
-      ByteBuffer tabletBuffer = null;
-      if (event instanceof PipeInsertNodeTabletInsertionEvent) {
-        tabletBuffer = ((PipeInsertNodeTabletInsertionEvent) 
event).convertToTablet().serialize();
-      } else if (event instanceof PipeRawTabletInsertionEvent) {
-        tabletBuffer = ((PipeRawTabletInsertionEvent) 
event).convertToTablet().serialize();
-      } else {
-        throw new NotImplementedException(
-            "IoTDBCDCConnector only support "
-                + "PipeInsertNodeTabletInsertionEvent and 
PipeRawTabletInsertionEvent.");
+    }
+
+    private void transfer(String pipeName, EventWaitingForTransfer element) {
+      final Long eventId = element.eventId;
+      final Event event = element.event;
+      final WebSocketConnector connector = element.connector;
+
+      try {
+        ByteBuffer tabletBuffer;
+        if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+          tabletBuffer = ((PipeInsertNodeTabletInsertionEvent) 
event).convertToTablet().serialize();
+        } else if (event instanceof PipeRawTabletInsertionEvent) {
+          tabletBuffer = ((PipeRawTabletInsertionEvent) 
event).convertToTablet().serialize();
+        } else {
+          throw new NotImplementedException(
+              "IoTDBCDCConnector only support "
+                  + "PipeInsertNodeTabletInsertionEvent and 
PipeRawTabletInsertionEvent.");
+        }
+
+        if (tabletBuffer == null) {
+          connector.commit((EnrichedEvent) event);
+          return;
+        }
+
+        final ByteBuffer payload = ByteBuffer.allocate(Long.BYTES + 
tabletBuffer.limit());
+        payload.putLong(eventId);
+        payload.put(tabletBuffer);
+        payload.flip();
+
+        server.broadcast(payload, 
Collections.singletonList(router.get(pipeName)));
+
+        final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
+            eventsWaitingForAck.get(pipeName);
+        if (eventId2EventMap == null) {
+          LOGGER.warn(
+              "The pipe {} was dropped so the event ack {} will be ignored.", 
pipeName, eventId);
+          return;
+        }
+        eventId2EventMap.put(eventId, new EventWaitingForAck(connector, 
event));
+      } catch (Exception e) {
+        synchronized (server) {
+          final PriorityBlockingQueue<EventWaitingForTransfer> queue =
+              eventsWaitingForTransfer.get(pipeName);
+          if (queue == null) {
+            LOGGER.warn(
+                "The pipe {} was dropped so the event {} will be dropped.", 
pipeName, eventId);
+            if (event instanceof EnrichedEvent) {
+              ((EnrichedEvent) event)
+                  
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
+            }
+            return;
+          }
+
+          LOGGER.warn(
+              "The event {} can't be transferred to client, it will be retried 
later.", eventId, e);
+          queue.put(new EventWaitingForTransfer(eventId, connector, event));
+        }
       }
-      if (tabletBuffer == null) {
+    }
+
+    private boolean sleepIfNecessary() {
+      if (!eventsWaitingForTransfer.isEmpty()) {
         return false;
       }
 
-      ByteBuffer payload = ByteBuffer.allocate(Long.BYTES + 
tabletBuffer.limit());
-      payload.putLong(commitId);
-      payload.put(tabletBuffer);
-      payload.flip();
-
-      this.broadcast(payload, Collections.singletonList(webSocket));
-      eventsWaitingForAck.put(eventPair.getLeft(), eventPair.getRight());
-    } catch (Exception e) {
-      eventsWaitingForTransfer.put(eventPair);
-      throw new PipeException(e.getMessage());
+      try {
+        Thread.sleep(10000);
+      } catch (InterruptedException e) {
+        LOGGER.warn("The transfer thread is interrupted.", e);
+        Thread.currentThread().interrupt();
+      }
+      return true;
     }
-    return true;
   }
 
-  private void handleAck(WebSocket webSocket, String s) {
-    long commitId = Long.parseLong(s.replace("ACK:", ""));
-    Event event = eventsWaitingForAck.remove(commitId);
-    if (event != null) {
-      websocketConnector.commit(event instanceof EnrichedEvent ? 
(EnrichedEvent) event : null);
+  private static class EventWaitingForTransfer {
+
+    private final Long eventId;
+    private final WebSocketConnector connector;
+    private final Event event;
+
+    public EventWaitingForTransfer(Long eventId, WebSocketConnector connector, 
Event event) {
+      this.eventId = eventId;
+      this.connector = connector;
+      this.event = event;
     }
-    handleStart(webSocket);
   }
 
-  private void handleError(WebSocket webSocket, String s) {
-    long commitId = Long.parseLong(s.replace("ERROR:", ""));
-    String log =
-        String.format(
-            "The tablet of commitId: %d can't be parsed by client, it will be 
retried later.",
-            commitId);
-    LOGGER.warn(log);
-    Event event = eventsWaitingForAck.remove(commitId);
-    if (event != null) {
-      eventsWaitingForTransfer.put(new Pair<>(commitId, event));
+  private static class EventWaitingForAck {
+
+    private final WebSocketConnector connector;
+    private final Event event;
+
+    public EventWaitingForAck(WebSocketConnector connector, Event event) {
+      this.connector = connector;
+      this.event = event;
     }
-    handleStart(webSocket);
   }
 }


Reply via email to