This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new eee977c28b9 Subscription: fix null ByteBuffer when transferring SerializedEnrichedEvent (#12273)
eee977c28b9 is described below

commit eee977c28b92d5157e460f848aa8e678bc804ff4
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Apr 1 19:38:22 2024 +0800

    Subscription: fix null ByteBuffer when transferring SerializedEnrichedEvent (#12273)
---
 .github/workflows/pipe-it-2cluster.yml             |  2 +-
 .../payload/response/PipeSubscribePollResp.java    | 24 +++++++++++++++++++---
 .../broker/SerializedEnrichedEvent.java            | 18 ++++++++++++----
 .../receiver/SubscriptionReceiverV1.java           | 23 +++++++++------------
 4 files changed, 46 insertions(+), 21 deletions(-)

diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml
index 040154f8cb9..c39bfae159b 100644
--- a/.github/workflows/pipe-it-2cluster.yml
+++ b/.github/workflows/pipe-it-2cluster.yml
@@ -109,7 +109,7 @@ jobs:
         java: [ 17 ]
         # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet.
         cluster1: [ LightWeightStandaloneMode, ScalableSingleNodeMode, 
HighPerformanceMode ]
-        cluster2: [ LightWeightStandaloneMode ]
+        cluster2: [ LightWeightStandaloneMode, ScalableSingleNodeMode, 
HighPerformanceMode ]
         os: [ ubuntu-latest ]
     runs-on: ${{ matrix.os }}
     steps:
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java
index 2616a8316fd..0d1ed4eb0e2 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets;
 import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import java.io.DataOutputStream;
@@ -32,6 +33,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class PipeSubscribePollResp extends TPipeSubscribeResp {
 
@@ -48,16 +50,17 @@ public class PipeSubscribePollResp extends 
TPipeSubscribeResp {
    * server.
    */
   public static PipeSubscribePollResp toTPipeSubscribeResp(
-      TSStatus status, List<EnrichedTablets> enrichedTabletsList) {
+      TSStatus status, List<Pair<ByteBuffer, EnrichedTablets>> 
enrichedTabletsWithByteBufferList) {
     final PipeSubscribePollResp resp = new PipeSubscribePollResp();
 
-    resp.enrichedTabletsList = enrichedTabletsList;
+    resp.enrichedTabletsList =
+        
enrichedTabletsWithByteBufferList.stream().map(Pair::getRight).collect(Collectors.toList());
 
     resp.status = status;
     resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
     resp.type = PipeSubscribeResponseType.POLL_TABLETS.getType();
     try {
-      resp.body = serializeEnrichedTabletsList(enrichedTabletsList);
+      resp.body = 
serializeEnrichedTabletsWithByteBufferList(enrichedTabletsWithByteBufferList);
     } catch (IOException e) {
       resp.status = RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_POLL_ERROR, 
e.getMessage());
     }
@@ -108,6 +111,21 @@ public class PipeSubscribePollResp extends 
TPipeSubscribeResp {
     return byteBufferList;
   }
 
+  public static List<ByteBuffer> serializeEnrichedTabletsWithByteBufferList(
+      List<Pair<ByteBuffer, EnrichedTablets>> 
enrichedTabletsWithByteBufferList)
+      throws IOException {
+    List<ByteBuffer> byteBufferList = new ArrayList<>();
+    for (Pair<ByteBuffer, EnrichedTablets> enrichedTabletsWithByteBuffer :
+        enrichedTabletsWithByteBufferList) {
+      if (Objects.nonNull(enrichedTabletsWithByteBuffer.getLeft())) {
+        byteBufferList.add(enrichedTabletsWithByteBuffer.getLeft());
+      } else {
+        
byteBufferList.add(serializeEnrichedTablets(enrichedTabletsWithByteBuffer.getRight()));
+      }
+    }
+    return byteBufferList;
+  }
+
   public static ByteBuffer serializeEnrichedTablets(EnrichedTablets 
enrichedTablets)
       throws IOException {
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
index 4b3808f1a09..419579713fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
@@ -54,6 +54,12 @@ public class SerializedEnrichedEvent {
     this.committedTimestamp = INVALID_TIMESTAMP;
   }
 
+  //////////////////////////// serialization ////////////////////////////
+
+  public EnrichedTablets getEnrichedTablets() {
+    return enrichedTablets;
+  }
+
   /** @return true -> byte buffer is not null */
   public boolean serialize() {
     if (Objects.isNull(byteBuffer)) {
@@ -80,6 +86,8 @@ public class SerializedEnrichedEvent {
     byteBuffer = null;
   }
 
+  //////////////////////////// commit ////////////////////////////
+
   public String getSubscriptionCommitId() {
     return enrichedTablets.getSubscriptionCommitId();
   }
@@ -90,10 +98,6 @@ public class SerializedEnrichedEvent {
     }
   }
 
-  public void recordLastPolledTimestamp() {
-    lastPolledTimestamp = Math.max(lastPolledTimestamp, 
System.currentTimeMillis());
-  }
-
   public void recordCommittedTimestamp() {
     committedTimestamp = System.currentTimeMillis();
   }
@@ -102,6 +106,12 @@ public class SerializedEnrichedEvent {
     return committedTimestamp != INVALID_TIMESTAMP;
   }
 
+  //////////////////////////// pollable ////////////////////////////
+
+  public void recordLastPolledTimestamp() {
+    lastPolledTimestamp = Math.max(lastPolledTimestamp, 
System.currentTimeMillis());
+  }
+
   public boolean pollable() {
     if (lastPolledTimestamp == INVALID_TIMESTAMP) {
       return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 22a3b8ee1a2..d0e35211816 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -39,6 +39,7 @@ import 
org.apache.iotdb.db.subscription.timer.SubscriptionPollTimer;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
+import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets;
 import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCloseReq;
 import 
org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCommitReq;
 import 
org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHandshakeReq;
@@ -59,6 +60,7 @@ import 
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeSubscribe
 import 
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeUnsubscribeResp;
 import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -325,14 +327,6 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
                     
SubscriptionConfig.getInstance().getSubscriptionMinPollTimeoutMs()));
     List<SerializedEnrichedEvent> events =
         SubscriptionAgent.broker().poll(consumerConfig, topicNames, timer);
-
-    // serialize events and filter
-    events =
-        events.stream()
-            .peek((SerializedEnrichedEvent::serialize))
-            .filter((event -> Objects.nonNull(event.getByteBuffer())))
-            .collect(Collectors.toList());
-
     List<String> subscriptionCommitIds =
         events.stream()
             .map(SerializedEnrichedEvent::getSubscriptionCommitId)
@@ -344,18 +338,21 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
           consumerConfig,
           topicNames);
     }
-
     LOGGER.info(
         "Subscription: consumer {} poll topics {} successfully, commit ids: 
{}",
         consumerConfig,
         topicNames,
         subscriptionCommitIds);
 
-    // fetch and reset byte buffer
-    List<ByteBuffer> byteBuffers =
-        
events.stream().map(SerializedEnrichedEvent::getByteBuffer).collect(Collectors.toList());
+    List<Pair<ByteBuffer, EnrichedTablets>> enrichedTabletsWithByteBufferList =
+        events.stream()
+            .map(event -> new Pair<>(event.getByteBuffer(), 
event.getEnrichedTablets()))
+            .collect(Collectors.toList());
+    TPipeSubscribeResp resp =
+        PipeSubscribePollResp.toTPipeSubscribeResp(
+            RpcUtils.SUCCESS_STATUS, enrichedTabletsWithByteBufferList);
     events.forEach(SerializedEnrichedEvent::resetByteBuffer);
-    return 
PipeSubscribePollResp.directToTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS, 
byteBuffers);
+    return resp;
   }
 
   private TPipeSubscribeResp handlePipeSubscribeCommit(PipeSubscribeCommitReq 
req) {

Reply via email to