This is an automated email from the ASF dual-hosted git repository. VGalaxies pushed a commit to branch subscription-topic-owner-fencing in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 76f2a882411ba790f7db290740f0232a1e5c4022 Author: OpenAI Codex <[email protected]> AuthorDate: Thu May 28 05:53:30 2026 +0000 Subscription: add topic owner epoch fencing --- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 4 + .../rpc/subscription/config/ConsumerConfig.java | 10 ++ .../rpc/subscription/config/ConsumerConstant.java | 2 + .../rpc/subscription/config/TopicConstant.java | 4 + .../base/AbstractSubscriptionConsumer.java | 22 +++ .../base/AbstractSubscriptionConsumerBuilder.java | 23 +++ .../base/AbstractSubscriptionProvider.java | 12 ++ .../consumer/table/SubscriptionTableProvider.java | 4 + .../table/SubscriptionTablePullConsumer.java | 4 + .../SubscriptionTablePullConsumerBuilder.java | 18 +++ .../table/SubscriptionTablePushConsumer.java | 4 + .../SubscriptionTablePushConsumerBuilder.java | 18 +++ .../consumer/tree/SubscriptionTreeProvider.java | 4 + .../tree/SubscriptionTreePullConsumer.java | 21 +++ .../tree/SubscriptionTreePullConsumerBuilder.java | 18 +++ .../tree/SubscriptionTreePushConsumer.java | 21 +++ .../tree/SubscriptionTreePushConsumerBuilder.java | 18 +++ .../subscription/agent/SubscriptionTopicAgent.java | 74 ++++++++++ .../receiver/SubscriptionReceiverV1.java | 57 ++++++++ .../receiver/SubscriptionReceiverV1Test.java | 88 ++++++++++++ .../commons/subscription/meta/topic/TopicMeta.java | 160 ++++++++++++++++++++- .../commons/subscription/topic/TopicDeSerTest.java | 46 ++++++ 22 files changed, 630 insertions(+), 2 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index e5014681fa7..01fde3cb8df 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -316,6 +316,10 @@ public enum TSStatusCode { SHOW_SUBSCRIPTION_ERROR(1910), SUBSCRIPTION_PIPE_TIMEOUT_ERROR(1911), SUBSCRIPTION_NOT_ENABLED_ERROR(1912), + SUBSCRIPTION_OWNER_FENCED(1913), + SUBSCRIPTION_OWNER_REQUIRED(1914), + SUBSCRIPTION_OWNER_EPOCH_REQUIRED(1915), + SUBSCRIPTION_OWNER_LEASE_EXPIRED(1916), // Topic CREATE_TOPIC_ERROR(2000), diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java index 3cb0087d682..13f2a9ee3fb 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java @@ -68,6 +68,16 @@ public class ConsumerConfig extends PipeParameters { return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY); } + public String getOwnerId() { + return getString(ConsumerConstant.OWNER_ID_KEY); + } + + public Long getOwnerEpoch() { + return hasAttribute(ConsumerConstant.OWNER_EPOCH_KEY) + ? getLong(ConsumerConstant.OWNER_EPOCH_KEY) + : null; + } + public String getUsername() { return getString(ConsumerConstant.USERNAME_KEY); } diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java index 90d2ea7a01f..3df95facf36 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java @@ -40,6 +40,8 @@ public class ConsumerConstant { public static final String CONSUMER_ID_KEY = "consumer-id"; public static final String CONSUMER_GROUP_ID_KEY = "group-id"; + public static final String OWNER_ID_KEY = "owner-id"; + public static final String OWNER_EPOCH_KEY = "owner-epoch"; public static final String HEARTBEAT_INTERVAL_MS_KEY = "heartbeat-interval-ms"; public static final long HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE = 30_000L; diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java index 52c8e4de752..73036684ccd 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java @@ -61,6 +61,10 @@ public class TopicConstant { public static final String STRICT_KEY = "strict"; public static final String STRICT_DEFAULT_VALUE = "true"; + public static final String OWNER_ID_KEY = "owner-id"; + public static final String OWNER_EPOCH_KEY = "owner-epoch"; + public static final String OWNER_LEASE_EXPIRE_TIME_MS_KEY = "owner-lease-expire-time-ms"; + private TopicConstant() { throw new IllegalStateException(SubscriptionMessages.UTILITY_CLASS); } diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java index 63e5a263f22..57c78dbb4d0 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java @@ -105,6 +105,8 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { protected String consumerId; protected String consumerGroupId; + protected String ownerId; + protected Long ownerEpoch; private final long heartbeatIntervalMs; private final long endpointsSyncIntervalMs; @@ -154,6 +156,14 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { return consumerGroupId; } + public String getOwnerId() { + return ownerId; + } + + public Long getOwnerEpoch() { + return ownerEpoch; + } + /////////////////////////////// ctor /////////////////////////////// protected AbstractSubscriptionConsumer(final AbstractSubscriptionConsumerBuilder builder) { @@ -183,6 +193,8 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { this.consumerId = builder.consumerId; this.consumerGroupId = builder.consumerGroupId; + this.ownerId = builder.ownerId; + this.ownerEpoch = builder.ownerEpoch; this.heartbeatIntervalMs = builder.heartbeatIntervalMs; this.endpointsSyncIntervalMs = builder.endpointsSyncIntervalMs; @@ -213,6 +225,8 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { .encryptedPassword((String) properties.get(ConsumerConstant.ENCRYPTED_PASSWORD_KEY)) .consumerId((String) properties.get(ConsumerConstant.CONSUMER_ID_KEY)) .consumerGroupId((String) properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY)) + .ownerId((String) properties.get(ConsumerConstant.OWNER_ID_KEY)) + .ownerEpoch((Long) properties.get(ConsumerConstant.OWNER_EPOCH_KEY)) .heartbeatIntervalMs( (Long) properties.getOrDefault( @@ -394,6 +408,8 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs); @@ -408,6 +424,8 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { this.encryptedPassword, this.consumerId, this.consumerGroupId, + this.ownerId, + this.ownerEpoch, this.thriftMaxFrameSize, this.heartbeatIntervalMs, this.connectionTimeoutInMs); @@ -1429,6 +1447,8 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { final Map<String, String> result = new HashMap<>(); result.put("consumerId", consumerId); result.put("consumerGroupId", consumerGroupId); + result.put("ownerId", ownerId); + result.put("ownerEpoch", String.valueOf(ownerEpoch)); result.put("isClosed", isClosed.toString()); result.put("fileSaveDir", fileSaveDir); result.put( @@ -1443,6 +1463,8 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable { final Map<String, String> result = new HashMap<>(); result.put("consumerId", consumerId); result.put("consumerGroupId", consumerGroupId); + result.put("ownerId", ownerId); + result.put("ownerEpoch", String.valueOf(ownerEpoch)); result.put("heartbeatIntervalMs", String.valueOf(heartbeatIntervalMs)); result.put("endpointsSyncIntervalMs", String.valueOf(endpointsSyncIntervalMs)); result.put("providers", providers.toString()); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java index 991d237ed2b..a0c4b421ed1 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java @@ -40,6 +40,8 @@ public class AbstractSubscriptionConsumerBuilder { protected String consumerId; protected String consumerGroupId; + protected String ownerId; + protected Long ownerEpoch; protected long heartbeatIntervalMs = ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE; protected long endpointsSyncIntervalMs = @@ -111,6 +113,27 @@ public class AbstractSubscriptionConsumerBuilder { return this; } + public AbstractSubscriptionConsumerBuilder ownerId(@Nullable final String ownerId) { + if (Objects.isNull(ownerId)) { + return this; + } + this.ownerId = ownerId; + return this; + } + + public AbstractSubscriptionConsumerBuilder ownerEpoch(final long ownerEpoch) { + this.ownerEpoch = ownerEpoch; + return this; + } + + public AbstractSubscriptionConsumerBuilder ownerEpoch(@Nullable final Long ownerEpoch) { + if (Objects.isNull(ownerEpoch)) { + return this; + } + this.ownerEpoch = ownerEpoch; + return this; + } + public AbstractSubscriptionConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { this.heartbeatIntervalMs = Math.max(heartbeatIntervalMs, ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java index 413c609abbf..7225f6f8755 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java @@ -81,6 +81,8 @@ public abstract class AbstractSubscriptionProvider { private String consumerId; private String consumerGroupId; + private final String ownerId; + private final Long ownerEpoch; private final AtomicBoolean isClosed = new AtomicBoolean(true); private final AtomicBoolean isAvailable = new AtomicBoolean(false); @@ -109,6 +111,8 @@ public abstract class AbstractSubscriptionProvider { final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -124,6 +128,8 @@ public abstract class AbstractSubscriptionProvider { this.endPoint = endPoint; this.consumerId = consumerId; this.consumerGroupId = consumerGroupId; + this.ownerId = ownerId; + this.ownerEpoch = ownerEpoch; this.username = username; this.password = password; this.encryptedPassword = encryptedPassword; @@ -176,6 +182,12 @@ public abstract class AbstractSubscriptionProvider { final Map<String, String> consumerAttributes = new HashMap<>(); consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, consumerGroupId); consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId); + if (ownerId != null) { + consumerAttributes.put(ConsumerConstant.OWNER_ID_KEY, ownerId); + } + if (ownerEpoch != null) { + consumerAttributes.put(ConsumerConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + } consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username); consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password); if (encryptedPassword != null) { diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java index 84470d283c2..ff67e3e532c 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java @@ -33,6 +33,8 @@ final class SubscriptionTableProvider extends AbstractSubscriptionProvider { final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -43,6 +45,8 @@ final class SubscriptionTableProvider extends AbstractSubscriptionProvider { encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java index 8f712782fb5..4c390c96420 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java @@ -45,6 +45,8 @@ public class SubscriptionTablePullConsumer extends AbstractSubscriptionPullConsu final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -55,6 +57,8 @@ public class SubscriptionTablePullConsumer extends AbstractSubscriptionPullConsu encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java index 939228a7f49..efc99debb49 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java @@ -74,6 +74,24 @@ public class SubscriptionTablePullConsumerBuilder extends AbstractSubscriptionPu return this; } + @Override + public SubscriptionTablePullConsumerBuilder ownerId(final String ownerId) { + super.ownerId(ownerId); + return this; + } + + @Override + public SubscriptionTablePullConsumerBuilder ownerEpoch(final long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + + @Override + public SubscriptionTablePullConsumerBuilder ownerEpoch(final Long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + @Override public SubscriptionTablePullConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { super.heartbeatIntervalMs(heartbeatIntervalMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java index e90afc1d8d1..4c85993a933 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java @@ -41,6 +41,8 @@ public class SubscriptionTablePushConsumer extends AbstractSubscriptionPushConsu final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -51,6 +53,8 @@ public class SubscriptionTablePushConsumer extends AbstractSubscriptionPushConsu encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java index 27bf328fea9..143b34056af 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java @@ -76,6 +76,24 @@ public class SubscriptionTablePushConsumerBuilder extends AbstractSubscriptionPu return this; } + @Override + public SubscriptionTablePushConsumerBuilder ownerId(final String ownerId) { + super.ownerId(ownerId); + return this; + } + + @Override + public SubscriptionTablePushConsumerBuilder ownerEpoch(final long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + + @Override + public SubscriptionTablePushConsumerBuilder ownerEpoch(final Long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + @Override public SubscriptionTablePushConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { super.heartbeatIntervalMs(heartbeatIntervalMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java index 3589fbbcf74..8720c577d4d 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java @@ -33,6 +33,8 @@ final class SubscriptionTreeProvider extends AbstractSubscriptionProvider { final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -43,6 +45,8 @@ final class SubscriptionTreeProvider extends AbstractSubscriptionProvider { encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java index 7225036aaa4..d4d61513062 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java @@ -52,6 +52,8 @@ public class SubscriptionTreePullConsumer extends AbstractSubscriptionPullConsum final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -62,6 +64,8 @@ public class SubscriptionTreePullConsumer extends AbstractSubscriptionPullConsum encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); @@ -85,6 +89,8 @@ public class SubscriptionTreePullConsumer extends AbstractSubscriptionPullConsum .encryptedPassword(builder.encryptedPassword) .consumerId(builder.consumerId) .consumerGroupId(builder.consumerGroupId) + .ownerId(builder.ownerId) + .ownerEpoch(builder.ownerEpoch) .heartbeatIntervalMs(builder.heartbeatIntervalMs) .endpointsSyncIntervalMs(builder.endpointsSyncIntervalMs) .fileSaveDir(builder.fileSaveDir) @@ -238,6 +244,8 @@ public class SubscriptionTreePullConsumer extends AbstractSubscriptionPullConsum private String consumerId; private String consumerGroupId; + private String ownerId; + private Long ownerEpoch; private long heartbeatIntervalMs = ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE; private long endpointsSyncIntervalMs = @@ -299,6 +307,19 @@ public class SubscriptionTreePullConsumer extends AbstractSubscriptionPullConsum return this; } + public Builder ownerId(@Nullable final String ownerId) { + if (Objects.isNull(ownerId)) { + return this; + } + this.ownerId = ownerId; + return this; + } + + public Builder ownerEpoch(final long ownerEpoch) { + this.ownerEpoch = ownerEpoch; + return this; + } + public Builder heartbeatIntervalMs(final long heartbeatIntervalMs) { this.heartbeatIntervalMs = Math.max(heartbeatIntervalMs, ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java index cbceb95d77f..2d057b7bfbd 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java @@ -74,6 +74,24 @@ public class SubscriptionTreePullConsumerBuilder extends AbstractSubscriptionPul return this; } + @Override + public SubscriptionTreePullConsumerBuilder ownerId(final String ownerId) { + super.ownerId(ownerId); + return this; + } + + @Override + public SubscriptionTreePullConsumerBuilder ownerEpoch(final long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + + @Override + public SubscriptionTreePullConsumerBuilder ownerEpoch(final Long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + @Override public SubscriptionTreePullConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { super.heartbeatIntervalMs(heartbeatIntervalMs); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java index 4d8a5ef3e16..8fbc33d14ce 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java @@ -51,6 +51,8 @@ public class SubscriptionTreePushConsumer extends AbstractSubscriptionPushConsum final String encryptedPassword, final String consumerId, final String consumerGroupId, + final String ownerId, + final Long ownerEpoch, final int thriftMaxFrameSize, final long heartbeatIntervalMs, final int connectionTimeoutInMs) { @@ -61,6 +63,8 @@ public class SubscriptionTreePushConsumer extends AbstractSubscriptionPushConsum encryptedPassword, consumerId, consumerGroupId, + ownerId, + ownerEpoch, thriftMaxFrameSize, heartbeatIntervalMs, connectionTimeoutInMs); @@ -84,6 +88,8 @@ public class SubscriptionTreePushConsumer extends AbstractSubscriptionPushConsum .encryptedPassword(builder.encryptedPassword) .consumerId(builder.consumerId) .consumerGroupId(builder.consumerGroupId) + .ownerId(builder.ownerId) + .ownerEpoch(builder.ownerEpoch) .heartbeatIntervalMs(builder.heartbeatIntervalMs) .endpointsSyncIntervalMs(builder.endpointsSyncIntervalMs) .fileSaveDir(builder.fileSaveDir) @@ -192,6 +198,8 @@ public class SubscriptionTreePushConsumer extends AbstractSubscriptionPushConsum private String consumerId; private String consumerGroupId; + private String ownerId; + private Long ownerEpoch; private long heartbeatIntervalMs = ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE; private long endpointsSyncIntervalMs = @@ -256,6 +264,19 @@ public class SubscriptionTreePushConsumer extends AbstractSubscriptionPushConsum return this; } + public Builder ownerId(@Nullable final String ownerId) { + if (Objects.isNull(ownerId)) { + return this; + } + this.ownerId = ownerId; + return this; + } + + public Builder ownerEpoch(final long ownerEpoch) { + this.ownerEpoch = ownerEpoch; + return this; + } + public Builder heartbeatIntervalMs(final long heartbeatIntervalMs) { this.heartbeatIntervalMs = Math.max(heartbeatIntervalMs, ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java index 86594433e77..9067f3e7ae5 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java @@ -76,6 +76,24 @@ public class SubscriptionTreePushConsumerBuilder extends AbstractSubscriptionPus return this; } + @Override + public SubscriptionTreePushConsumerBuilder ownerId(final String ownerId) { + super.ownerId(ownerId); + return this; + } + + @Override + public SubscriptionTreePushConsumerBuilder ownerEpoch(final long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + + @Override + public SubscriptionTreePushConsumerBuilder ownerEpoch(final Long ownerEpoch) { + super.ownerEpoch(ownerEpoch); + return this; + } + @Override public SubscriptionTreePushConsumerBuilder heartbeatIntervalMs(final long heartbeatIntervalMs) { super.heartbeatIntervalMs(heartbeatIntervalMs); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java index 9b629ab9c8d..cf0c168e7f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java @@ -19,10 +19,14 @@ package org.apache.iotdb.db.subscription.agent; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; import org.apache.iotdb.commons.subscription.meta.topic.TopicMetaKeeper; import org.apache.iotdb.db.i18n.DataNodeMiscMessages; import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaRespExceptionMessage; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.subscription.config.ConsumerConfig; import org.apache.iotdb.rpc.subscription.config.TopicConfig; import org.apache.iotdb.rpc.subscription.config.TopicConstant; @@ -31,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -188,4 +193,73 @@ public class SubscriptionTopicAgent { releaseReadLock(); } } + + public TSStatus checkTopicOwner(final ConsumerConfig consumerConfig, final String topicName) { + acquireReadLock(); + try { + if (!topicMetaKeeper.containsTopicMeta(topicName)) { + return RpcUtils.SUCCESS_STATUS; + } + + final TopicMeta topicMeta = topicMetaKeeper.getTopicMeta(topicName); + if (!topicMeta.isOwnerFencingEnabled()) { + return RpcUtils.SUCCESS_STATUS; + } + + final String requestOwnerId = consumerConfig.getOwnerId(); + if (Objects.isNull(requestOwnerId)) { + return RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_OWNER_REQUIRED, + String.format( + "Subscription: topic %s enables owner fencing, but consumer %s does not carry owner-id.", + topicName, consumerConfig)); + } + + final Long requestOwnerEpoch = consumerConfig.getOwnerEpoch(); + if (Objects.isNull(requestOwnerEpoch)) { + return RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_OWNER_EPOCH_REQUIRED, + String.format( + "Subscription: topic %s enables owner fencing, but consumer %s does not carry owner-epoch.", + topicName, consumerConfig)); + } + + if (Objects.nonNull(topicMeta.getOwnerLeaseExpireTimeMs()) + && System.currentTimeMillis() > topicMeta.getOwnerLeaseExpireTimeMs()) { + return RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_OWNER_LEASE_EXPIRED, + String.format( + "Subscription: owner lease for topic %s has expired, owner-id: %s, owner-epoch: %s.", + topicName, topicMeta.getOwnerId(), topicMeta.getOwnerEpoch())); + } + + if (!topicMeta.matchesOwner(requestOwnerId, requestOwnerEpoch)) { + return RpcUtils.getStatus( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED, + String.format( + "Subscription: consumer owner is fenced for topic %s, request owner-id: %s," + + " request owner-epoch: %s, current owner-id: %s, current owner-epoch: %s.", + topicName, + requestOwnerId, + requestOwnerEpoch, + topicMeta.getOwnerId(), + topicMeta.getOwnerEpoch())); + } + + return RpcUtils.SUCCESS_STATUS; + } finally { + releaseReadLock(); + } + } + + public TSStatus checkTopicOwners( + final ConsumerConfig consumerConfig, final Iterable<String> topicNames) { + for (final String topicName : topicNames) { + final TSStatus status = checkTopicOwner(consumerConfig, topicName); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + } + return RpcUtils.SUCCESS_STATUS; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java index 2c4accecf45..fe08ffc43ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java @@ -326,6 +326,16 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { } // TODO: do something + final TSStatus ownerStatus = + SubscriptionAgent.topic() + .checkTopicOwners( + consumerConfig, + SubscriptionAgent.consumer() + .getTopicNamesSubscribedByConsumer( + consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId())); + if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(ownerStatus); + } LOGGER.info(DataNodeMiscMessages.SUBSCRIPTION_CONSUMER_HEARTBEAT_SUCCESS, consumerConfig); @@ -406,6 +416,11 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { // subscribe topics final Set<String> topicNames = req.getTopicNames(); + final TSStatus ownerStatus = + SubscriptionAgent.topic().checkTopicOwners(consumerConfig, topicNames); + if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribeSubscribeResp.toTPipeSubscribeResp(ownerStatus); + } subscribe(consumerConfig, topicNames); LOGGER.info( @@ -498,16 +513,48 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { if (SubscriptionPollRequestType.isValidatedRequestType(requestType)) { switch (SubscriptionPollRequestType.valueOf(requestType)) { case POLL: + final Set<String> pollTopicNames = ((PollPayload) request.getPayload()).getTopicNames(); + final Set<String> subscribedTopicNames = + SubscriptionAgent.consumer() + .getTopicNamesSubscribedByConsumer( + consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId()); + final Set<String> topicNamesToCheck = new HashSet<>(pollTopicNames); + topicNamesToCheck.removeIf(topicName -> !subscribedTopicNames.contains(topicName)); + final TSStatus ownerStatus = + SubscriptionAgent.topic().checkTopicOwners(consumerConfig, topicNamesToCheck); + if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribePollResp.toTPipeSubscribeResp(ownerStatus, Collections.emptyList()); + } events = handlePipeSubscribePollRequest( consumerConfig, (PollPayload) request.getPayload(), maxBytes); break; case POLL_FILE: + final TSStatus tsFileOwnerStatus = + SubscriptionAgent.topic() + .checkTopicOwner( + consumerConfig, + ((PollFilePayload) request.getPayload()).getCommitContext().getTopicName()); + if (tsFileOwnerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribePollResp.toTPipeSubscribeResp( + tsFileOwnerStatus, Collections.emptyList()); + } events = handlePipeSubscribePollTsFileRequest( consumerConfig, (PollFilePayload) request.getPayload()); break; case POLL_TABLETS: + final TSStatus tabletsOwnerStatus = + SubscriptionAgent.topic() + .checkTopicOwner( + consumerConfig, + ((PollTabletsPayload) request.getPayload()) + .getCommitContext() + .getTopicName()); + if (tabletsOwnerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribePollResp.toTPipeSubscribeResp( + tabletsOwnerStatus, Collections.emptyList()); + } events = handlePipeSubscribePollTabletsRequest( consumerConfig, (PollTabletsPayload) request.getPayload()); @@ -666,6 +713,16 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver { // commit (ack or nack) final List<SubscriptionCommitContext> commitContexts = req.getCommitContexts(); + final TSStatus ownerStatus = + SubscriptionAgent.topic() + .checkTopicOwners( + consumerConfig, + commitContexts.stream() + .map(SubscriptionCommitContext::getTopicName) + .collect(Collectors.toSet())); + if (ownerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return PipeSubscribeCommitResp.toTPipeSubscribeResp(ownerStatus); + } final boolean nack = req.isNack(); final List<SubscriptionCommitContext> successfulCommitContexts = SubscriptionAgent.broker().commit(consumerConfig, commitContexts, nack); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java index ba0070187e3..b570d58a10c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1Test.java @@ -20,14 +20,19 @@ package org.apache.iotdb.db.subscription.receiver; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; +import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; +import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.rpc.subscription.config.ConsumerConfig; import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.junit.Assert; import org.junit.Test; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -82,6 +87,70 @@ public class SubscriptionReceiverV1Test { invokeCalculateConsumerInactivityTimeoutMs(receiver, createConsumerConfig(5_000L))); } + @Test + public void testTopicOwnerFencingStatus() { + final String topicName = "topic-" + UUID.randomUUID(); + + SubscriptionAgent.topic().handleSingleTopicMetaChanges(createTopicMeta(topicName, "sn1", 7L)); + try { + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwner(createConsumerConfig(1_000L, "sn1", 7L), topicName) + .getCode()); + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwner(createConsumerConfig(1_000L, "sn2", 7L), topicName) + .getCode()); + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_REQUIRED.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwner(createConsumerConfig(1_000L), topicName) + .getCode()); + } finally { + SubscriptionAgent.topic().handleDropTopic(topicName); + } + } + + @Test + public void testOldOwnerFencedAfterNetworkPartitionAndTopicOwnerTransfer() { + final String topicName = "topic-" + UUID.randomUUID(); + final TopicMeta topicMeta = createTopicMeta(topicName, "sn1", 5L); + final ConsumerConfig oldSnConsumer = createConsumerConfig(1_000L, "sn1", 5L); + final ConsumerConfig newSnConsumer = createConsumerConfig(1_000L, "sn2", 6L); + + SubscriptionAgent.topic().handleSingleTopicMetaChanges(topicMeta); + try { + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + SubscriptionAgent.topic().checkTopicOwner(oldSnConsumer, topicName).getCode()); + + final TopicMeta transferredTopicMeta = topicMeta.deepCopy(); + transferredTopicMeta.transferOwner("sn2", 6L); + SubscriptionAgent.topic().handleSingleTopicMetaChanges(transferredTopicMeta); + + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), + SubscriptionAgent.topic().checkTopicOwner(oldSnConsumer, topicName).getCode()); + Assert.assertEquals( + TSStatusCode.SUBSCRIPTION_OWNER_FENCED.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwners(oldSnConsumer, Collections.singleton(topicName)) + .getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + SubscriptionAgent.topic().checkTopicOwner(newSnConsumer, topicName).getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + SubscriptionAgent.topic() + .checkTopicOwners(newSnConsumer, Collections.singleton(topicName)) + .getCode()); + } finally { + SubscriptionAgent.topic().handleDropTopic(topicName); + } + } + private long invokeCalculateConsumerInactivityTimeoutMs( final SubscriptionReceiverV1 receiver, final ConsumerConfig consumerConfig) throws Exception { final Method method = @@ -91,11 +160,30 @@ public class SubscriptionReceiverV1Test { return (long) method.invoke(receiver, consumerConfig); } + private TopicMeta createTopicMeta( + final String topicName, final String ownerId, final long ownerEpoch) { + final Map<String, String> topicAttributes = new HashMap<>(); + topicAttributes.put(TopicConstant.OWNER_ID_KEY, ownerId); + topicAttributes.put(TopicConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + return new TopicMeta(topicName, 1, topicAttributes); + } + private ConsumerConfig createConsumerConfig(final long heartbeatIntervalMs) { + return createConsumerConfig(heartbeatIntervalMs, null, null); + } + + private ConsumerConfig createConsumerConfig( + final long heartbeatIntervalMs, final String ownerId, final Long ownerEpoch) { final Map<String, String> attributes = new HashMap<>(); attributes.put(ConsumerConstant.CONSUMER_ID_KEY, "consumer-" + UUID.randomUUID()); attributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, "group-" + UUID.randomUUID()); attributes.put(ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs)); + if (ownerId != null) { + attributes.put(ConsumerConstant.OWNER_ID_KEY, ownerId); + } + if (ownerEpoch != null) { + attributes.put(ConsumerConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + } return new ConsumerConfig(attributes); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java index badb77d6f48..6a6afc1221a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility; import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.rpc.subscription.config.TopicConfig; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -45,11 +46,18 @@ public class TopicMeta { private long creationTime; // unit in ms private TopicConfig config; + private String ownerId; + private long ownerEpoch; + private long ownerLastTransferTimeMs; + private Long ownerLeaseExpireTimeMs; + // TODO: remove this variable later private Set<String> subscribedConsumerGroupIds; // unused now private TopicMeta() { this.config = new TopicConfig(new HashMap<>()); + this.ownerEpoch = -1L; + this.ownerLastTransferTimeMs = -1L; this.subscribedConsumerGroupIds = new HashSet<>(); } @@ -59,6 +67,9 @@ public class TopicMeta { this.topicName = topicName; this.creationTime = creationTime; this.config = new TopicConfig(topicAttributes); + this.ownerEpoch = -1L; + this.ownerLastTransferTimeMs = -1L; + initOwnerFromTopicAttributes(topicAttributes); this.subscribedConsumerGroupIds = new HashSet<>(); } @@ -68,6 +79,10 @@ public class TopicMeta { copied.topicName = topicName; copied.creationTime = creationTime; copied.config = new TopicConfig(new HashMap<>(config.getAttribute())); + copied.ownerId = ownerId; + copied.ownerEpoch = ownerEpoch; + copied.ownerLastTransferTimeMs = ownerLastTransferTimeMs; + copied.ownerLeaseExpireTimeMs = ownerLeaseExpireTimeMs; copied.subscribedConsumerGroupIds = new HashSet<>(subscribedConsumerGroupIds); return copied; @@ -85,6 +100,81 @@ public class TopicMeta { return config; } + public boolean isOwnerFencingEnabled() { + return Objects.nonNull(ownerId) && ownerEpoch >= 0; + } + + public String getOwnerId() { + return ownerId; + } + + public long getOwnerEpoch() { + return ownerEpoch; + } + + public long getOwnerLastTransferTimeMs() { + return ownerLastTransferTimeMs; + } + + public Long getOwnerLeaseExpireTimeMs() { + return ownerLeaseExpireTimeMs; + } + + public void transferOwner(final String ownerId, final long ownerEpoch) { + transferOwner(ownerId, ownerEpoch, null); + } + + public void transferOwner( + final String ownerId, final long ownerEpoch, final Long ownerLeaseExpireTimeMs) { + if (Objects.isNull(ownerId) || ownerId.isEmpty()) { + throw new IllegalArgumentException("Subscription topic owner id should not be empty"); + } + if (ownerEpoch < 0) { + throw new IllegalArgumentException("Subscription topic owner epoch should not be negative"); + } + if (isOwnerFencingEnabled() && ownerEpoch <= this.ownerEpoch) { + throw new IllegalArgumentException( + String.format( + "Subscription topic owner epoch should increase monotonically, current epoch is %s," + + " incoming epoch is %s", + this.ownerEpoch, ownerEpoch)); + } + + this.ownerId = ownerId; + this.ownerEpoch = ownerEpoch; + this.ownerLeaseExpireTimeMs = ownerLeaseExpireTimeMs; + this.ownerLastTransferTimeMs = System.currentTimeMillis(); + + config.getAttribute().put(TopicConstant.OWNER_ID_KEY, ownerId); + config.getAttribute().put(TopicConstant.OWNER_EPOCH_KEY, String.valueOf(ownerEpoch)); + if (Objects.nonNull(ownerLeaseExpireTimeMs)) { + config + .getAttribute() + .put( + TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY, String.valueOf(ownerLeaseExpireTimeMs)); + } else { + config.getAttribute().remove(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY); + } + } + + public void clearOwner() { + ownerId = null; + ownerEpoch = -1L; + ownerLastTransferTimeMs = -1L; + ownerLeaseExpireTimeMs = null; + config.getAttribute().remove(TopicConstant.OWNER_ID_KEY); + config.getAttribute().remove(TopicConstant.OWNER_EPOCH_KEY); + config.getAttribute().remove(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY); + } + + public boolean matchesOwner(final String requestOwnerId, final Long requestOwnerEpoch) { + return !isOwnerFencingEnabled() + || (Objects.equals(ownerId, requestOwnerId) + && Objects.equals(ownerEpoch, requestOwnerEpoch) + && (Objects.isNull(ownerLeaseExpireTimeMs) + || System.currentTimeMillis() <= ownerLeaseExpireTimeMs)); + } + /** * @return true if the consumer group did not already subscribe this topic */ @@ -136,6 +226,17 @@ public class TopicMeta { for (final String subscribedConsumerGroupID : subscribedConsumerGroupIds) { ReadWriteIOUtils.write(subscribedConsumerGroupID, outputStream); } + + ReadWriteIOUtils.write(isOwnerFencingEnabled(), outputStream); + if (isOwnerFencingEnabled()) { + ReadWriteIOUtils.write(ownerId, outputStream); + ReadWriteIOUtils.write(ownerEpoch, outputStream); + ReadWriteIOUtils.write(ownerLastTransferTimeMs, outputStream); + ReadWriteIOUtils.write(Objects.nonNull(ownerLeaseExpireTimeMs), outputStream); + if (Objects.nonNull(ownerLeaseExpireTimeMs)) { + ReadWriteIOUtils.write(ownerLeaseExpireTimeMs, outputStream); + } + } } public static TopicMeta deserialize(final InputStream inputStream) throws IOException { @@ -156,6 +257,14 @@ public class TopicMeta { topicMeta.subscribedConsumerGroupIds.add(ReadWriteIOUtils.readString(inputStream)); } + if (inputStream.available() > 0 && ReadWriteIOUtils.readBool(inputStream)) { + topicMeta.ownerId = ReadWriteIOUtils.readString(inputStream); + topicMeta.ownerEpoch = ReadWriteIOUtils.readLong(inputStream); + topicMeta.ownerLastTransferTimeMs = ReadWriteIOUtils.readLong(inputStream); + topicMeta.ownerLeaseExpireTimeMs = + ReadWriteIOUtils.readBool(inputStream) ? ReadWriteIOUtils.readLong(inputStream) : null; + } + return topicMeta; } @@ -177,9 +286,37 @@ public class TopicMeta { topicMeta.subscribedConsumerGroupIds.add(ReadWriteIOUtils.readString(byteBuffer)); } + if (byteBuffer.hasRemaining() && ReadWriteIOUtils.readBool(byteBuffer)) { + topicMeta.ownerId = ReadWriteIOUtils.readString(byteBuffer); + topicMeta.ownerEpoch = ReadWriteIOUtils.readLong(byteBuffer); + topicMeta.ownerLastTransferTimeMs = ReadWriteIOUtils.readLong(byteBuffer); + topicMeta.ownerLeaseExpireTimeMs = + ReadWriteIOUtils.readBool(byteBuffer) ? ReadWriteIOUtils.readLong(byteBuffer) : null; + } + return topicMeta; } + private void initOwnerFromTopicAttributes(final Map<String, String> topicAttributes) { + final TopicConfig topicConfig = new TopicConfig(topicAttributes); + final String configuredOwnerId = topicConfig.getString(TopicConstant.OWNER_ID_KEY); + if (Objects.isNull(configuredOwnerId)) { + return; + } + + final Long configuredOwnerEpoch = topicConfig.getLong(TopicConstant.OWNER_EPOCH_KEY); + if (Objects.isNull(configuredOwnerEpoch)) { + throw new IllegalArgumentException( + String.format( + "Subscription topic owner epoch should be set when %s is set", + TopicConstant.OWNER_ID_KEY)); + } + transferOwner( + configuredOwnerId, + configuredOwnerEpoch, + topicConfig.getLong(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY)); + } + /////////////////////////////// utilities /////////////////////////////// public Map<String, String> generateExtractorAttributes(final String username) { @@ -257,12 +394,23 @@ public class TopicMeta { final TopicMeta that = (TopicMeta) obj; return creationTime == that.creationTime && Objects.equals(topicName, that.topicName) - && Objects.equals(config, that.config); + && Objects.equals(config, that.config) + && Objects.equals(ownerId, that.ownerId) + && ownerEpoch == that.ownerEpoch + && ownerLastTransferTimeMs == that.ownerLastTransferTimeMs + && Objects.equals(ownerLeaseExpireTimeMs, that.ownerLeaseExpireTimeMs); } @Override public int hashCode() { - return Objects.hash(topicName, creationTime, config); + return Objects.hash( + topicName, + creationTime, + config, + ownerId, + ownerEpoch, + ownerLastTransferTimeMs, + ownerLeaseExpireTimeMs); } @Override @@ -274,6 +422,14 @@ public class TopicMeta { + creationTime + ", config=" + config + + ", ownerId='" + + ownerId + + "', ownerEpoch=" + + ownerEpoch + + ", ownerLastTransferTimeMs=" + + ownerLastTransferTimeMs + + ", ownerLeaseExpireTimeMs=" + + ownerLeaseExpireTimeMs + '}'; } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java index d9c280e1493..03042875b70 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.subscription.topic; import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; +import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.junit.Assert; import org.junit.Test; @@ -55,6 +56,51 @@ public class TopicDeSerTest { topicMeta.getSubscribedConsumerGroupIds(), topicMeta2.getSubscribedConsumerGroupIds()); } + @Test + public void testTopicOwnerDeSer() throws IOException { + Map<String, String> topicAttributes = new HashMap<>(); + topicAttributes.put(TopicConstant.OWNER_ID_KEY, "sn1"); + topicAttributes.put(TopicConstant.OWNER_EPOCH_KEY, "5"); + + TopicMeta topicMeta = new TopicMeta("test_topic", 1, topicAttributes); + + Assert.assertTrue(topicMeta.isOwnerFencingEnabled()); + Assert.assertEquals("sn1", topicMeta.getOwnerId()); + Assert.assertEquals(5L, topicMeta.getOwnerEpoch()); + Assert.assertTrue(topicMeta.matchesOwner("sn1", 5L)); + Assert.assertFalse(topicMeta.matchesOwner("sn2", 5L)); + Assert.assertFalse(topicMeta.matchesOwner("sn1", 4L)); + + TopicMeta topicMeta1 = TopicMeta.deserialize(topicMeta.serialize()); + TopicMeta topicMeta2 = topicMeta1.deepCopy(); + + Assert.assertEquals(topicMeta, topicMeta1); + Assert.assertEquals(topicMeta, topicMeta2); + Assert.assertEquals(topicMeta.getOwnerId(), topicMeta2.getOwnerId()); + Assert.assertEquals(topicMeta.getOwnerEpoch(), topicMeta2.getOwnerEpoch()); + Assert.assertEquals( + topicMeta.getOwnerLastTransferTimeMs(), topicMeta2.getOwnerLastTransferTimeMs()); + Assert.assertEquals( + topicMeta.getOwnerLeaseExpireTimeMs(), topicMeta2.getOwnerLeaseExpireTimeMs()); + + topicMeta.transferOwner("sn2", 6L, 100L); + Assert.assertEquals("sn2", topicMeta.getOwnerId()); + Assert.assertEquals(6L, topicMeta.getOwnerEpoch()); + Assert.assertEquals("sn2", topicMeta.getConfig().getString(TopicConstant.OWNER_ID_KEY)); + Assert.assertEquals( + 6L, topicMeta.getConfig().getLong(TopicConstant.OWNER_EPOCH_KEY).longValue()); + Assert.assertEquals( + 100L, + topicMeta.getConfig().getLong(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY).longValue()); + + topicMeta.clearOwner(); + Assert.assertFalse(topicMeta.isOwnerFencingEnabled()); + Assert.assertFalse(topicMeta.getConfig().hasAttribute(TopicConstant.OWNER_ID_KEY)); + Assert.assertFalse(topicMeta.getConfig().hasAttribute(TopicConstant.OWNER_EPOCH_KEY)); + Assert.assertFalse( + topicMeta.getConfig().hasAttribute(TopicConstant.OWNER_LEASE_EXPIRE_TIME_MS_KEY)); + } + @Test public void testGenerateExtractorAttributesWithEncryptedPassword() { final TopicMeta topicMeta = new TopicMeta("test_topic", 1, new HashMap<>());
