This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new eee977c28b9 Subscription: fix null ByteBuffer when transferring SerializedEnrichedEvent (#12273)
eee977c28b9 is described below
commit eee977c28b92d5157e460f848aa8e678bc804ff4
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Apr 1 19:38:22 2024 +0800
Subscription: fix null ByteBuffer when transferring SerializedEnrichedEvent (#12273)
---
.github/workflows/pipe-it-2cluster.yml | 2 +-
.../payload/response/PipeSubscribePollResp.java | 24 +++++++++++++++++++---
.../broker/SerializedEnrichedEvent.java | 18 ++++++++++++----
.../receiver/SubscriptionReceiverV1.java | 23 +++++++++------------
4 files changed, 46 insertions(+), 21 deletions(-)
diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml
index 040154f8cb9..c39bfae159b 100644
--- a/.github/workflows/pipe-it-2cluster.yml
+++ b/.github/workflows/pipe-it-2cluster.yml
@@ -109,7 +109,7 @@ jobs:
java: [ 17 ]
# StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet.
cluster1: [ LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode ]
- cluster2: [ LightWeightStandaloneMode ]
+ cluster2: [ LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode ]
os: [ ubuntu-latest ]
runs-on: ${{ matrix.os }}
steps:
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java
index 2616a8316fd..0d1ed4eb0e2 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import java.io.DataOutputStream;
@@ -32,6 +33,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
public class PipeSubscribePollResp extends TPipeSubscribeResp {
@@ -48,16 +50,17 @@ public class PipeSubscribePollResp extends
TPipeSubscribeResp {
* server.
*/
public static PipeSubscribePollResp toTPipeSubscribeResp(
- TSStatus status, List<EnrichedTablets> enrichedTabletsList) {
+ TSStatus status, List<Pair<ByteBuffer, EnrichedTablets>>
enrichedTabletsWithByteBufferList) {
final PipeSubscribePollResp resp = new PipeSubscribePollResp();
- resp.enrichedTabletsList = enrichedTabletsList;
+ resp.enrichedTabletsList =
+ enrichedTabletsWithByteBufferList.stream().map(Pair::getRight).collect(Collectors.toList());
resp.status = status;
resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
resp.type = PipeSubscribeResponseType.POLL_TABLETS.getType();
try {
- resp.body = serializeEnrichedTabletsList(enrichedTabletsList);
+ resp.body =
serializeEnrichedTabletsWithByteBufferList(enrichedTabletsWithByteBufferList);
} catch (IOException e) {
resp.status = RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_POLL_ERROR,
e.getMessage());
}
@@ -108,6 +111,21 @@ public class PipeSubscribePollResp extends
TPipeSubscribeResp {
return byteBufferList;
}
+ public static List<ByteBuffer> serializeEnrichedTabletsWithByteBufferList(
+ List<Pair<ByteBuffer, EnrichedTablets>>
enrichedTabletsWithByteBufferList)
+ throws IOException {
+ List<ByteBuffer> byteBufferList = new ArrayList<>();
+ for (Pair<ByteBuffer, EnrichedTablets> enrichedTabletsWithByteBuffer :
+ enrichedTabletsWithByteBufferList) {
+ if (Objects.nonNull(enrichedTabletsWithByteBuffer.getLeft())) {
+ byteBufferList.add(enrichedTabletsWithByteBuffer.getLeft());
+ } else {
+ byteBufferList.add(serializeEnrichedTablets(enrichedTabletsWithByteBuffer.getRight()));
+ }
+ }
+ return byteBufferList;
+ }
+
public static ByteBuffer serializeEnrichedTablets(EnrichedTablets
enrichedTablets)
throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
index 4b3808f1a09..419579713fd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
@@ -54,6 +54,12 @@ public class SerializedEnrichedEvent {
this.committedTimestamp = INVALID_TIMESTAMP;
}
+ //////////////////////////// serialization ////////////////////////////
+
+ public EnrichedTablets getEnrichedTablets() {
+ return enrichedTablets;
+ }
+
/** @return true -> byte buffer is not null */
public boolean serialize() {
if (Objects.isNull(byteBuffer)) {
@@ -80,6 +86,8 @@ public class SerializedEnrichedEvent {
byteBuffer = null;
}
+ //////////////////////////// commit ////////////////////////////
+
public String getSubscriptionCommitId() {
return enrichedTablets.getSubscriptionCommitId();
}
@@ -90,10 +98,6 @@ public class SerializedEnrichedEvent {
}
}
- public void recordLastPolledTimestamp() {
- lastPolledTimestamp = Math.max(lastPolledTimestamp,
System.currentTimeMillis());
- }
-
public void recordCommittedTimestamp() {
committedTimestamp = System.currentTimeMillis();
}
@@ -102,6 +106,12 @@ public class SerializedEnrichedEvent {
return committedTimestamp != INVALID_TIMESTAMP;
}
+ //////////////////////////// pollable ////////////////////////////
+
+ public void recordLastPolledTimestamp() {
+ lastPolledTimestamp = Math.max(lastPolledTimestamp,
System.currentTimeMillis());
+ }
+
public boolean pollable() {
if (lastPolledTimestamp == INVALID_TIMESTAMP) {
return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 22a3b8ee1a2..d0e35211816 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -39,6 +39,7 @@ import
org.apache.iotdb.db.subscription.timer.SubscriptionPollTimer;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
+import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCloseReq;
import
org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCommitReq;
import
org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHandshakeReq;
@@ -59,6 +60,7 @@ import
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeSubscribe
import
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeUnsubscribeResp;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -325,14 +327,6 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
SubscriptionConfig.getInstance().getSubscriptionMinPollTimeoutMs()));
List<SerializedEnrichedEvent> events =
SubscriptionAgent.broker().poll(consumerConfig, topicNames, timer);
-
- // serialize events and filter
- events =
- events.stream()
- .peek((SerializedEnrichedEvent::serialize))
- .filter((event -> Objects.nonNull(event.getByteBuffer())))
- .collect(Collectors.toList());
-
List<String> subscriptionCommitIds =
events.stream()
.map(SerializedEnrichedEvent::getSubscriptionCommitId)
@@ -344,18 +338,21 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
consumerConfig,
topicNames);
}
-
LOGGER.info(
"Subscription: consumer {} poll topics {} successfully, commit ids: {}",
consumerConfig,
topicNames,
subscriptionCommitIds);
- // fetch and reset byte buffer
- List<ByteBuffer> byteBuffers =
- events.stream().map(SerializedEnrichedEvent::getByteBuffer).collect(Collectors.toList());
+ List<Pair<ByteBuffer, EnrichedTablets>> enrichedTabletsWithByteBufferList =
+ events.stream()
+ .map(event -> new Pair<>(event.getByteBuffer(),
event.getEnrichedTablets()))
+ .collect(Collectors.toList());
+ TPipeSubscribeResp resp =
+ PipeSubscribePollResp.toTPipeSubscribeResp(
+ RpcUtils.SUCCESS_STATUS, enrichedTabletsWithByteBufferList);
events.forEach(SerializedEnrichedEvent::resetByteBuffer);
- return
PipeSubscribePollResp.directToTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS,
byteBuffers);
+ return resp;
}
private TPipeSubscribeResp handlePipeSubscribeCommit(PipeSubscribeCommitReq
req) {