This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 477e8e5480c [IOTDB-6248] Fix the connection will close when
WebSocketConnectorServer doesn't response in Flink-CDC-Connector & Fix multiple
cdc tasks cannot run properly at the same time (#11598)
477e8e5480c is described below
commit 477e8e5480cbffd9479c7113627288a052e935d2
Author: Xuan Ronaldo <[email protected]>
AuthorDate: Fri Dec 1 17:09:18 2023 +0800
[IOTDB-6248] Fix the connection will close when WebSocketConnectorServer
doesn't response in Flink-CDC-Connector & Fix multiple cdc tasks cannot run
properly at the same time (#11598)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../flink/sql/client/IoTDBWebSocketClient.java | 20 +-
.../flink/sql/function/IoTDBCDCSourceFunction.java | 84 ++--
.../protocol/websocket/WebSocketConnector.java | 95 ++---
.../websocket/WebSocketConnectorServer.java | 467 ++++++++++++++++-----
4 files changed, 452 insertions(+), 214 deletions(-)
diff --git
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java
index eb57749bc39..768ba986d73 100644
---
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java
+++
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/client/IoTDBWebSocketClient.java
@@ -33,12 +33,23 @@ import java.nio.ByteBuffer;
public class IoTDBWebSocketClient extends WebSocketClient {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBWebSocketClient.class);
private final IoTDBCDCSourceFunction function;
+ private transient Status status = Status.WAITING;
public IoTDBWebSocketClient(URI uri, IoTDBCDCSourceFunction function) {
super(uri);
this.function = function;
}
+ public Status getStatus() {
+ return status;
+ }
+
+ public enum Status {
+ WAITING,
+ READY,
+ ERROR
+ }
+
@Override
public void onOpen(ServerHandshake serverHandshake) {
String log =
@@ -48,7 +59,11 @@ public class IoTDBWebSocketClient extends WebSocketClient {
@Override
public void onMessage(String s) {
- // Do nothing
+ if ("READY".equals(s)) {
+ status = Status.READY;
+ } else if ("ERROR".equals(s)) {
+ status = Status.ERROR;
+ }
}
@Override
@@ -61,7 +76,8 @@ public class IoTDBWebSocketClient extends WebSocketClient {
@Override
public void onClose(int i, String s, boolean b) {
- LOGGER.info("The connection to {}:{} has been closed.", uri.getHost(),
uri.getPort());
+ LOGGER.info(
+ "The connection to {}:{} has been closed. Because {}", uri.getHost(),
uri.getPort(), s);
}
@Override
diff --git
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
index 2865170a08b..d9b73f37534 100644
---
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
+++
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBCDCSourceFunction.java
@@ -69,6 +69,7 @@ public class IoTDBCDCSourceFunction extends
RichSourceFunction<RowData> {
private final List<String> timeseriesList;
private final BlockingQueue<TabletWrapper> tabletWrappers;
private final List<Tuple2<String, DataType>> tableSchema;
+ private final String pipeName;
private transient ExecutorService consumeExecutor;
public IoTDBCDCSourceFunction(ReadableConfig options, SchemaWrapper
schemaWrapper) {
@@ -77,12 +78,13 @@ public class IoTDBCDCSourceFunction extends
RichSourceFunction<RowData> {
cdcPort = options.get(Options.CDC_PORT);
nodeUrls = Arrays.asList(options.get(Options.NODE_URLS).split(","));
taskName = options.get(Options.CDC_TASK_NAME);
+ pipeName = String.format("flink_cdc_%s", taskName);
user = options.get(Options.USER);
password = options.get(Options.PASSWORD);
timeseriesList =
tableSchema.stream().map(field ->
String.valueOf(field.f0)).collect(Collectors.toList());
- tabletWrappers = new ArrayBlockingQueue<>(nodeUrls.size());
+ tabletWrappers = new ArrayBlockingQueue<>(nodeUrls.size() * 10);
}
@Override
@@ -91,53 +93,38 @@ public class IoTDBCDCSourceFunction extends
RichSourceFunction<RowData> {
Session session =
new
Session.Builder().username(user).password(password).nodeUrls(nodeUrls).build();
session.open(false);
- String pipeName =
- String.format("`flink_cdc_%s_%s_%d`", taskName, pattern.replace("`",
"``"), cdcPort);
- boolean hasCreatedPipeTask = false;
- try (SessionDataSet pipes = session.executeQueryStatement("show pipes"); )
{
- while (pipes.hasNext()) {
- RowRecord pipe = pipes.next();
- if (pipeName.equals(pipe.getFields().get(0).getStringValue())) {
- hasCreatedPipeTask = true;
- break;
- }
- }
- }
- if (!hasCreatedPipeTask) {
- for (String nodeUrl : nodeUrls) {
- URI uri = new URI(String.format("ws://%s:%d", nodeUrl.split(":")[0],
cdcPort));
- if (Utils.isURIAvailable(uri)) {
+ try (SessionDataSet dataSet =
+ session.executeQueryStatement(String.format("show pipe %s",
pipeName))) {
+ if (!dataSet.hasNext()) {
+ String createPipeCommand =
+ String.format(
+ "CREATE PIPE %s\n"
+ + "WITH EXTRACTOR (\n"
+ + "'extractor' = 'iotdb-extractor',\n"
+ + "'extractor.pattern' = '%s',\n"
+ + ") WITH CONNECTOR (\n"
+ + "'connector' = 'websocket-connector',\n"
+ + "'connector.websocket.port' = '%d',\n"
+ // avoid to reuse the pipe's connector
+ + "'connector.websocket.id' = '%d'"
+ + ")",
+ pipeName, pattern, cdcPort, System.currentTimeMillis());
+ session.executeNonQueryStatement(createPipeCommand);
+ session.executeNonQueryStatement(String.format("start pipe %s",
pipeName));
+ } else {
+ RowRecord pipe = dataSet.next();
+ String pipePattern =
pipe.getFields().get(3).getStringValue().split(",")[0].split("=")[1];
+ if (!pipePattern.equals(this.pattern)) {
throw new IllegalOptionException(
String.format(
- "The port `%d` has been bound. Please use another one by
option `cdc.port`.",
- cdcPort));
+ "The CDC task `%s` has been created by pattern `%s`.",
+ this.taskName, this.pattern));
}
- }
- String createPipeCommand =
- String.format(
- "CREATE PIPE %s\n"
- + "WITH EXTRACTOR (\n"
- + "'extractor' = 'iotdb-extractor',\n"
- + "'extractor.pattern' = '%s',\n"
- + ") WITH CONNECTOR (\n"
- + "'connector' = 'websocket-connector',\n"
- + "'connector.websocket.port' = '%d'"
- + ")",
- pipeName, pattern, cdcPort);
- session.executeNonQueryStatement(createPipeCommand);
- }
- try (SessionDataSet pipes = session.executeQueryStatement("show pipes"); )
{
- String status = null;
- while (pipes.hasNext()) {
- RowRecord pipe = pipes.next();
- if (pipeName.equals(pipe.getFields().get(0).getStringValue())) {
- status = pipe.getFields().get(2).getStringValue();
- break;
+ String status = pipe.getFields().get(2).getStringValue();
+ if ("STOPPED".equals(status)) {
+ session.executeNonQueryStatement(String.format("start pipe %s",
pipeName));
}
}
- if ("STOPPED".equals(status)) {
- session.executeNonQueryStatement(String.format("start pipe %s",
pipeName));
- }
}
session.close();
@@ -170,7 +157,7 @@ public class IoTDBCDCSourceFunction extends
RichSourceFunction<RowData> {
while (!socketClient.getReadyState().equals(ReadyState.OPEN)) {
Thread.sleep(1000);
}
- socketClient.send("START");
+ socketClient.send(String.format("BIND:%s", pipeName));
} else {
Thread.sleep(1000);
}
@@ -211,7 +198,14 @@ public class IoTDBCDCSourceFunction extends
RichSourceFunction<RowData> {
while (!client.getReadyState().equals(ReadyState.OPEN)) {
Thread.sleep(1000);
}
- client.send("START");
+ client.send(String.format("BIND:%s", pipeName));
+ while (IoTDBWebSocketClient.Status.WAITING == client.getStatus()) {
+ Thread.sleep(1000);
+ }
+ if (IoTDBWebSocketClient.Status.ERROR == client.getStatus()) {
+ throw new IllegalOptionException(
+ "An exception occurred during binding. The CDC task is running.
Please stop it first.");
+ }
return client;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index 75bd571f0b6..c93c3956c61 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -30,66 +30,67 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
-import java.net.InetSocketAddress;
import java.util.Arrays;
-import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
public class WebSocketConnector implements PipeConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(WebSocketConnector.class);
- private static final Map<Integer, Pair<AtomicInteger,
WebSocketConnectorServer>>
- PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP = new ConcurrentHashMap<>();
-
private Integer port;
private WebSocketConnectorServer server;
+ private String pipeName;
@Override
public void validate(PipeParameterValidator validator) throws Exception {
- // Do nothing
- }
+ final PipeParameters parameters = validator.getParameters();
- @Override
- public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfiguration configuration)
- throws Exception {
port =
parameters.getIntOrDefault(
Arrays.asList(
PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_KEY,
PipeConnectorConstant.SINK_WEBSOCKET_PORT_KEY),
PipeConnectorConstant.CONNECTOR_WEBSOCKET_PORT_DEFAULT_VALUE);
+
+ server = WebSocketConnectorServer.getOrCreateInstance(port);
+ if (server.getPort() != port) {
+ throw new PipeException(
+ String.format(
+ "The websocket server has already been created with port = %d. "
+ + "Please set the option cdc.port = %d.",
+ server.getPort(), server.getPort()));
+ }
}
@Override
- public void handshake() throws Exception {
- synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
- server =
- PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP
- .computeIfAbsent(
- port,
- key -> {
- final WebSocketConnectorServer newServer =
- new WebSocketConnectorServer(new
InetSocketAddress(port), this);
- newServer.start();
- return new Pair<>(new AtomicInteger(0), newServer);
- })
- .getRight();
-
PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port).getLeft().incrementAndGet();
+ public void customize(
+ PipeParameters parameters, PipeConnectorRuntimeConfiguration
configuration) {
+ pipeName = configuration.getRuntimeEnvironment().getPipeName();
+ }
+
+ @Override
+ public void handshake() {
+ server = WebSocketConnectorServer.getOrCreateInstance(port);
+ server.register(this);
+
+ if (!server.isStarted()) {
+ synchronized (WebSocketConnectorServer.class) {
+ if (!server.isStarted()) {
+ server.start();
+ }
+ }
}
}
@Override
- public void heartbeat() throws Exception {}
+ public void heartbeat() throws Exception {
+ // Do nothing
+ }
@Override
public void transfer(TabletInsertionEvent tabletInsertionEvent) {
@@ -105,7 +106,7 @@ public class WebSocketConnector implements PipeConnector {
((EnrichedEvent) tabletInsertionEvent)
.increaseReferenceCount(WebSocketConnector.class.getName());
- server.addEvent(tabletInsertionEvent);
+ server.addEvent(tabletInsertionEvent, this);
}
@Override
@@ -121,7 +122,7 @@ public class WebSocketConnector implements PipeConnector {
for (TabletInsertionEvent event :
tsFileInsertionEvent.toTabletInsertionEvents()) {
((EnrichedEvent)
event).increaseReferenceCount(WebSocketConnector.class.getName());
- server.addEvent(event);
+ server.addEvent(event, this);
}
} finally {
tsFileInsertionEvent.close();
@@ -129,33 +130,23 @@ public class WebSocketConnector implements PipeConnector {
}
@Override
- public void transfer(Event event) throws Exception {}
+ public void transfer(Event event) throws Exception {
+ // Do nothing
+ }
@Override
public void close() throws Exception {
- if (port == null) {
- return;
- }
-
- synchronized (PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP) {
- final Pair<AtomicInteger, WebSocketConnectorServer> pair =
- PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.get(port);
- if (pair == null) {
- return;
- }
-
- if (pair.getLeft().decrementAndGet() <= 0) {
- try {
- pair.getRight().stop();
- } finally {
- PORT_TO_REFERENCE_COUNT_AND_SERVER_MAP.remove(port);
- }
- }
+ if (server != null) {
+ server.unregister(this);
}
}
- public synchronized void commit(@Nullable EnrichedEvent enrichedEvent) {
+ public void commit(EnrichedEvent enrichedEvent) {
Optional.ofNullable(enrichedEvent)
.ifPresent(event ->
event.decreaseReferenceCount(WebSocketConnector.class.getName(), true));
}
+
+ public String getPipeName() {
+ return pipeName;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
index 3ba0f7feb89..50e075665b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnectorServer.java
@@ -25,8 +25,9 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.commons.collections4.BidiMap;
+import org.apache.commons.collections4.bidimap.DualTreeBidiMap;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
@@ -38,186 +39,422 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
public class WebSocketConnectorServer extends WebSocketServer {
private static final Logger LOGGER =
LoggerFactory.getLogger(WebSocketConnectorServer.class);
private final AtomicLong eventIdGenerator = new AtomicLong(0);
- private final PriorityBlockingQueue<Pair<Long, Event>>
eventsWaitingForTransfer =
- new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.left));
- private final ConcurrentMap<Long, Event> eventsWaitingForAck = new
ConcurrentHashMap<>();
+ // Map<pipeName, Queue<Tuple<eventId, connector, event>>>
+ private final ConcurrentHashMap<String,
PriorityBlockingQueue<EventWaitingForTransfer>>
+ eventsWaitingForTransfer = new ConcurrentHashMap<>();
+ // Map<pipeName, Map<eventId, Tuple<connector, event>>>
+ private final ConcurrentHashMap<String, ConcurrentHashMap<Long,
EventWaitingForAck>>
+ eventsWaitingForAck = new ConcurrentHashMap<>();
- private final WebSocketConnector websocketConnector;
+ private final BidiMap<String, WebSocket> router =
+ new DualTreeBidiMap<String, WebSocket>(null,
Comparator.comparing(Object::hashCode)) {};
- public WebSocketConnectorServer(
- InetSocketAddress address, WebSocketConnector websocketConnector) {
- super(address);
- this.websocketConnector = websocketConnector;
+ private static final AtomicReference<WebSocketConnectorServer> instance =
new AtomicReference<>();
+ private static final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ private WebSocketConnectorServer(int port) {
+ super(new InetSocketAddress(port));
+ new TransferThread(this).start();
+ }
+
+ public static synchronized WebSocketConnectorServer getOrCreateInstance(int
port) {
+ if (null == instance.get()) {
+ instance.set(new WebSocketConnectorServer(port));
+ }
+ return instance.get();
+ }
+
+ public synchronized void register(WebSocketConnector connector) {
+ eventsWaitingForTransfer.putIfAbsent(
+ connector.getPipeName(),
+ new PriorityBlockingQueue<>(11, Comparator.comparing(o -> o.eventId)));
+ eventsWaitingForAck.putIfAbsent(connector.getPipeName(), new
ConcurrentHashMap<>());
+ }
+
+ public synchronized void unregister(WebSocketConnector connector) {
+ final String pipeName = connector.getPipeName();
+ // close invoked in validation stage
+ if (pipeName == null) {
+ return;
+ }
+ if (eventsWaitingForTransfer.containsKey(pipeName)) {
+ final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
+ eventsWaitingForTransfer.remove(pipeName);
+ while (!eventTransferQueue.isEmpty()) {
+ eventTransferQueue.forEach(
+ (eventWrapper) -> {
+ if (eventWrapper.event instanceof EnrichedEvent) {
+ ((EnrichedEvent) eventWrapper.event)
+
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
+ }
+ });
+ eventTransferQueue.clear();
+ synchronized (eventTransferQueue) {
+ eventTransferQueue.notifyAll();
+ }
+ }
+ }
+
+ if (eventsWaitingForAck.containsKey(pipeName)) {
+ eventsWaitingForAck
+ .remove(pipeName)
+ .forEach(
+ (eventId, eventWrapper) -> {
+ if (eventWrapper.event instanceof EnrichedEvent) {
+ ((EnrichedEvent) eventWrapper.event)
+
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
+ }
+ });
+ }
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ isStarted.set(true);
+ }
+
+ @Override
+ public void onStart() {
+ LOGGER.info(
+ "The websocket server {}:{} has been started!",
getAddress().getHostName(), getPort());
+ }
+
+ public boolean isStarted() {
+ return isStarted.get();
}
@Override
public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
- String log =
- String.format(
- "The connection from client %s:%d has been opened!",
- webSocket.getRemoteSocketAddress().getHostName(),
- webSocket.getRemoteSocketAddress().getPort());
- LOGGER.info(log);
+ LOGGER.info(
+ "The websocket connection from client {}:{} has been opened!",
+ webSocket.getRemoteSocketAddress().getHostName(),
+ webSocket.getRemoteSocketAddress().getPort());
}
@Override
- public void onClose(WebSocket webSocket, int i, String s, boolean b) {
- String log =
- String.format(
- "The client from %s:%d has been closed!",
- webSocket.getRemoteSocketAddress().getAddress(),
- webSocket.getRemoteSocketAddress().getPort());
- LOGGER.info(log);
+ public void onClose(WebSocket webSocket, int code, String reason, boolean
remote) {
+ if (webSocket.getRemoteSocketAddress() != null) {
+ LOGGER.info(
+ "The websocket connection from client {}:{} has been closed! "
+ + "The code is {}. The reason is {}. Is it closed by remote? {}",
+ webSocket.getRemoteSocketAddress().getHostName(),
+ webSocket.getRemoteSocketAddress().getPort(),
+ code,
+ reason,
+ remote);
+ } else {
+ LOGGER.warn(
+ "The websocket connection from client has been closed!"
+ + "The code is {}. The reason is {}. Is it closed by remote? {}",
+ code,
+ reason,
+ remote);
+ }
+ router.remove(router.getKey(webSocket));
}
@Override
public void onMessage(WebSocket webSocket, String s) {
- if (s.startsWith("START")) {
+ if (s.startsWith("BIND")) {
LOGGER.info(
- "Received a start message from {}:{}",
+ "Received a bind message from {}:{}",
webSocket.getRemoteSocketAddress().getHostName(),
webSocket.getRemoteSocketAddress().getPort());
- handleStart(webSocket);
+
+ handleBind(webSocket, s.replace("BIND:", ""));
} else if (s.startsWith("ACK")) {
- handleAck(webSocket, s);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Received a ack message from {}:{}",
+ webSocket.getRemoteSocketAddress().getHostName(),
+ webSocket.getRemoteSocketAddress().getPort());
+ }
+
+ handleAck(webSocket, Long.parseLong(s.replace("ACK:", "")));
} else if (s.startsWith("ERROR")) {
- LOGGER.error(
+ LOGGER.warn(
"Received an error message {} from {}:{}",
s,
webSocket.getRemoteSocketAddress().getHostName(),
webSocket.getRemoteSocketAddress().getPort());
- handleError(webSocket, s);
+
+ handleError(webSocket, Long.parseLong(s.replace("ERROR:", "")));
+ } else {
+ LOGGER.warn(
+ "Received an unknown message {} from {}:{}",
+ s,
+ webSocket.getRemoteSocketAddress().getHostName(),
+ webSocket.getRemoteSocketAddress().getPort());
+ }
+ }
+
+ private void handleBind(WebSocket webSocket, String pipeName) {
+ if (router.containsKey(pipeName)) {
+ broadcast("ERROR", Collections.singletonList(webSocket));
+ webSocket.close(4000, "Too many connections.");
+ return;
+ }
+
+ broadcast("READY", Collections.singletonList(webSocket));
+ router.put(pipeName, webSocket);
+ }
+
+ private void handleAck(WebSocket webSocket, long eventId) {
+ final String pipeName = router.getKey(webSocket);
+ if (pipeName == null) {
+ LOGGER.warn(
+ "The websocket connection from {}:{} has been closed, "
+ + "but the ack message of commitId: {} is received.",
+ webSocket.getRemoteSocketAddress().getHostName(),
+ webSocket.getRemoteSocketAddress().getPort(),
+ eventId);
+ return;
+ }
+
+ final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
+ eventsWaitingForAck.get(pipeName);
+ if (eventId2EventMap == null) {
+ LOGGER.warn(
+ "The pipe {} was dropped so the event ack {} will be ignored.",
pipeName, eventId);
+ return;
+ }
+
+ final EventWaitingForAck eventWrapper = eventId2EventMap.remove(eventId);
+ if (eventWrapper == null) {
+ LOGGER.warn("The event ack {} is not found.", eventId);
+ return;
}
+
+ eventWrapper.connector.commit(
+ eventWrapper.event instanceof EnrichedEvent ? (EnrichedEvent)
eventWrapper.event : null);
+ }
+
+ // synchronized with register and unregister to avoid resource leak
+ private synchronized void handleError(WebSocket webSocket, long eventId) {
+ final String pipeName = router.getKey(webSocket);
+ if (pipeName == null) {
+ LOGGER.warn(
+ "The websocket connection from {}:{} has been closed, "
+ + "but the error message of commitId: {} is received.",
+ webSocket.getRemoteSocketAddress().getHostName(),
+ webSocket.getRemoteSocketAddress().getPort(),
+ eventId);
+ return;
+ }
+
+ final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
+ eventsWaitingForAck.get(pipeName);
+ final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
+ eventsWaitingForTransfer.get(pipeName);
+ if (eventId2EventMap == null || eventTransferQueue == null) {
+ LOGGER.warn(
+ "The pipe {} was dropped so the event in error {} will be ignored.",
pipeName, eventId);
+ return;
+ }
+
+ final EventWaitingForAck eventWrapper = eventId2EventMap.remove(eventId);
+ if (eventWrapper == null) {
+ LOGGER.warn("The event in error {} is not found.", eventId);
+ return;
+ }
+
+ LOGGER.warn(
+ "The tablet of commitId: {} can't be parsed by client, it will be
retried later.", eventId);
+ eventTransferQueue.put(
+ new EventWaitingForTransfer(eventId, eventWrapper.connector,
eventWrapper.event));
}
@Override
public void onError(WebSocket webSocket, Exception e) {
- String log;
if (webSocket.getRemoteSocketAddress() != null) {
- log =
- String.format(
- "Got an error `%s` from %s:%d",
- e.getMessage(),
- webSocket.getLocalSocketAddress().getHostName(),
- webSocket.getLocalSocketAddress().getPort());
+ LOGGER.warn(
+ "Got an error \"{}\" from {}:{}.",
+ e.getMessage(),
+ webSocket.getLocalSocketAddress().getHostName(),
+ webSocket.getLocalSocketAddress().getPort(),
+ e);
} else {
- log = String.format("Got an error `%s` from client", e.getMessage());
+ LOGGER.warn("Got an error \"{}\" from an unknown client.",
e.getMessage(), e);
+ // if the remote socket address is null, it means the connection is not
established yet.
+ // we should close the connection manually.
+ router.remove(router.getKey(webSocket));
}
- LOGGER.error(log);
}
- @Override
- public void onStart() {
- String log =
- String.format(
- "The webSocket server %s:%d has been started!",
- this.getAddress().getHostName(), this.getPort());
- LOGGER.info(log);
- }
+ public void addEvent(Event event, WebSocketConnector connector) {
+ final PriorityBlockingQueue<EventWaitingForTransfer> queue =
+ eventsWaitingForTransfer.get(connector.getPipeName());
+
+ if (queue == null) {
+ LOGGER.warn("The pipe {} was dropped so the event {} will be dropped.",
connector, event);
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent) event)
+ .decreaseReferenceCount(WebSocketConnectorServer.class.getName(),
false);
+ }
+ return;
+ }
- public void addEvent(Event event) {
- if (eventsWaitingForTransfer.size() >= 5) {
- synchronized (eventsWaitingForTransfer) {
- while (eventsWaitingForTransfer.size() >= 5) {
+ if (queue.size() >= 5) {
+ synchronized (queue) {
+ while (queue.size() >= 5) {
try {
- eventsWaitingForTransfer.wait();
+ queue.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PipeException(e.getMessage());
}
}
+
+ queue.put(
+ new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(),
connector, event));
+ return;
}
}
- eventsWaitingForTransfer.put(new
Pair<>(eventIdGenerator.incrementAndGet(), event));
+ queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(),
connector, event));
}
- private void handleStart(WebSocket webSocket) {
- try {
+ private class TransferThread extends Thread {
+
+ private final WebSocketConnectorServer server;
+
+ public TransferThread(WebSocketConnectorServer server) {
+ this.server = server;
+ }
+
+ @Override
+ public void run() {
while (true) {
- Pair<Long, Event> eventPair = eventsWaitingForTransfer.take();
- synchronized (eventsWaitingForTransfer) {
- eventsWaitingForTransfer.notifyAll();
+ if (sleepIfNecessary()) {
+ continue;
}
- boolean transferred = transfer(eventPair, webSocket);
- if (transferred) {
- break;
- } else {
- websocketConnector.commit(
- eventPair.getRight() instanceof EnrichedEvent
- ? (EnrichedEvent) eventPair.getRight()
- : null);
+
+ for (final String pipeName : eventsWaitingForTransfer.keySet()) {
+ final PriorityBlockingQueue<EventWaitingForTransfer> queue =
+ eventsWaitingForTransfer.getOrDefault(pipeName, null);
+ if (queue == null || queue.isEmpty() ||
!router.containsKey(pipeName)) {
+ continue;
+ }
+
+ try {
+ final EventWaitingForTransfer queueElement = queue.take();
+ synchronized (queue) {
+ queue.notifyAll();
+ }
+ transfer(pipeName, queueElement);
+ } catch (InterruptedException e) {
+ LOGGER.warn("The transfer thread is interrupted.", e);
+ Thread.currentThread().interrupt();
+ }
}
}
- } catch (InterruptedException e) {
- String log = String.format("The event can't be taken, because: %s",
e.getMessage());
- LOGGER.warn(log);
- Thread.currentThread().interrupt();
- throw new PipeException(e.getMessage());
- }
- }
-
- private boolean transfer(Pair<Long, Event> eventPair, WebSocket webSocket) {
- Long commitId = eventPair.getLeft();
- Event event = eventPair.getRight();
- try {
- ByteBuffer tabletBuffer = null;
- if (event instanceof PipeInsertNodeTabletInsertionEvent) {
- tabletBuffer = ((PipeInsertNodeTabletInsertionEvent)
event).convertToTablet().serialize();
- } else if (event instanceof PipeRawTabletInsertionEvent) {
- tabletBuffer = ((PipeRawTabletInsertionEvent)
event).convertToTablet().serialize();
- } else {
- throw new NotImplementedException(
- "IoTDBCDCConnector only support "
- + "PipeInsertNodeTabletInsertionEvent and
PipeRawTabletInsertionEvent.");
+ }
+
+ private void transfer(String pipeName, EventWaitingForTransfer element) {
+ final Long eventId = element.eventId;
+ final Event event = element.event;
+ final WebSocketConnector connector = element.connector;
+
+ try {
+ ByteBuffer tabletBuffer;
+ if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+ tabletBuffer = ((PipeInsertNodeTabletInsertionEvent)
event).convertToTablet().serialize();
+ } else if (event instanceof PipeRawTabletInsertionEvent) {
+ tabletBuffer = ((PipeRawTabletInsertionEvent)
event).convertToTablet().serialize();
+ } else {
+ throw new NotImplementedException(
+ "IoTDBCDCConnector only support "
+ + "PipeInsertNodeTabletInsertionEvent and
PipeRawTabletInsertionEvent.");
+ }
+
+ if (tabletBuffer == null) {
+ connector.commit((EnrichedEvent) event);
+ return;
+ }
+
+ final ByteBuffer payload = ByteBuffer.allocate(Long.BYTES +
tabletBuffer.limit());
+ payload.putLong(eventId);
+ payload.put(tabletBuffer);
+ payload.flip();
+
+ server.broadcast(payload,
Collections.singletonList(router.get(pipeName)));
+
+ final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
+ eventsWaitingForAck.get(pipeName);
+ if (eventId2EventMap == null) {
+ LOGGER.warn(
+ "The pipe {} was dropped so the event ack {} will be ignored.",
pipeName, eventId);
+ return;
+ }
+ eventId2EventMap.put(eventId, new EventWaitingForAck(connector,
event));
+ } catch (Exception e) {
+ synchronized (server) {
+ final PriorityBlockingQueue<EventWaitingForTransfer> queue =
+ eventsWaitingForTransfer.get(pipeName);
+ if (queue == null) {
+ LOGGER.warn(
+ "The pipe {} was dropped so the event {} will be dropped.",
pipeName, eventId);
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent) event)
+
.decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false);
+ }
+ return;
+ }
+
+ LOGGER.warn(
+ "The event {} can't be transferred to client, it will be retried
later.", eventId, e);
+ queue.put(new EventWaitingForTransfer(eventId, connector, event));
+ }
}
- if (tabletBuffer == null) {
+ }
+
+ private boolean sleepIfNecessary() {
+ if (!eventsWaitingForTransfer.isEmpty()) {
return false;
}
- ByteBuffer payload = ByteBuffer.allocate(Long.BYTES +
tabletBuffer.limit());
- payload.putLong(commitId);
- payload.put(tabletBuffer);
- payload.flip();
-
- this.broadcast(payload, Collections.singletonList(webSocket));
- eventsWaitingForAck.put(eventPair.getLeft(), eventPair.getRight());
- } catch (Exception e) {
- eventsWaitingForTransfer.put(eventPair);
- throw new PipeException(e.getMessage());
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ LOGGER.warn("The transfer thread is interrupted.", e);
+ Thread.currentThread().interrupt();
+ }
+ return true;
}
- return true;
}
- private void handleAck(WebSocket webSocket, String s) {
- long commitId = Long.parseLong(s.replace("ACK:", ""));
- Event event = eventsWaitingForAck.remove(commitId);
- if (event != null) {
- websocketConnector.commit(event instanceof EnrichedEvent ?
(EnrichedEvent) event : null);
+ private static class EventWaitingForTransfer {
+
+ private final Long eventId;
+ private final WebSocketConnector connector;
+ private final Event event;
+
+ public EventWaitingForTransfer(Long eventId, WebSocketConnector connector,
Event event) {
+ this.eventId = eventId;
+ this.connector = connector;
+ this.event = event;
}
- handleStart(webSocket);
}
- private void handleError(WebSocket webSocket, String s) {
- long commitId = Long.parseLong(s.replace("ERROR:", ""));
- String log =
- String.format(
- "The tablet of commitId: %d can't be parsed by client, it will be
retried later.",
- commitId);
- LOGGER.warn(log);
- Event event = eventsWaitingForAck.remove(commitId);
- if (event != null) {
- eventsWaitingForTransfer.put(new Pair<>(commitId, event));
+ private static class EventWaitingForAck {
+
+ private final WebSocketConnector connector;
+ private final Event event;
+
+ public EventWaitingForAck(WebSocketConnector connector, Event event) {
+ this.connector = connector;
+ this.event = event;
}
- handleStart(webSocket);
}
}