This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-wal-resource-management in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 31fa754dcb66f32edc9a23e2bb4b4c1d7888d1eb Author: Steve Yurong Su <[email protected]> AuthorDate: Tue May 23 17:24:02 2023 +0800 reduce heartbeat frequency --- .../apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java index 742b2230fa6..df0ea7d1fcd 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java @@ -39,6 +39,9 @@ public class PipeConnectorSubtask extends PipeSubtask { private final ListenableBlockingPendingQueue<Event> inputPendingQueue; private final PipeConnector outputPipeConnector; + private static final int HEARTBEAT_CHECK_INTERVAL = 1000; + private int executeOnceInvokedTimes; + /** @param taskID connectorAttributeSortedString */ public PipeConnectorSubtask( String taskID, @@ -47,13 +50,15 @@ public class PipeConnectorSubtask extends PipeSubtask { super(taskID); this.inputPendingQueue = inputPendingQueue; this.outputPipeConnector = outputPipeConnector; + executeOnceInvokedTimes = 0; } @Override protected synchronized boolean executeOnce() { try { - // TODO: reduce the frequency of heartbeat - outputPipeConnector.heartbeat(); + if (executeOnceInvokedTimes++ % HEARTBEAT_CHECK_INTERVAL == 0) { + outputPipeConnector.heartbeat(); + } } catch (Exception e) { throw new PipeConnectionException( "PipeConnector: failed to connect to the target system.", e);
