This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d8a388ad924 [feature][broker] Support schemaValidationEnforced on
topic level (#15712)
d8a388ad924 is described below
commit d8a388ad9240d783d203f202136990b1f29b243f
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue May 24 06:52:19 2022 -0700
[feature][broker] Support schemaValidationEnforced on topic level (#15712)
---
.../broker/admin/impl/PersistentTopicsBase.java | 20 +++++++++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 2 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 51 ++++++++++++++++++++++
.../pulsar/broker/namespace/NamespaceService.java | 2 +-
.../pulsar/broker/service/AbstractTopic.java | 8 ++--
.../service/nonpersistent/NonPersistentTopic.java | 2 -
.../broker/service/persistent/PersistentTopic.java | 4 --
.../apache/pulsar/broker/admin/AdminApi2Test.java | 16 +++++++
.../org/apache/pulsar/client/admin/Topics.java | 30 +++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 38 ++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 33 ++++++++++++++
.../policies/data/HierarchyTopicPolicies.java | 3 ++
.../pulsar/common/policies/data/TopicPolicies.java | 6 +++
.../apache/pulsar/common/protocol/Commands.java | 2 +-
14 files changed, 204 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index eab7ac0fe5b..2f3312c4ae2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -5019,4 +5019,24 @@ public class PersistentTopicsBase extends AdminResource {
.filter(topic -> includeSystemTopic ? true :
!pulsar().getBrokerService().isSystemTopic(topic))
.collect(Collectors.toList());
}
+
+ protected CompletableFuture<Boolean>
internalGetSchemaValidationEnforced(boolean applied) {
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenApply(op ->
op.map(TopicPolicies::getSchemaValidationEnforced).orElseGet(() -> {
+ if (applied) {
+ boolean namespacePolicy =
getNamespacePolicies(namespaceName).schema_validation_enforced;
+ return namespacePolicy ||
pulsar().getConfiguration().isSchemaValidationEnforced();
+ }
+ return false;
+ }));
+ }
+
+ protected CompletableFuture<Void>
internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) {
+ return getTopicPoliciesAsyncWithRetry(topicName)
+ .thenCompose(op -> {
+ TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
+
topicPolicies.setSchemaValidationEnforced(schemaValidationEnforced);
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies);
+ });
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 7dcf97020fb..2162fa3ee87 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1822,7 +1822,7 @@ public class Namespaces extends NamespacesBase {
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Tenant or Namespace doesn't
exist"),
@ApiResponse(code = 412, message = "schemaValidationEnforced value
is not valid")})
- public void setSchemaValidtionEnforced(@PathParam("tenant") String tenant,
+ public void setSchemaValidationEnforced(@PathParam("tenant") String tenant,
@PathParam("namespace") String
namespace,
@ApiParam(value =
"Flag of whether validation
is enforced on the specified namespace",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 87bb1fed581..4cd405d3e9a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -3911,5 +3911,56 @@ public class PersistentTopics extends
PersistentTopicsBase {
});
}
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/schemaValidationEnforced")
+ @ApiOperation(value = "Get schema validation enforced flag for topic.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenants or Namespace doesn't
exist") })
+ public void getSchemaValidationEnforced(@Suspended AsyncResponse
asyncResponse,
+ @ApiParam(value = "Specify the
tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the
namespace", required = true)
+ @PathParam("namespace") String
namespace,
+ @ApiParam(value = "Specify topic
name", required = true)
+ @PathParam("topic") @Encoded
String encodedTopic,
+ @QueryParam("applied")
@DefaultValue("false") boolean applied,
+ @ApiParam(value = "Is
authentication required to perform this operation")
+ @QueryParam("authoritative")
@DefaultValue("false") boolean authoritative) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ preValidation(authoritative)
+ .thenCompose(__ ->
internalGetSchemaValidationEnforced(applied))
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ handleTopicPolicyException("getSchemaValidationEnforced",
ex, asyncResponse);
+ return null;
+ });
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/{topic}/schemaValidationEnforced")
+ @ApiOperation(value = "Set schema validation enforced flag on topic.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or Namespace doesn't
exist"),
+ @ApiResponse(code = 412, message = "schemaValidationEnforced value
is not valid")})
+ public void setSchemaValidationEnforced(@Suspended AsyncResponse
asyncResponse,
+ @ApiParam(value = "Specify the
tenant", required = true)
+ @PathParam("tenant") String tenant,
+ @ApiParam(value = "Specify the
namespace", required = true)
+ @PathParam("namespace") String
namespace,
+ @ApiParam(value = "Specify topic
name", required = true)
+ @PathParam("topic") @Encoded
String encodedTopic,
+ @ApiParam(value = "Is
authentication required to perform this operation")
+ @QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
+ @ApiParam(required = true) boolean
schemaValidationEnforced) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ preValidation(authoritative)
+ .thenCompose(__ ->
internalSetSchemaValidationEnforced(schemaValidationEnforced))
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ handleTopicPolicyException("setSchemaValidationEnforced",
ex, asyncResponse);
+ return null;
+ });
+ }
+
private static final Logger log =
LoggerFactory.getLogger(PersistentTopics.class);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index fd51837692a..38fda886793 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1215,7 +1215,7 @@ public class NamespaceService implements AutoCloseable {
return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar,
namespaceName)
.thenCompose(peerClusterData -> {
// if peer-cluster-data is present it means namespace is
owned by that peer-cluster and request
- // should be redirect to the peer-cluster
+ // should redirect to the peer-cluster
if (peerClusterData != null) {
return
getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName);
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index ad512949168..edc7bc89647 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -108,8 +108,6 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
protected volatile boolean isEncryptionRequired = false;
protected volatile Boolean isAllowAutoUpdateSchema;
- // schema validation enforced flag
- protected volatile boolean schemaValidationEnforced = false;
protected volatile PublishRateLimiter topicPublishRateLimiter;
@@ -221,7 +219,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
DispatchRateImpl.normalize(data.getSubscriptionDispatchRate()));
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate()));
-
+
topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced());
this.subscriptionPolicies = data.getSubscriptionPolicies();
}
@@ -269,6 +267,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
brokerService.getPulsar().getConfig().getClusterName());
updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
updateNamespaceDispatchRate(namespacePolicies,
brokerService.getPulsar().getConfig().getClusterName());
+
topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced);
}
private void updateNamespaceDispatchRate(Policies namespacePolicies,
String cluster) {
@@ -362,6 +361,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
topicPolicies.getSchemaCompatibilityStrategy()
.updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy));
topicPolicies.getDispatchRate().updateBrokerValue(dispatchRateInBroker(config));
+
topicPolicies.getSchemaValidationEnforced().updateBrokerValue(config.isSchemaValidationEnforced());
}
private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config)
{
@@ -570,7 +570,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
@Override
public boolean getSchemaValidationEnforced() {
- return schemaValidationEnforced;
+ return topicPolicies.getSchemaValidationEnforced().get();
}
public void markBatchMessagePublished() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 58dd9302cf3..b8f41fa4c40 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -169,7 +169,6 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
updateTopicPolicyByNamespacePolicy(policies);
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema =
policies.is_allow_auto_update_schema;
- schemaValidationEnforced =
policies.schema_validation_enforced;
}
updatePublishDispatcher();
updateResourceGroupLimiter(optPolicies);
@@ -1008,7 +1007,6 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
isEncryptionRequired = data.encryption_required;
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
- schemaValidationEnforced = data.schema_validation_enforced;
List<CompletableFuture<Void>> producerCheckFutures = new
ArrayList<>(producers.size());
producers.values().forEach(producer -> producerCheckFutures.add(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4d8ac3e0032..dec5d80cb5f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -341,8 +341,6 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
this.isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema =
policies.is_allow_auto_update_schema;
-
- schemaValidationEnforced =
policies.schema_validation_enforced;
}).exceptionally(ex -> {
log.warn("[{}] Error getting policies {} and
isEncryptionRequired will be set to false",
topic, ex.getMessage());
@@ -2398,8 +2396,6 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
- schemaValidationEnforced = data.schema_validation_enforced;
-
initializeDispatchRateLimiterIfNeeded();
updateSubscribeRateLimiter();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 87d4f83f7de..fa1b7ba1657 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -2472,4 +2472,20 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
assertEquals(topicStats.getPublishers().size(), 2);
topicStats.getPublishers().forEach(p ->
assertTrue(p.isSupportsPartialProducer()));
}
+
+ @Test(dataProvider = "topicType")
+ public void testSchemaValidationEnforced(String topicType) throws
Exception {
+ final String topic = topicType +
"://prop-xyz/ns1/test-schema-validation-enforced";
+ admin.topics().createPartitionedTopic(topic, 1);
+ @Cleanup
+ Producer<byte[]> producer1 = pulsarClient.newProducer()
+ .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0)
+ .create();
+ boolean schemaValidationEnforced =
admin.topics().getSchemaValidationEnforced(topic, false);
+ assertEquals(schemaValidationEnforced, false);
+ admin.topics().setSchemaValidationEnforced(topic, true);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin.topics().getSchemaValidationEnforced(topic,
false), true)
+ );
+ }
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index ae5e90a287c..48ef03cd67e 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -4013,4 +4013,34 @@ public interface Topics {
* @return a map of replicated subscription status on a topic
*/
CompletableFuture<Map<String, Boolean>>
getReplicatedSubscriptionStatusAsync(String topic, String subName);
+
+ /**
+ * Get schema validation enforced for a topic.
+ *
+ * @param topic topic name
+ * @return whether the schema validation enforced is set or not
+ */
+ boolean getSchemaValidationEnforced(String topic, boolean applied) throws
PulsarAdminException;
+
+ /**
+ * Get schema validation enforced for a topic.
+ *
+ * @param topic topic name
+ */
+ void setSchemaValidationEnforced(String topic, boolean enable) throws
PulsarAdminException;
+
+ /**
+ * Get schema validation enforced for a topic asynchronously.
+ *
+ * @param topic topic name
+ * @return whether the schema validation enforced is set or not
+ */
+ CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String topic,
boolean applied);
+
+ /**
+ * Get schema validation enforced for a topic asynchronously.
+ *
+ * @param topic topic name
+ */
+ CompletableFuture<Void> setSchemaValidationEnforcedAsync(String topic,
boolean enable);
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 0b2aad296ff..5314872fc00 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -2970,6 +2970,44 @@ public class TopicsImpl extends BaseResource implements
Topics {
return future;
}
+ @Override
+ public boolean getSchemaValidationEnforced(String topic, boolean applied)
throws PulsarAdminException {
+ return sync(() -> getSchemaValidationEnforcedAsync(topic, applied));
+ }
+
+ @Override
+ public void setSchemaValidationEnforced(String topic, boolean enable)
throws PulsarAdminException {
+ sync(() -> setSchemaValidationEnforcedAsync(topic, enable));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String
topic, boolean applied) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "schemaValidationEnforced");
+ path = path.queryParam("applied", applied);
+ final CompletableFuture<Boolean> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<Boolean>() {
+ @Override
+ public void completed(Boolean enforced) {
+ future.complete(enforced);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> setSchemaValidationEnforcedAsync(String
topic, boolean schemaValidationEnforced) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "schemaValidationEnforced");
+ return asyncPostRequest(path, Entity.entity(schemaValidationEnforced,
MediaType.APPLICATION_JSON));
+ }
+
@Override
public Set<String> getReplicationClusters(String topic, boolean applied)
throws PulsarAdminException {
return sync(() -> getReplicationClustersAsync(topic, applied));
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index c4476472338..3c2d40c43cd 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -242,6 +242,9 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("set-replication-clusters", new
SetReplicationClusters());
jcommander.addCommand("remove-replication-clusters", new
RemoveReplicationClusters());
+ jcommander.addCommand("get-schema-validation-enforce", new
GetSchemaValidationEnforced());
+ jcommander.addCommand("set-schema-validation-enforce", new
SetSchemaValidationEnforced());
+
initDeprecatedCommands();
}
@@ -2840,4 +2843,34 @@ public class CmdTopics extends CmdBase {
}
}
+
+ @Parameters(commandDescription = "Get the schema validation enforced")
+ private class GetSchemaValidationEnforced extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-ap", "--applied" }, description = "Get the
applied policy of the topic")
+ private boolean applied = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String topic = validateTopicName(params);
+
System.out.println(getAdmin().topics().getSchemaValidationEnforced(topic,
applied));
+ }
+ }
+
+ @Parameters(commandDescription = "Set the schema whether open schema
validation enforced")
+ private class SetSchemaValidationEnforced extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--enable", "-e" }, description = "Enable schema
validation enforced")
+ private boolean enable = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String topic = validateTopicName(params);
+ getAdmin().topics().setSchemaValidationEnforced(topic, enable);
+ }
+ }
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 0532744bec3..66c21a11716 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -58,6 +58,8 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<SchemaCompatibilityStrategy>
schemaCompatibilityStrategy;
final PolicyHierarchyValue<DispatchRateImpl> dispatchRate;
+ final PolicyHierarchyValue<Boolean> schemaValidationEnforced;
+
public HierarchyTopicPolicies() {
replicationClusters = new PolicyHierarchyValue<>();
retentionPolicies = new PolicyHierarchyValue<>();
@@ -86,5 +88,6 @@ public class HierarchyTopicPolicies {
subscriptionDispatchRate = new PolicyHierarchyValue<>();
schemaCompatibilityStrategy = new PolicyHierarchyValue<>();
dispatchRate = new PolicyHierarchyValue<>();
+ schemaValidationEnforced = new PolicyHierarchyValue<>();
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index ae04c2e2178..07e4dff56bd 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -78,6 +78,8 @@ public class TopicPolicies {
@Builder.Default
private Map<String/*subscription*/, SubscriptionPolicies>
subscriptionPolicies = new HashMap<>();
+ private Boolean schemaValidationEnforced;
+
public boolean isGlobalPolicies() {
return isGlobal != null && isGlobal;
}
@@ -174,6 +176,10 @@ public class TopicPolicies {
return subscribeRate != null;
}
+ public boolean isSchemaValidationEnforced() {
+ return schemaValidationEnforced != null;
+ }
+
public Set<String> getReplicationClustersSet() {
return replicationClusters != null ?
Sets.newTreeSet(this.replicationClusters) : null;
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 2d8e043058d..285a12321c1 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1856,7 +1856,7 @@ public class Commands {
case ExclusiveWithFencing:
return
org.apache.pulsar.common.api.proto.ProducerAccessMode.ExclusiveWithFencing;
default:
- throw new IllegalArgumentException("Unknonw access mode: " +
accessMode);
+ throw new IllegalArgumentException("Unknown access mode: " +
accessMode);
}
}