This is an automated email from the ASF dual-hosted git repository.

VGalaxies pushed a commit to branch subscription-topic-owner-fencing
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 76f2a882411ba790f7db290740f0232a1e5c4022
Author: OpenAI Codex <[email protected]>
AuthorDate: Thu May 28 05:53:30 2026 +0000

    Subscription: add topic owner epoch fencing
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   4 +
 .../rpc/subscription/config/ConsumerConfig.java    |  10 ++
 .../rpc/subscription/config/ConsumerConstant.java  |   2 +
 .../rpc/subscription/config/TopicConstant.java     |   4 +
 .../base/AbstractSubscriptionConsumer.java         |  22 +++
 .../base/AbstractSubscriptionConsumerBuilder.java  |  23 +++
 .../base/AbstractSubscriptionProvider.java         |  12 ++
 .../consumer/table/SubscriptionTableProvider.java  |   4 +
 .../table/SubscriptionTablePullConsumer.java       |   4 +
 .../SubscriptionTablePullConsumerBuilder.java      |  18 +++
 .../table/SubscriptionTablePushConsumer.java       |   4 +
 .../SubscriptionTablePushConsumerBuilder.java      |  18 +++
 .../consumer/tree/SubscriptionTreeProvider.java    |   4 +
 .../tree/SubscriptionTreePullConsumer.java         |  21 +++
 .../tree/SubscriptionTreePullConsumerBuilder.java  |  18 +++
 .../tree/SubscriptionTreePushConsumer.java         |  21 +++
 .../tree/SubscriptionTreePushConsumerBuilder.java  |  18 +++
 .../subscription/agent/SubscriptionTopicAgent.java |  74 ++++++++++
 .../receiver/SubscriptionReceiverV1.java           |  57 ++++++++
 .../receiver/SubscriptionReceiverV1Test.java       |  88 ++++++++++++
 .../commons/subscription/meta/topic/TopicMeta.java | 160 ++++++++++++++++++++-
 .../commons/subscription/topic/TopicDeSerTest.java |  46 ++++++
 22 files changed, 630 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index e5014681fa7..01fde3cb8df 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -316,6 +316,10 @@ public enum TSStatusCode {
   SHOW_SUBSCRIPTION_ERROR(1910),
   SUBSCRIPTION_PIPE_TIMEOUT_ERROR(1911),
   SUBSCRIPTION_NOT_ENABLED_ERROR(1912),
+  SUBSCRIPTION_OWNER_FENCED(1913),
+  SUBSCRIPTION_OWNER_REQUIRED(1914),
+  SUBSCRIPTION_OWNER_EPOCH_REQUIRED(1915),
+  SUBSCRIPTION_OWNER_LEASE_EXPIRED(1916),
 
   // Topic
   CREATE_TOPIC_ERROR(2000),
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
index 3cb0087d682..13f2a9ee3fb 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java
@@ -68,6 +68,16 @@ public class ConsumerConfig extends PipeParameters {
     return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY);
   }
 
+  public String getOwnerId() {
+    return getString(ConsumerConstant.OWNER_ID_KEY);
+  }
+
+  public Long getOwnerEpoch() {
+    return hasAttribute(ConsumerConstant.OWNER_EPOCH_KEY)
+        ? getLong(ConsumerConstant.OWNER_EPOCH_KEY)
+        : null;
+  }
+
   public String getUsername() {
     return getString(ConsumerConstant.USERNAME_KEY);
   }
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index 90d2ea7a01f..3df95facf36 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -40,6 +40,8 @@ public class ConsumerConstant {
 
   public static final String CONSUMER_ID_KEY = "consumer-id";
   public static final String CONSUMER_GROUP_ID_KEY = "group-id";
+  public static final String OWNER_ID_KEY = "owner-id";
+  public static final String OWNER_EPOCH_KEY = "owner-epoch";
 
   public static final String HEARTBEAT_INTERVAL_MS_KEY = 
"heartbeat-interval-ms";
   public static final long HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE = 30_000L;
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
index 52c8e4de752..73036684ccd 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
@@ -61,6 +61,10 @@ public class TopicConstant {
   public static final String STRICT_KEY = "strict";
   public static final String STRICT_DEFAULT_VALUE = "true";
 
+  public static final String OWNER_ID_KEY = "owner-id";
+  public static final String OWNER_EPOCH_KEY = "owner-epoch";
+  public static final String OWNER_LEASE_EXPIRE_TIME_MS_KEY = "owner-lease-expire-time-ms";
+
   private TopicConstant() {
     throw new IllegalStateException(SubscriptionMessages.UTILITY_CLASS);
   }
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index 63e5a263f22..57c78dbb4d0 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -105,6 +105,8 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
 
   protected String consumerId;
   protected String consumerGroupId;
+  protected String ownerId;
+  protected Long ownerEpoch;
 
   private final long heartbeatIntervalMs;
   private final long endpointsSyncIntervalMs;
@@ -154,6 +156,14 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
     return consumerGroupId;
   }
 
+  public String getOwnerId() {
+    return ownerId;
+  }
+
+  public Long getOwnerEpoch() {
+    return ownerEpoch;
+  }
+
   /////////////////////////////// ctor ///////////////////////////////
 
   protected AbstractSubscriptionConsumer(final 
AbstractSubscriptionConsumerBuilder builder) {
@@ -183,6 +193,8 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
 
     this.consumerId = builder.consumerId;
     this.consumerGroupId = builder.consumerGroupId;
+    this.ownerId = builder.ownerId;
+    this.ownerEpoch = builder.ownerEpoch;
 
     this.heartbeatIntervalMs = builder.heartbeatIntervalMs;
     this.endpointsSyncIntervalMs = builder.endpointsSyncIntervalMs;
@@ -213,6 +225,8 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
             .encryptedPassword((String) 
properties.get(ConsumerConstant.ENCRYPTED_PASSWORD_KEY))
             .consumerId((String) 
properties.get(ConsumerConstant.CONSUMER_ID_KEY))
             .consumerGroupId((String) 
properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY))
+            .ownerId((String) properties.get(ConsumerConstant.OWNER_ID_KEY))
+            .ownerEpoch((Long) properties.get(ConsumerConstant.OWNER_EPOCH_KEY))
             .heartbeatIntervalMs(
                 (Long)
                     properties.getOrDefault(
@@ -394,6 +408,8 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
       final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
+      final String ownerId,
+      final Long ownerEpoch,
       final int thriftMaxFrameSize,
       final long heartbeatIntervalMs,
       final int connectionTimeoutInMs);
@@ -408,6 +424,8 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
             this.encryptedPassword,
             this.consumerId,
             this.consumerGroupId,
+            this.ownerId,
+            this.ownerEpoch,
             this.thriftMaxFrameSize,
             this.heartbeatIntervalMs,
             this.connectionTimeoutInMs);
@@ -1429,6 +1447,8 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
     final Map<String, String> result = new HashMap<>();
     result.put("consumerId", consumerId);
     result.put("consumerGroupId", consumerGroupId);
+    result.put("ownerId", ownerId);
+    result.put("ownerEpoch", String.valueOf(ownerEpoch));
     result.put("isClosed", isClosed.toString());
     result.put("fileSaveDir", fileSaveDir);
     result.put(
@@ -1443,6 +1463,8 @@ abstract class AbstractSubscriptionConsumer implements 
AutoCloseable {
     final Map<String, String> result = new HashMap<>();
     result.put("consumerId", consumerId);
     result.put("consumerGroupId", consumerGroupId);
+    result.put("ownerId", ownerId);
+    result.put("ownerEpoch", String.valueOf(ownerEpoch));
     result.put("heartbeatIntervalMs", String.valueOf(heartbeatIntervalMs));
     result.put("endpointsSyncIntervalMs", 
String.valueOf(endpointsSyncIntervalMs));
     result.put("providers", providers.toString());
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
index 991d237ed2b..a0c4b421ed1 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java
@@ -40,6 +40,8 @@ public class AbstractSubscriptionConsumerBuilder {
 
   protected String consumerId;
   protected String consumerGroupId;
+  protected String ownerId;
+  protected Long ownerEpoch;
 
   protected long heartbeatIntervalMs = 
ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE;
   protected long endpointsSyncIntervalMs =
@@ -111,6 +113,27 @@ public class AbstractSubscriptionConsumerBuilder {
     return this;
   }
 
+  public AbstractSubscriptionConsumerBuilder ownerId(@Nullable final String ownerId) {
+    if (Objects.isNull(ownerId)) {
+      return this;
+    }
+    this.ownerId = ownerId;
+    return this;
+  }
+
+  public AbstractSubscriptionConsumerBuilder ownerEpoch(final long ownerEpoch) {
+    this.ownerEpoch = ownerEpoch;
+    return this;
+  }
+
+  public AbstractSubscriptionConsumerBuilder ownerEpoch(@Nullable final Long ownerEpoch) {
+    if (Objects.isNull(ownerEpoch)) {
+      return this;
+    }
+    this.ownerEpoch = ownerEpoch;
+    return this;
+  }
+
   public AbstractSubscriptionConsumerBuilder heartbeatIntervalMs(final long 
heartbeatIntervalMs) {
     this.heartbeatIntervalMs =
         Math.max(heartbeatIntervalMs, 
ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
index 413c609abbf..7225f6f8755 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
@@ -81,6 +81,8 @@ public abstract class AbstractSubscriptionProvider {
 
   private String consumerId;
   private String consumerGroupId;
+  private final String ownerId;
+  private final Long ownerEpoch;
 
   private final AtomicBoolean isClosed = new AtomicBoolean(true);
   private final AtomicBoolean isAvailable = new AtomicBoolean(false);
@@ -109,6 +111,8 @@ public abstract class AbstractSubscriptionProvider {
       final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
+      final String ownerId,
+      final Long ownerEpoch,
       final int thriftMaxFrameSize,
       final long heartbeatIntervalMs,
       final int connectionTimeoutInMs) {
@@ -124,6 +128,8 @@ public abstract class AbstractSubscriptionProvider {
     this.endPoint = endPoint;
     this.consumerId = consumerId;
     this.consumerGroupId = consumerGroupId;
+    this.ownerId = ownerId;
+    this.ownerEpoch = ownerEpoch;
     this.username = username;
     this.password = password;
     this.encryptedPassword = encryptedPassword;
@@ -176,6 +182,12 @@ public abstract class AbstractSubscriptionProvider {
     final Map<String, String> consumerAttributes = new HashMap<>();
     consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, 
consumerGroupId);
     consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
+    if (ownerId != null) {
+      consumerAttributes.put(ConsumerConstant.OWNER_ID_KEY, ownerId);
+    }
+    if (ownerEpoch != null) {
+      consumerAttributes.put(ConsumerConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch));
+    }
     consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username);
     consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password);
     if (encryptedPassword != null) {
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
index 84470d283c2..ff67e3e532c 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java
@@ -33,6 +33,8 @@ final class SubscriptionTableProvider extends 
AbstractSubscriptionProvider {
       final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
+      final String ownerId,
+      final Long ownerEpoch,
       final int thriftMaxFrameSize,
       final long heartbeatIntervalMs,
       final int connectionTimeoutInMs) {
@@ -43,6 +45,8 @@ final class SubscriptionTableProvider extends 
AbstractSubscriptionProvider {
         encryptedPassword,
         consumerId,
         consumerGroupId,
+        ownerId,
+        ownerEpoch,
         thriftMaxFrameSize,
         heartbeatIntervalMs,
         connectionTimeoutInMs);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
index 8f712782fb5..4c390c96420 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
@@ -45,6 +45,8 @@ public class SubscriptionTablePullConsumer extends 
AbstractSubscriptionPullConsu
       final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
+      final String ownerId,
+      final Long ownerEpoch,
       final int thriftMaxFrameSize,
       final long heartbeatIntervalMs,
       final int connectionTimeoutInMs) {
@@ -55,6 +57,8 @@ public class SubscriptionTablePullConsumer extends 
AbstractSubscriptionPullConsu
         encryptedPassword,
         consumerId,
         consumerGroupId,
+        ownerId,
+        ownerEpoch,
         thriftMaxFrameSize,
         heartbeatIntervalMs,
         connectionTimeoutInMs);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
index 939228a7f49..efc99debb49 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java
@@ -74,6 +74,24 @@ public class SubscriptionTablePullConsumerBuilder extends 
AbstractSubscriptionPu
     return this;
   }
 
+  @Override
+  public SubscriptionTablePullConsumerBuilder ownerId(final String ownerId) {
+    super.ownerId(ownerId);
+    return this;
+  }
+
+  @Override
+  public SubscriptionTablePullConsumerBuilder ownerEpoch(final long ownerEpoch) {
+    super.ownerEpoch(ownerEpoch);
+    return this;
+  }
+
+  @Override
+  public SubscriptionTablePullConsumerBuilder ownerEpoch(final Long ownerEpoch) {
+    super.ownerEpoch(ownerEpoch);
+    return this;
+  }
+
   @Override
   public SubscriptionTablePullConsumerBuilder heartbeatIntervalMs(final long 
heartbeatIntervalMs) {
     super.heartbeatIntervalMs(heartbeatIntervalMs);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
index e90afc1d8d1..4c85993a933 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java
@@ -41,6 +41,8 @@ public class SubscriptionTablePushConsumer extends 
AbstractSubscriptionPushConsu
       final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
+      final String ownerId,
+      final Long ownerEpoch,
       final int thriftMaxFrameSize,
       final long heartbeatIntervalMs,
       final int connectionTimeoutInMs) {
@@ -51,6 +53,8 @@ public class SubscriptionTablePushConsumer extends 
AbstractSubscriptionPushConsu
         encryptedPassword,
         consumerId,
         consumerGroupId,
+        ownerId,
+        ownerEpoch,
         thriftMaxFrameSize,
         heartbeatIntervalMs,
         connectionTimeoutInMs);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
index 27bf328fea9..143b34056af 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java
@@ -76,6 +76,24 @@ public class SubscriptionTablePushConsumerBuilder extends 
AbstractSubscriptionPu
     return this;
   }
 
+  @Override
+  public SubscriptionTablePushConsumerBuilder ownerId(final String ownerId) {
+    super.ownerId(ownerId);
+    return this;
+  }
+
+  @Override
+  public SubscriptionTablePushConsumerBuilder ownerEpoch(final long ownerEpoch) {
+    super.ownerEpoch(ownerEpoch);
+    return this;
+  }
+
+  @Override
+  public SubscriptionTablePushConsumerBuilder ownerEpoch(final Long ownerEpoch) {
+    super.ownerEpoch(ownerEpoch);
+    return this;
+  }
+
   @Override
   public SubscriptionTablePushConsumerBuilder heartbeatIntervalMs(final long 
heartbeatIntervalMs) {
     super.heartbeatIntervalMs(heartbeatIntervalMs);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
index 3589fbbcf74..8720c577d4d 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java
@@ -33,6 +33,8 @@ final class SubscriptionTreeProvider extends 
AbstractSubscriptionProvider {
       final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
+      final String ownerId,
+      final Long ownerEpoch,
       final int thriftMaxFrameSize,
       final long heartbeatIntervalMs,
       final int connectionTimeoutInMs) {
@@ -43,6 +45,8 @@ final class SubscriptionTreeProvider extends 
AbstractSubscriptionProvider {
         encryptedPassword,
         consumerId,
         consumerGroupId,
+        ownerId,
+        ownerEpoch,
         thriftMaxFrameSize,
         heartbeatIntervalMs,
         connectionTimeoutInMs);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
index 7225036aaa4..d4d61513062 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
@@ -52,6 +52,8 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
       final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
+      final String ownerId,
+      final Long ownerEpoch,
       final int thriftMaxFrameSize,
       final long heartbeatIntervalMs,
       final int connectionTimeoutInMs) {
@@ -62,6 +64,8 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
         encryptedPassword,
         consumerId,
         consumerGroupId,
+        ownerId,
+        ownerEpoch,
         thriftMaxFrameSize,
         heartbeatIntervalMs,
         connectionTimeoutInMs);
@@ -85,6 +89,8 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
             .encryptedPassword(builder.encryptedPassword)
             .consumerId(builder.consumerId)
             .consumerGroupId(builder.consumerGroupId)
+            .ownerId(builder.ownerId)
+            .ownerEpoch(builder.ownerEpoch)
             .heartbeatIntervalMs(builder.heartbeatIntervalMs)
             .endpointsSyncIntervalMs(builder.endpointsSyncIntervalMs)
             .fileSaveDir(builder.fileSaveDir)
@@ -238,6 +244,8 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
 
     private String consumerId;
     private String consumerGroupId;
+    private String ownerId;
+    private Long ownerEpoch;
 
     private long heartbeatIntervalMs = 
ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE;
     private long endpointsSyncIntervalMs =
@@ -299,6 +307,19 @@ public class SubscriptionTreePullConsumer extends 
AbstractSubscriptionPullConsum
       return this;
     }
 
+    public Builder ownerId(@Nullable final String ownerId) {
+      if (Objects.isNull(ownerId)) {
+        return this;
+      }
+      this.ownerId = ownerId;
+      return this;
+    }
+
+    public Builder ownerEpoch(final long ownerEpoch) {
+      this.ownerEpoch = ownerEpoch;
+      return this;
+    }
+
     public Builder heartbeatIntervalMs(final long heartbeatIntervalMs) {
       this.heartbeatIntervalMs =
           Math.max(heartbeatIntervalMs, 
ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
index cbceb95d77f..2d057b7bfbd 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java
@@ -74,6 +74,24 @@ public class SubscriptionTreePullConsumerBuilder extends 
AbstractSubscriptionPul
     return this;
   }
 
+  @Override
+  public SubscriptionTreePullConsumerBuilder ownerId(final String ownerId) {
+    super.ownerId(ownerId);
+    return this;
+  }
+
+  @Override
+  public SubscriptionTreePullConsumerBuilder ownerEpoch(final long ownerEpoch) {
+    super.ownerEpoch(ownerEpoch);
+    return this;
+  }
+
+  @Override
+  public SubscriptionTreePullConsumerBuilder ownerEpoch(final Long ownerEpoch) {
+    super.ownerEpoch(ownerEpoch);
+    return this;
+  }
+
   @Override
   public SubscriptionTreePullConsumerBuilder heartbeatIntervalMs(final long 
heartbeatIntervalMs) {
     super.heartbeatIntervalMs(heartbeatIntervalMs);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
index 4d8a5ef3e16..8fbc33d14ce 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java
@@ -51,6 +51,8 @@ public class SubscriptionTreePushConsumer extends 
AbstractSubscriptionPushConsum
       final String encryptedPassword,
       final String consumerId,
       final String consumerGroupId,
+      final String ownerId,
+      final Long ownerEpoch,
       final int thriftMaxFrameSize,
       final long heartbeatIntervalMs,
       final int connectionTimeoutInMs) {
@@ -61,6 +63,8 @@ public class SubscriptionTreePushConsumer extends 
AbstractSubscriptionPushConsum
         encryptedPassword,
         consumerId,
         consumerGroupId,
+        ownerId,
+        ownerEpoch,
         thriftMaxFrameSize,
         heartbeatIntervalMs,
         connectionTimeoutInMs);
@@ -84,6 +88,8 @@ public class SubscriptionTreePushConsumer extends 
AbstractSubscriptionPushConsum
             .encryptedPassword(builder.encryptedPassword)
             .consumerId(builder.consumerId)
             .consumerGroupId(builder.consumerGroupId)
+            .ownerId(builder.ownerId)
+            .ownerEpoch(builder.ownerEpoch)
             .heartbeatIntervalMs(builder.heartbeatIntervalMs)
             .endpointsSyncIntervalMs(builder.endpointsSyncIntervalMs)
             .fileSaveDir(builder.fileSaveDir)
@@ -192,6 +198,8 @@ public class SubscriptionTreePushConsumer extends 
AbstractSubscriptionPushConsum
 
     private String consumerId;
     private String consumerGroupId;
+    private String ownerId;
+    private Long ownerEpoch;
 
     private long heartbeatIntervalMs = 
ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE;
     private long endpointsSyncIntervalMs =
@@ -256,6 +264,19 @@ public class SubscriptionTreePushConsumer extends 
AbstractSubscriptionPushConsum
       return this;
     }
 
+    public Builder ownerId(@Nullable final String ownerId) {
+      if (Objects.isNull(ownerId)) {
+        return this;
+      }
+      this.ownerId = ownerId;
+      return this;
+    }
+
+    public Builder ownerEpoch(final long ownerEpoch) {
+      this.ownerEpoch = ownerEpoch;
+      return this;
+    }
+
     public Builder heartbeatIntervalMs(final long heartbeatIntervalMs) {
       this.heartbeatIntervalMs =
           Math.max(heartbeatIntervalMs, 
ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE);
diff --git 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
index 86594433e77..9067f3e7ae5 100644
--- 
a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
+++ 
b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java
@@ -76,6 +76,24 @@ public class SubscriptionTreePushConsumerBuilder extends 
AbstractSubscriptionPus
     return this;
   }
 
+  @Override
+  public SubscriptionTreePushConsumerBuilder ownerId(final String ownerId) {
+    super.ownerId(ownerId);
+    return this;
+  }
+
+  @Override
+  public SubscriptionTreePushConsumerBuilder ownerEpoch(final long ownerEpoch) {
+    super.ownerEpoch(ownerEpoch);
+    return this;
+  }
+
+  @Override
+  public SubscriptionTreePushConsumerBuilder ownerEpoch(final Long ownerEpoch) {
+    super.ownerEpoch(ownerEpoch);
+    return this;
+  }
+
   @Override
   public SubscriptionTreePushConsumerBuilder heartbeatIntervalMs(final long 
heartbeatIntervalMs) {
     super.heartbeatIntervalMs(heartbeatIntervalMs);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
index 9b629ab9c8d..cf0c168e7f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
@@ -19,10 +19,14 @@
 
 package org.apache.iotdb.db.subscription.agent;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
 import org.apache.iotdb.commons.subscription.meta.topic.TopicMetaKeeper;
 import org.apache.iotdb.db.i18n.DataNodeMiscMessages;
 import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaRespExceptionMessage;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
 import org.apache.iotdb.rpc.subscription.config.TopicConfig;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 
@@ -31,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -188,4 +193,73 @@ public class SubscriptionTopicAgent {
       releaseReadLock();
     }
   }
+
+  /**
+   * Validates the owner identity carried by a consumer against the owner recorded for a topic.
+   *
+   * <p>The checks run under the read lock in this order and stop at the first one that applies:
+   * topic not present or owner fencing disabled (success), consumer without owner-id, consumer
+   * without owner-epoch, expired owner lease, owner mismatch. If none applies the result is
+   * success.
+   *
+   * @param consumerConfig config of the requesting consumer; read for its owner-id and owner-epoch
+   * @param topicName name of the topic to check
+   * @return {@code RpcUtils.SUCCESS_STATUS} when the request passes, otherwise a status built from
+   *     the matching {@code SUBSCRIPTION_OWNER_*} code and a descriptive message
+   */
+  public TSStatus checkTopicOwner(final ConsumerConfig consumerConfig, final String topicName) {
+    acquireReadLock();
+    try {
+      // A topic that is not registered here cannot fence anybody.
+      if (!topicMetaKeeper.containsTopicMeta(topicName)) {
+        return RpcUtils.SUCCESS_STATUS;
+      }
+
+      final TopicMeta meta = topicMetaKeeper.getTopicMeta(topicName);
+      if (!meta.isOwnerFencingEnabled()) {
+        return RpcUtils.SUCCESS_STATUS;
+      }
+
+      // From here on the topic has an owner, so the consumer must identify itself completely.
+      final String consumerOwnerId = consumerConfig.getOwnerId();
+      if (Objects.isNull(consumerOwnerId)) {
+        final String message =
+            String.format(
+                "Subscription: topic %s enables owner fencing, but consumer %s"
+                    + " does not carry owner-id.",
+                topicName, consumerConfig);
+        return RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_OWNER_REQUIRED, message);
+      }
+
+      final Long consumerOwnerEpoch = consumerConfig.getOwnerEpoch();
+      if (Objects.isNull(consumerOwnerEpoch)) {
+        final String message =
+            String.format(
+                "Subscription: topic %s enables owner fencing, but consumer %s"
+                    + " does not carry owner-epoch.",
+                topicName, consumerConfig);
+        return RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_OWNER_EPOCH_REQUIRED, message);
+      }
+
+      // The lease is evaluated before the identity comparison, so an expired lease is reported
+      // even when the requesting owner differs from the recorded one.
+      final Long leaseExpireTimeMs = meta.getOwnerLeaseExpireTimeMs();
+      if (Objects.nonNull(leaseExpireTimeMs) && System.currentTimeMillis() > leaseExpireTimeMs) {
+        final String message =
+            String.format(
+                "Subscription: owner lease for topic %s has expired, owner-id: %s,"
+                    + " owner-epoch: %s.",
+                topicName, meta.getOwnerId(), meta.getOwnerEpoch());
+        return RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_OWNER_LEASE_EXPIRED, message);
+      }
+
+      if (meta.matchesOwner(consumerOwnerId, consumerOwnerEpoch)) {
+        return RpcUtils.SUCCESS_STATUS;
+      }
+
+      final String message =
+          String.format(
+              "Subscription: consumer owner is fenced for topic %s, request owner-id: %s,"
+                  + " request owner-epoch: %s, current owner-id: %s, current owner-epoch: %s.",
+              topicName,
+              consumerOwnerId,
+              consumerOwnerEpoch,
+              meta.getOwnerId(),
+              meta.getOwnerEpoch());
+      return RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_OWNER_FENCED, message);
+    } finally {
+      releaseReadLock();
+    }
+  }
+
+  /**
+   * Runs {@link #checkTopicOwner(ConsumerConfig, String)} for each given topic, in iteration
+   * order, and stops at the first topic whose status code is not the success code.
+   *
+   * @param consumerConfig config of the requesting consumer
+   * @param topicNames names of the topics to check
+   * @return the first non-success status, or {@code RpcUtils.SUCCESS_STATUS} when every topic
+   *     passes (including when {@code topicNames} yields no element)
+   */
+  public TSStatus checkTopicOwners(
+      final ConsumerConfig consumerConfig, final Iterable<String> topicNames) {
+    final int successCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
+    for (final String candidateTopic : topicNames) {
+      final TSStatus result = checkTopicOwner(consumerConfig, candidateTopic);
+      if (result.getCode() == successCode) {
+        continue;
+      }
+      return result;
+    }
+    return RpcUtils.SUCCESS_STATUS;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 2c4accecf45..fe08ffc43ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -326,6 +326,16 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
     }
 
     // TODO: do something
+    final TSStatus ownerStatus =
+        SubscriptionAgent.topic()
+            .checkTopicOwners(
+                consumerConfig,
+                SubscriptionAgent.consumer()
+                    .getTopicNamesSubscribedByConsumer(
+                        consumerConfig.getConsumerGroupId(), 
consumerConfig.getConsumerId()));
+    if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(ownerStatus);
+    }
 
     LOGGER.info(DataNodeMiscMessages.SUBSCRIPTION_CONSUMER_HEARTBEAT_SUCCESS, 
consumerConfig);
 
@@ -406,6 +416,11 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
 
     // subscribe topics
     final Set<String> topicNames = req.getTopicNames();
+    final TSStatus ownerStatus =
+        SubscriptionAgent.topic().checkTopicOwners(consumerConfig, topicNames);
+    if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(ownerStatus);
+    }
     subscribe(consumerConfig, topicNames);
 
     LOGGER.info(
@@ -498,16 +513,48 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
     if (SubscriptionPollRequestType.isValidatedRequestType(requestType)) {
       switch (SubscriptionPollRequestType.valueOf(requestType)) {
         case POLL:
+          final Set<String> pollTopicNames = ((PollPayload) 
request.getPayload()).getTopicNames();
+          final Set<String> subscribedTopicNames =
+              SubscriptionAgent.consumer()
+                  .getTopicNamesSubscribedByConsumer(
+                      consumerConfig.getConsumerGroupId(), 
consumerConfig.getConsumerId());
+          final Set<String> topicNamesToCheck = new HashSet<>(pollTopicNames);
+          topicNamesToCheck.removeIf(topicName -> 
!subscribedTopicNames.contains(topicName));
+          final TSStatus ownerStatus =
+              SubscriptionAgent.topic().checkTopicOwners(consumerConfig, 
topicNamesToCheck);
+          if (ownerStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            return PipeSubscribePollResp.toTPipeSubscribeResp(ownerStatus, 
Collections.emptyList());
+          }
           events =
               handlePipeSubscribePollRequest(
                   consumerConfig, (PollPayload) request.getPayload(), 
maxBytes);
           break;
         case POLL_FILE:
+          final TSStatus tsFileOwnerStatus =
+              SubscriptionAgent.topic()
+                  .checkTopicOwner(
+                      consumerConfig,
+                      ((PollFilePayload) 
request.getPayload()).getCommitContext().getTopicName());
+          if (tsFileOwnerStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            return PipeSubscribePollResp.toTPipeSubscribeResp(
+                tsFileOwnerStatus, Collections.emptyList());
+          }
           events =
               handlePipeSubscribePollTsFileRequest(
                   consumerConfig, (PollFilePayload) request.getPayload());
           break;
         case POLL_TABLETS:
+          final TSStatus tabletsOwnerStatus =
+              SubscriptionAgent.topic()
+                  .checkTopicOwner(
+                      consumerConfig,
+                      ((PollTabletsPayload) request.getPayload())
+                          .getCommitContext()
+                          .getTopicName());
+          if (tabletsOwnerStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            return PipeSubscribePollResp.toTPipeSubscribeResp(
+                tabletsOwnerStatus, Collections.emptyList());
+          }
           events =
               handlePipeSubscribePollTabletsRequest(
                   consumerConfig, (PollTabletsPayload) request.getPayload());
@@ -666,6 +713,16 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
 
     // commit (ack or nack)
     final List<SubscriptionCommitContext> commitContexts = 
req.getCommitContexts();
+    final TSStatus ownerStatus =
+        SubscriptionAgent.topic()
+            .checkTopicOwners(
+                consumerConfig,
+                commitContexts.stream()
+                    .map(SubscriptionCommitContext::getTopicName)
+                    .collect(Collectors.toSet()));
+    if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return PipeSubscribeCommitResp.toTPipeSubscribeResp(ownerStatus);
+    }
     final boolean nack = req.isNack();
     final List<SubscriptionCommitContext> successfulCommitContexts =
         SubscriptionAgent.broker().commit(consumerConfig, commitContexts, 
nack);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java
index ba0070187e3..b570d58a10c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java
@@ -20,14 +20,19 @@
 package org.apache.iotdb.db.subscription.receiver;
 
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
+import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
 import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -82,6 +87,70 @@ public class SubscriptionReceiverV1Test {
         invokeCalculateConsumerInactivityTimeoutMs(receiver, 
createConsumerConfig(5_000L)));
   }
 
+  /**
+   * Registers a topic owned by "sn1" at epoch 7 and checks the status code returned for a matching
+   * owner, a different owner id, and a consumer that carries no owner information.
+   */
+  @Test
+  public void testTopicOwnerFencingStatus() {
+    final String topicName = "topic-" + UUID.randomUUID();
+    final TopicMeta ownedTopic = createTopicMeta(topicName, "sn1", 7L);
+
+    SubscriptionAgent.topic().handleSingleTopicMetaChanges(ownedTopic);
+    try {
+      // Same owner id and epoch as the topic: accepted.
+      final int matchingOwnerCode =
+          SubscriptionAgent.topic()
+              .checkTopicOwner(createConsumerConfig(1_000L, "sn1", 7L), topicName)
+              .getCode();
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), matchingOwnerCode);
+
+      // Same epoch but another owner id: fenced.
+      final int otherOwnerCode =
+          SubscriptionAgent.topic()
+              .checkTopicOwner(createConsumerConfig(1_000L, "sn2", 7L), topicName)
+              .getCode();
+      Assert.assertEquals(TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), otherOwnerCode);
+
+      // No owner attributes at all: the owner id is reported as required.
+      final int anonymousCode =
+          SubscriptionAgent.topic()
+              .checkTopicOwner(createConsumerConfig(1_000L), topicName)
+              .getCode();
+      Assert.assertEquals(TSStatusCode.SUBSCRIPTION_OWNER_REQUIRED.getStatusCode(), anonymousCode);
+    } finally {
+      SubscriptionAgent.topic().handleDropTopic(topicName);
+    }
+  }
+
+  /**
+   * Starts with a topic owned by "sn1" at epoch 5, transfers it to "sn2" at epoch 6 on a deep
+   * copy, publishes the copy, and checks that the old owner is fenced while the new owner passes,
+   * through both the single-topic and the multi-topic check.
+   */
+  @Test
+  public void testOldOwnerFencedAfterNetworkPartitionAndTopicOwnerTransfer() {
+    final String topicName = "topic-" + UUID.randomUUID();
+    final TopicMeta initialMeta = createTopicMeta(topicName, "sn1", 5L);
+    final ConsumerConfig previousOwner = createConsumerConfig(1_000L, "sn1", 5L);
+    final ConsumerConfig nextOwner = createConsumerConfig(1_000L, "sn2", 6L);
+
+    SubscriptionAgent.topic().handleSingleTopicMetaChanges(initialMeta);
+    try {
+      // Before the transfer the original owner is accepted.
+      final int beforeTransferCode =
+          SubscriptionAgent.topic().checkTopicOwner(previousOwner, topicName).getCode();
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), beforeTransferCode);
+
+      final TopicMeta afterTransferMeta = initialMeta.deepCopy();
+      afterTransferMeta.transferOwner("sn2", 6L);
+      SubscriptionAgent.topic().handleSingleTopicMetaChanges(afterTransferMeta);
+
+      // After the transfer the original owner is rejected by both entry points.
+      final int previousOwnerSingleCode =
+          SubscriptionAgent.topic().checkTopicOwner(previousOwner, topicName).getCode();
+      Assert.assertEquals(
+          TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), previousOwnerSingleCode);
+
+      final int previousOwnerBatchCode =
+          SubscriptionAgent.topic()
+              .checkTopicOwners(previousOwner, Collections.singleton(topicName))
+              .getCode();
+      Assert.assertEquals(
+          TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), previousOwnerBatchCode);
+
+      // The new owner is accepted by both entry points.
+      final int nextOwnerSingleCode =
+          SubscriptionAgent.topic().checkTopicOwner(nextOwner, topicName).getCode();
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), nextOwnerSingleCode);
+
+      final int nextOwnerBatchCode =
+          SubscriptionAgent.topic()
+              .checkTopicOwners(nextOwner, Collections.singleton(topicName))
+              .getCode();
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), nextOwnerBatchCode);
+    } finally {
+      SubscriptionAgent.topic().handleDropTopic(topicName);
+    }
+  }
+
   private long invokeCalculateConsumerInactivityTimeoutMs(
       final SubscriptionReceiverV1 receiver, final ConsumerConfig 
consumerConfig) throws Exception {
     final Method method =
@@ -91,11 +160,30 @@ public class SubscriptionReceiverV1Test {
     return (long) method.invoke(receiver, consumerConfig);
   }
 
+  /**
+   * Builds a {@link TopicMeta} with creation time 1 whose attributes carry only the given owner id
+   * and owner epoch.
+   */
+  private TopicMeta createTopicMeta(
+      final String topicName, final String ownerId, final long ownerEpoch) {
+    final Map<String, String> ownerAttributes = new HashMap<>();
+    ownerAttributes.put(TopicConstant.OWNER_EPOCH_KEY, Long.toString(ownerEpoch));
+    ownerAttributes.put(TopicConstant.OWNER_ID_KEY, ownerId);
+    return new TopicMeta(topicName, 1, ownerAttributes);
+  }
+
+  /** Builds a consumer config without owner attributes (owner id and owner epoch are null). */
   private ConsumerConfig createConsumerConfig(final long heartbeatIntervalMs) {
+    return createConsumerConfig(heartbeatIntervalMs, null, null);
+  }
+
+  /**
+   * Builds a consumer config with a random consumer id and consumer group id and the given
+   * heartbeat interval. The owner id and owner epoch attributes are set only when the
+   * corresponding argument is non-null.
+   */
+  private ConsumerConfig createConsumerConfig(
+      final long heartbeatIntervalMs, final String ownerId, final Long 
ownerEpoch) {
     final Map<String, String> attributes = new HashMap<>();
     attributes.put(ConsumerConstant.CONSUMER_ID_KEY, "consumer-" + 
UUID.randomUUID());
     attributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "group-" + 
UUID.randomUUID());
     attributes.put(ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, 
String.valueOf(heartbeatIntervalMs));
+    // Owner attributes are optional: leaving them out models a consumer without owner info.
+    if (ownerId != null) {
+      attributes.put(ConsumerConstant.OWNER_ID_KEY, ownerId);
+    }
+    if (ownerEpoch != null) {
+      attributes.put(ConsumerConstant.OWNER_EPOCH_KEY, 
String.valueOf(ownerEpoch));
+    }
     return new ConsumerConfig(attributes);
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index badb77d6f48..6a6afc1221a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
 import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.rpc.subscription.config.TopicConfig;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -45,11 +46,18 @@ public class TopicMeta {
   private long creationTime; // unit in ms
   private TopicConfig config;
 
+  private String ownerId;
+  private long ownerEpoch;
+  private long ownerLastTransferTimeMs;
+  private Long ownerLeaseExpireTimeMs;
+
   // TODO: remove this variable later
   private Set<String> subscribedConsumerGroupIds; // unused now
 
   private TopicMeta() {
     this.config = new TopicConfig(new HashMap<>());
+    this.ownerEpoch = -1L;
+    this.ownerLastTransferTimeMs = -1L;
 
     this.subscribedConsumerGroupIds = new HashSet<>();
   }
@@ -59,6 +67,9 @@ public class TopicMeta {
     this.topicName = topicName;
     this.creationTime = creationTime;
     this.config = new TopicConfig(topicAttributes);
+    this.ownerEpoch = -1L;
+    this.ownerLastTransferTimeMs = -1L;
+    initOwnerFromTopicAttributes(topicAttributes);
 
     this.subscribedConsumerGroupIds = new HashSet<>();
   }
@@ -68,6 +79,10 @@ public class TopicMeta {
     copied.topicName = topicName;
     copied.creationTime = creationTime;
     copied.config = new TopicConfig(new HashMap<>(config.getAttribute()));
+    copied.ownerId = ownerId;
+    copied.ownerEpoch = ownerEpoch;
+    copied.ownerLastTransferTimeMs = ownerLastTransferTimeMs;
+    copied.ownerLeaseExpireTimeMs = ownerLeaseExpireTimeMs;
 
     copied.subscribedConsumerGroupIds = new 
HashSet<>(subscribedConsumerGroupIds);
     return copied;
@@ -85,6 +100,81 @@ public class TopicMeta {
     return config;
   }
 
+  /**
+   * @return true when an owner id is set and the owner epoch is non-negative
+   */
+  public boolean isOwnerFencingEnabled() {
+    return Objects.nonNull(ownerId) && ownerEpoch >= 0;
+  }
+
+  /**
+   * @return the current owner id, or null when no owner is set
+   */
+  public String getOwnerId() {
+    return ownerId;
+  }
+
+  /**
+   * @return the current owner epoch; -1 is the value used when no owner is set
+   */
+  public long getOwnerEpoch() {
+    return ownerEpoch;
+  }
+
+  /**
+   * @return the {@code System.currentTimeMillis()} value recorded by the last owner transfer; -1
+   *     is the value used when no owner is set
+   */
+  public long getOwnerLastTransferTimeMs() {
+    return ownerLastTransferTimeMs;
+  }
+
+  /**
+   * @return the owner lease expiry time in epoch milliseconds, or null when the owner has no lease
+   *     expiry
+   */
+  public Long getOwnerLeaseExpireTimeMs() {
+    return ownerLeaseExpireTimeMs;
+  }
+
+  /**
+   * Transfers ownership without a lease expiry; equivalent to calling the three-argument overload
+   * with a null lease expiry time.
+   *
+   * @param ownerId new owner id
+   * @param ownerEpoch new owner epoch
+   */
+  public void transferOwner(final String ownerId, final long ownerEpoch) {
+    transferOwner(ownerId, ownerEpoch, null);
+  }
+
+  /**
+   * Installs a new owner on this topic and mirrors it into the topic config attributes.
+   *
+   * <p>On success the owner id, owner epoch and lease expiry are replaced, the last transfer time
+   * is set to the current wall-clock time, and the owner id / owner epoch attributes are written.
+   * The lease expiry attribute is written when a lease expiry is given and removed otherwise.
+   *
+   * @param ownerId new owner id; must be non-null and non-empty
+   * @param ownerEpoch new owner epoch; must be non-negative and, when an owner is already set,
+   *     strictly greater than the current epoch
+   * @param ownerLeaseExpireTimeMs lease expiry in epoch milliseconds, or null for no lease expiry
+   * @throws IllegalArgumentException if any of the constraints above is violated; no state is
+   *     changed in that case
+   */
+  public void transferOwner(
+      final String ownerId, final long ownerEpoch, final Long ownerLeaseExpireTimeMs) {
+    final boolean ownerIdMissing = Objects.isNull(ownerId) || ownerId.isEmpty();
+    if (ownerIdMissing) {
+      throw new IllegalArgumentException("Subscription topic owner id should not be empty");
+    }
+    if (ownerEpoch < 0) {
+      throw new IllegalArgumentException("Subscription topic owner epoch should not be negative");
+    }
+    // Epochs only move forward once an owner exists; the first assignment may use any epoch >= 0.
+    final boolean epochDoesNotAdvance = isOwnerFencingEnabled() && this.ownerEpoch >= ownerEpoch;
+    if (epochDoesNotAdvance) {
+      final String message =
+          String.format(
+              "Subscription topic owner epoch should increase monotonically, current epoch is %s,"
+                  + " incoming epoch is %s",
+              this.ownerEpoch, ownerEpoch);
+      throw new IllegalArgumentException(message);
+    }
+
+    this.ownerId = ownerId;
+    this.ownerEpoch = ownerEpoch;
+    this.ownerLeaseExpireTimeMs = ownerLeaseExpireTimeMs;
+    this.ownerLastTransferTimeMs = System.currentTimeMillis();
+
+    // Keep the attribute view of the config in step with the fields above.
+    config.getAttribute().put(TopicConstant.OWNER_ID_KEY, ownerId);
+    config.getAttribute().put(TopicConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch));
+    if (Objects.isNull(ownerLeaseExpireTimeMs)) {
+      config.getAttribute().remove(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY);
+    } else {
+      config
+          .getAttribute()
+          .put(
+              TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY, String.valueOf(ownerLeaseExpireTimeMs));
+    }
+  }
+
+  /**
+   * Removes the owner from this topic: resets the owner fields to their "no owner" values (null
+   * id, epoch -1, last transfer time -1, null lease expiry) and removes the three owner attributes
+   * from the topic config.
+   */
+  public void clearOwner() {
+    this.ownerId = null;
+    this.ownerEpoch = -1L;
+    this.ownerLastTransferTimeMs = -1L;
+    this.ownerLeaseExpireTimeMs = null;
+
+    final String[] ownerAttributeKeys = {
+      TopicConstant.OWNER_ID_KEY,
+      TopicConstant.OWNER_EPOCH_KEY,
+      TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY
+    };
+    for (final String ownerAttributeKey : ownerAttributeKeys) {
+      config.getAttribute().remove(ownerAttributeKey);
+    }
+  }
+
+  /**
+   * Tells whether a request's owner identity is accepted by this topic.
+   *
+   * @param requestOwnerId owner id carried by the request; may be null
+   * @param requestOwnerEpoch owner epoch carried by the request; may be null
+   * @return true when owner fencing is disabled, or when the id and epoch both equal the current
+   *     owner's and the lease (if any) has not passed its expiry time; false otherwise
+   */
+  public boolean matchesOwner(final String requestOwnerId, final Long requestOwnerEpoch) {
+    if (!isOwnerFencingEnabled()) {
+      return true;
+    }
+    if (!Objects.equals(ownerId, requestOwnerId)) {
+      return false;
+    }
+    // ownerEpoch is boxed for the comparison, so a null request epoch simply does not match.
+    if (!Objects.equals(ownerEpoch, requestOwnerEpoch)) {
+      return false;
+    }
+    return Objects.isNull(ownerLeaseExpireTimeMs)
+        || System.currentTimeMillis() <= ownerLeaseExpireTimeMs;
+  }
+
   /**
    * @return true if the consumer group did not already subscribe this topic
    */
@@ -136,6 +226,17 @@ public class TopicMeta {
     for (final String subscribedConsumerGroupID : subscribedConsumerGroupIds) {
       ReadWriteIOUtils.write(subscribedConsumerGroupID, outputStream);
     }
+
+    ReadWriteIOUtils.write(isOwnerFencingEnabled(), outputStream);
+    if (isOwnerFencingEnabled()) {
+      ReadWriteIOUtils.write(ownerId, outputStream);
+      ReadWriteIOUtils.write(ownerEpoch, outputStream);
+      ReadWriteIOUtils.write(ownerLastTransferTimeMs, outputStream);
+      ReadWriteIOUtils.write(Objects.nonNull(ownerLeaseExpireTimeMs), 
outputStream);
+      if (Objects.nonNull(ownerLeaseExpireTimeMs)) {
+        ReadWriteIOUtils.write(ownerLeaseExpireTimeMs, outputStream);
+      }
+    }
   }
 
   public static TopicMeta deserialize(final InputStream inputStream) throws 
IOException {
@@ -156,6 +257,14 @@ public class TopicMeta {
       
topicMeta.subscribedConsumerGroupIds.add(ReadWriteIOUtils.readString(inputStream));
     }
 
+    if (inputStream.available() > 0 && ReadWriteIOUtils.readBool(inputStream)) 
{
+      topicMeta.ownerId = ReadWriteIOUtils.readString(inputStream);
+      topicMeta.ownerEpoch = ReadWriteIOUtils.readLong(inputStream);
+      topicMeta.ownerLastTransferTimeMs = 
ReadWriteIOUtils.readLong(inputStream);
+      topicMeta.ownerLeaseExpireTimeMs =
+          ReadWriteIOUtils.readBool(inputStream) ? 
ReadWriteIOUtils.readLong(inputStream) : null;
+    }
+
     return topicMeta;
   }
 
@@ -177,9 +286,37 @@ public class TopicMeta {
       
topicMeta.subscribedConsumerGroupIds.add(ReadWriteIOUtils.readString(byteBuffer));
     }
 
+    if (byteBuffer.hasRemaining() && ReadWriteIOUtils.readBool(byteBuffer)) {
+      topicMeta.ownerId = ReadWriteIOUtils.readString(byteBuffer);
+      topicMeta.ownerEpoch = ReadWriteIOUtils.readLong(byteBuffer);
+      topicMeta.ownerLastTransferTimeMs = 
ReadWriteIOUtils.readLong(byteBuffer);
+      topicMeta.ownerLeaseExpireTimeMs =
+          ReadWriteIOUtils.readBool(byteBuffer) ? 
ReadWriteIOUtils.readLong(byteBuffer) : null;
+    }
+
     return topicMeta;
   }
 
+  /**
+   * Initializes the owner from the topic attributes, if they name one.
+   *
+   * <p>Does nothing when the owner id attribute is absent. Otherwise the owner epoch attribute is
+   * mandatory, the lease expiry attribute is optional, and the values are applied through {@code
+   * transferOwner}.
+   *
+   * @param topicAttributes attributes the topic is being created with
+   * @throws IllegalArgumentException if the owner id attribute is present without an owner epoch
+   *     attribute, or if {@code transferOwner} rejects the values
+   */
+  private void initOwnerFromTopicAttributes(final Map<String, String> topicAttributes) {
+    final TopicConfig parsedAttributes = new TopicConfig(topicAttributes);
+    final String ownerIdAttribute = parsedAttributes.getString(TopicConstant.OWNER_ID_KEY);
+    if (Objects.nonNull(ownerIdAttribute)) {
+      final Long ownerEpochAttribute = parsedAttributes.getLong(TopicConstant.OWNER_EPOCH_KEY);
+      if (Objects.isNull(ownerEpochAttribute)) {
+        final String message =
+            String.format(
+                "Subscription topic owner epoch should be set when %s is set",
+                TopicConstant.OWNER_ID_KEY);
+        throw new IllegalArgumentException(message);
+      }
+      final Long leaseExpireAttribute =
+          parsedAttributes.getLong(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY);
+      transferOwner(ownerIdAttribute, ownerEpochAttribute, leaseExpireAttribute);
+    }
+  }
+
   /////////////////////////////// utilities ///////////////////////////////
 
   public Map<String, String> generateExtractorAttributes(final String 
username) {
@@ -257,12 +394,23 @@ public class TopicMeta {
     final TopicMeta that = (TopicMeta) obj;
     return creationTime == that.creationTime
         && Objects.equals(topicName, that.topicName)
-        && Objects.equals(config, that.config);
+        && Objects.equals(config, that.config)
+        && Objects.equals(ownerId, that.ownerId)
+        && ownerEpoch == that.ownerEpoch
+        && ownerLastTransferTimeMs == that.ownerLastTransferTimeMs
+        && Objects.equals(ownerLeaseExpireTimeMs, that.ownerLeaseExpireTimeMs);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(topicName, creationTime, config);
+    return Objects.hash(
+        topicName,
+        creationTime,
+        config,
+        ownerId,
+        ownerEpoch,
+        ownerLastTransferTimeMs,
+        ownerLeaseExpireTimeMs);
   }
 
   @Override
@@ -274,6 +422,14 @@ public class TopicMeta {
         + creationTime
         + ", config="
         + config
+        + ", ownerId='"
+        + ownerId
+        + "', ownerEpoch="
+        + ownerEpoch
+        + ", ownerLastTransferTimeMs="
+        + ownerLastTransferTimeMs
+        + ", ownerLeaseExpireTimeMs="
+        + ownerLeaseExpireTimeMs
         + '}';
   }
 }
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
index d9c280e1493..03042875b70 100644
--- 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.subscription.topic;
 
 import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -55,6 +56,51 @@ public class TopicDeSerTest {
         topicMeta.getSubscribedConsumerGroupIds(), 
topicMeta2.getSubscribedConsumerGroupIds());
   }
 
+  /**
+   * Covers the owner fields of {@link TopicMeta}: initialization from topic attributes, owner
+   * matching, serialize/deserialize and deep-copy equality, owner transfer mirrored into the
+   * config, and owner removal.
+   */
+  @Test
+  public void testTopicOwnerDeSer() throws IOException {
+    Map<String, String> ownerAttributes = new HashMap<>();
+    ownerAttributes.put(TopicConstant.OWNER_ID_KEY, "sn1");
+    ownerAttributes.put(TopicConstant.OWNER_EPOCH_KEY, "5");
+
+    TopicMeta original = new TopicMeta("test_topic", 1, ownerAttributes);
+
+    // The owner is picked up from the attributes at construction time.
+    Assert.assertTrue(original.isOwnerFencingEnabled());
+    Assert.assertEquals("sn1", original.getOwnerId());
+    Assert.assertEquals(5L, original.getOwnerEpoch());
+    Assert.assertTrue(original.matchesOwner("sn1", 5L));
+    Assert.assertFalse(original.matchesOwner("sn2", 5L));
+    Assert.assertFalse(original.matchesOwner("sn1", 4L));
+
+    // A serialization round trip and a deep copy both preserve every owner field.
+    TopicMeta roundTripped = TopicMeta.deserialize(original.serialize());
+    TopicMeta copied = roundTripped.deepCopy();
+
+    Assert.assertEquals(original, roundTripped);
+    Assert.assertEquals(original, copied);
+    Assert.assertEquals(original.getOwnerId(), copied.getOwnerId());
+    Assert.assertEquals(original.getOwnerEpoch(), copied.getOwnerEpoch());
+    Assert.assertEquals(original.getOwnerLastTransferTimeMs(), copied.getOwnerLastTransferTimeMs());
+    Assert.assertEquals(original.getOwnerLeaseExpireTimeMs(), copied.getOwnerLeaseExpireTimeMs());
+
+    // A transfer updates the fields and the config attributes together.
+    original.transferOwner("sn2", 6L, 100L);
+    Assert.assertEquals("sn2", original.getOwnerId());
+    Assert.assertEquals(6L, original.getOwnerEpoch());
+    Assert.assertEquals("sn2", original.getConfig().getString(TopicConstant.OWNER_ID_KEY));
+    Assert.assertEquals(
+        6L, original.getConfig().getLong(TopicConstant.OWNER_EPOCH_KEY).longValue());
+    Assert.assertEquals(
+        100L,
+        original.getConfig().getLong(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY).longValue());
+
+    // Clearing the owner disables fencing and drops all owner attributes.
+    original.clearOwner();
+    Assert.assertFalse(original.isOwnerFencingEnabled());
+    Assert.assertFalse(original.getConfig().hasAttribute(TopicConstant.OWNER_ID_KEY));
+    Assert.assertFalse(original.getConfig().hasAttribute(TopicConstant.OWNER_EPOCH_KEY));
+    Assert.assertFalse(
+        original.getConfig().hasAttribute(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY));
+  }
+
   @Test
   public void testGenerateExtractorAttributesWithEncryptedPassword() {
     final TopicMeta topicMeta = new TopicMeta("test_topic", 1, new 
HashMap<>());

Reply via email to