This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch fixes.compatible.avro in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 65929891d581b14ed65dbfd0ab36197ce7f6a633
Author: mattisonchao <[email protected]>
AuthorDate: Thu Jan 29 13:01:46 2026 +0800

    [fix][schema] Illegal character '$' in record
---
 .../validator/StructSchemaDataValidator.java       |  39 +++++-
 pulsar-broker/src/main/proto/DataRecord.proto      |  36 ++++++
 .../schema/validator/SchemaDataValidatorTest.java  | 139 +++++++++++++++++++++
 3 files changed, 213 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
index 7f3c4e5e46b..068bc15fe95 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/StructSchemaDataValidator.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import java.io.IOException;
+import org.apache.avro.NameValidator;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
 import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
@@ -39,6 +40,7 @@ class StructSchemaDataValidator implements SchemaDataValidator {
     }
 
     private static final StructSchemaDataValidator INSTANCE = new StructSchemaDataValidator();
+    private static final CompatibleNameValidator COMPATIBLE_NAME_VALIDATOR = new CompatibleNameValidator();
 
     private StructSchemaDataValidator() {}
 
@@ -49,7 +51,7 @@ class StructSchemaDataValidator implements SchemaDataValidator {
         byte[] data = schemaData.getData();
 
         try {
-            Schema.Parser avroSchemaParser = new Schema.Parser();
+            Schema.Parser avroSchemaParser = new Schema.Parser(COMPATIBLE_NAME_VALIDATOR);
             avroSchemaParser.setValidateDefaults(false);
             Schema schema = avroSchemaParser.parse(new String(data, UTF_8));
             if (SchemaType.AVRO.equals(schemaData.getType())) {
@@ -97,4 +99,39 @@ class StructSchemaDataValidator implements SchemaDataValidator {
         throw new InvalidSchemaDataException("Invalid schema definition data for "
             + schemaData.getType() + " schema", cause);
     }
+
+    /**
+     * {@link NameValidator} used by this validator when parsing schema definitions with Avro.
+     *
+     * <p>The first character must be a letter, {@code '_'} or {@code '$'}; every following character
+     * must be a letter, a digit, {@code '_'} or {@code '$'}. Letters and digits are checked with
+     * {@link Character#isLetter(char)} and {@link Character#isLetterOrDigit(char)}, so non-ASCII letters
+     * are accepted. {@code '$'} is allowed so that names containing it, such as Java binary names of
+     * nested classes ({@code Outer$Inner}), pass validation.
+     */
+    static class CompatibleNameValidator implements NameValidator {
+
+        @Override
+        public Result validate(String name) {
+            if (name == null) {
+                return new Result("Null name");
+            }
+            final int length = name.length();
+            if (length == 0) {
+                return new Result("Empty name");
+            }
+            final char first = name.charAt(0);
+            if (!(Character.isLetter(first) || first == '_' || first == '$')) {
+                return new Result("Illegal initial character: " + name);
+            }
+            for (int i = 1; i < length; i++) {
+                final char c = name.charAt(i);
+                // '$' is accepted here too, not only as the first character.
+                if (!(Character.isLetterOrDigit(c) || c == '_' || c == '$')) {
+                    return new Result("Illegal character in: " + name);
+                }
+            }
+            return OK;
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/proto/DataRecord.proto b/pulsar-broker/src/main/proto/DataRecord.proto
new file mode 100644
index 00000000000..17b289037a0
--- /dev/null
+++ b/pulsar-broker/src/main/proto/DataRecord.proto
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+syntax = "proto3";
+
+package pulsar.schema;
+option java_package = "org.apache.pulsar.broker.service.schema.proto";
+
+
+message DataRecord {
+  string field1 = 1;
+  int64 field2 = 2;
+  NestedDataRecord field3 = 3;
+  repeated NestedDataRecord fields4 = 4;
+
+  message NestedDataRecord {
+    string field1 = 1;
+    int64 field2 = 2;
+  }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
index 302e5879d28..ce92b0fed14 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidatorTest.java
@@ -23,11 +23,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+import org.apache.avro.NameValidator;
 import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
+import org.apache.pulsar.broker.service.schema.proto.DataRecordOuterClass;
+import org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator.CompatibleNameValidator;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -148,4 +153,138 @@ public class SchemaDataValidatorTest {
         }
     }
 
+    @Test
+    public void testCompatibleNameValidatorValidNames() {
+        CompatibleNameValidator validator = new CompatibleNameValidator();
+
+        String[] validNames = {
+                "validName",
+                "ValidName",
+                "valid_name",
+                "valid$name",
+                "_validName",
+                "$validName",
+                "name123",
+                "Name_123$",
+                "a",
+                "A",
+                "_",
+                "$",
+                "validNameWithMultiple$ymbols_and_numbers123"
+        };
+
+        for (String name : validNames) {
+            NameValidator.Result result = validator.validate(name);
+            Assert.assertTrue(result.isOK(),
+                    "Expected validation to pass for name: '" + name + "', but got error: " + result.getErrors());
+        }
+    }
+
+    @Test
+    public void testCompatibleNameValidatorInvalidNames() {
+        CompatibleNameValidator validator = new CompatibleNameValidator();
+
+        String[] invalidNames = {
+                null,
+                "",
+                "123name",
+                "1name",
+                "name-with-dash",
+                "name with space",
+                "name.with.dot",
+                "name@symbol",
+                "name#hash",
+                "name%percent",
+                "name&ersand",
+                "name*asterisk",
+                "name(parentheses)",
+                "name+plus",
+                "name=equals",
+                "name[brackets]",
+                "name{braces}",
+                "name|pipe",
+                "name\\backslash",
+                "name:colon",
+                "name;semicolon",
+                "name\"quote",
+                "name'apostrophe",
+                "name<greater>",
+                "name,comma",
+                "name?question",
+                "name!exclamation",
+                "name`backtick",
+                "name~tilde",
+                "name^caret"
+        };
+
+        for (String name : invalidNames) {
+            NameValidator.Result result = validator.validate(name);
+            Assert.assertFalse(result.isOK(), "Expected validation to fail for name: '" + name + "'");
+        }
+    }
+
+    @Test
+    public void testCompatibleNameValidatorSpecificErrorMessages() {
+        CompatibleNameValidator validator = new CompatibleNameValidator();
+
+        NameValidator.Result nullResult = validator.validate(null);
+        Assert.assertFalse(nullResult.isOK());
+        Assert.assertEquals(nullResult.getErrors(), "Null name");
+
+        NameValidator.Result emptyResult = validator.validate("");
+        Assert.assertFalse(emptyResult.isOK());
+        Assert.assertEquals(emptyResult.getErrors(), "Empty name");
+
+        NameValidator.Result invalidFirstCharResult = validator.validate("123name");
+        Assert.assertFalse(invalidFirstCharResult.isOK());
+        Assert.assertEquals(invalidFirstCharResult.getErrors(), "Illegal initial character: 123name");
+
+        NameValidator.Result invalidCharResult = validator.validate("name-with-dash");
+        Assert.assertFalse(invalidCharResult.isOK());
+        Assert.assertEquals(invalidCharResult.getErrors(), "Illegal character in: name-with-dash");
+    }
+
+    @Test
+    public void testCompatibleNameValidatorEdgeCases() {
+        CompatibleNameValidator validator = new CompatibleNameValidator();
+
+        Assert.assertTrue(validator.validate("a").isOK());
+        Assert.assertTrue(validator.validate("A").isOK());
+        Assert.assertTrue(validator.validate("_").isOK());
+        Assert.assertTrue(validator.validate("$").isOK());
+
+        NameValidator.Result longNameResult = validator.validate("a".repeat(1000));
+        Assert.assertTrue(longNameResult.isOK());
+
+        NameValidator.Result nameWithOnlyDigits = validator.validate("123");
+        Assert.assertFalse(nameWithOnlyDigits.isOK());
+        Assert.assertTrue(nameWithOnlyDigits.getErrors().contains("Illegal initial character"));
+    }
+
+    @Test
+    public void testCompatibleNameValidatorUnicodeCharacters() {
+        CompatibleNameValidator validator = new CompatibleNameValidator();
+
+        // Letters are checked with Character.isLetter/isLetterOrDigit, so non-ASCII letters are accepted.
+        NameValidator.Result unicodeResult = validator.validate("名字");
+        Assert.assertTrue(unicodeResult.isOK(), "Unexpected error: " + unicodeResult.getErrors());
+
+        NameValidator.Result mixedResult = validator.validate("name字");
+        Assert.assertTrue(mixedResult.isOK(), "Unexpected error: " + mixedResult.getErrors());
+
+        // Symbols other than '$' are still rejected, even outside the ASCII range.
+        NameValidator.Result symbolResult = validator.validate("name€");
+        Assert.assertFalse(symbolResult.isOK());
+        Assert.assertEquals(symbolResult.getErrors(), "Illegal character in: name€");
+    }
+
+    @Test
+    public void testAvroCompatible() throws InvalidSchemaDataException {
+        // DataRecord declares a nested message type; the schema ProtobufSchema derives from it must be
+        // accepted by StructSchemaDataValidator.
+        final ProtobufSchema<DataRecordOuterClass.DataRecord> protobufSchema =
+                ProtobufSchema.of(DataRecordOuterClass.DataRecord.class);
+        StructSchemaDataValidator.of().validate(SchemaData.fromSchemaInfo(protobufSchema.getSchemaInfo()));
+    }
+
 }
