This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f2233d6d8518631df1c697589d22d74ad1c6c3a5 Author: ran <[email protected]> AuthorDate: Thu Oct 9 12:41:46 2025 +0800 [improve][client] PIP-420: Update the schema ID format (#24798) (cherry picked from commit 9cc15ddc95adc4d56f45ca5023bbf69020103eed) --- pip/pip-420.md | 7 ++- .../apache/pulsar/schema/ExternalSchemaTest.java | 15 +++--- .../java/org/apache/pulsar/client/api/Schema.java | 4 +- .../apache/pulsar/common/schema/SchemaIdUtil.java | 55 ++++++++++++++++++++++ .../org/apache/pulsar/client/impl/MessageImpl.java | 6 ++- .../client/impl/TypedMessageBuilderImpl.java | 8 +++- .../client/impl/schema/SchemaIdUtilTest.java | 49 +++++++++++++++++++ .../SupportVersioningKeyValueSchemaTest.java | 4 +- 8 files changed, 132 insertions(+), 16 deletions(-) diff --git a/pip/pip-420.md b/pip/pip-420.md index 1aa152ba8f1..8f3d2729676 100644 --- a/pip/pip-420.md +++ b/pip/pip-420.md @@ -103,8 +103,11 @@ When integrating with external schema registries: - For store the external schema ID, this PIP introduces a new optional field `schema_id` in the `MessageMetadata`. - The KeyValueSchema doesn't support using Pulsar's native schema and external schema at the same time. -The KeyValueSchemaID is also a byte array, the format is: keySchemaIdLength(4) + keySchemaId + valueSchemaIdLength(4) + valueSchemaId, -external schemas need to decode the key and value schema IDs from the KeyValueSchemaID. +**Schema ID encoding format**: +- **valueSchemaID**: +magic_byte(-1) + valueSchemaId +- **keyValueSchemaID**: +magic_byte(-2) + keySchemaIdLength(4 bytes) + keySchemaId + valueSchemaIdLength(4 bytes) + valueSchemaId This approach allows external schema systems to fully control schema evolution and versioning without being constrained by Pulsar’s native schema versioning mechanism. This may impact some components that rely on schema version to deserialize messages, such as Pulsar Functions and Pulsar SQL; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/ExternalSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/ExternalSchemaTest.java index 652f9add920..5b3cc5e9019 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/ExternalSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/ExternalSchemaTest.java @@ -39,6 +39,7 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaIdUtil; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -101,7 +102,8 @@ public class ExternalSchemaTest extends MockedPulsarServiceBaseTest { for (int i = 0; i < messageCount; i++) { Message<Schemas.PersonFour> message = consumer.receive(); assertTrue(message.getSchemaId().isPresent()); - assertEquals(message.getSchemaId().get(), MockExternalJsonSchema.MOCK_SCHEMA_ID); + assertEquals(message.getSchemaId().get(), + SchemaIdUtil.addMagicHeader(MockExternalJsonSchema.MOCK_SCHEMA_ID, false)); assertEquals(message.getData(), MOCK_SCHEMA_DATA); assertNull(message.getValue()); Assert.assertNotNull(message); @@ -109,7 +111,7 @@ public class ExternalSchemaTest extends MockedPulsarServiceBaseTest { } @Test(timeOut = 1000 * 5) - public void testConflictKvSchema() throws Exception { + public void testConflictKvSchema() { var externalJsonSchema = new MockExternalJsonSchema<>(Schemas.PersonFour.class); try { Schema.KeyValue(externalJsonSchema, Schema.JSON(Schemas.PersonFour.class), KeyValueEncodingType.SEPARATED); @@ -169,8 +171,9 @@ public class ExternalSchemaTest extends MockedPulsarServiceBaseTest { for (int i = 0; i < messageCount; i++) { var message = consumer.receive(); assertTrue(message.getSchemaId().isPresent()); - assertEquals(message.getSchemaId().get(), KeyValue.generateKVSchemaId( - MockExternalJsonSchema.MOCK_KEY_SCHEMA_ID, MockExternalJsonSchema.MOCK_SCHEMA_ID)); + var kvSchemaId = KeyValue.generateKVSchemaId( + MockExternalJsonSchema.MOCK_KEY_SCHEMA_ID, MockExternalJsonSchema.MOCK_SCHEMA_ID); + assertEquals(message.getSchemaId().get(), SchemaIdUtil.addMagicHeader(kvSchemaId, true)); if (KeyValueEncodingType.INLINE.equals(encodingType)) { ByteBuf buf = Unpooled.wrappedBuffer(message.getData()); @@ -226,8 +229,8 @@ public class ExternalSchemaTest extends MockedPulsarServiceBaseTest { for (int i = 0; i < messageCount; i++) { var message = consumer.receive(); assertTrue(message.getSchemaId().isPresent()); - assertEquals(message.getSchemaId().get(), KeyValue.generateKVSchemaId( - new byte[0], MockExternalJsonSchema.MOCK_SCHEMA_ID)); + var kvSchemaId = KeyValue.generateKVSchemaId(new byte[0], MockExternalJsonSchema.MOCK_SCHEMA_ID); + assertEquals(message.getSchemaId().get(), SchemaIdUtil.addMagicHeader(kvSchemaId, true)); var keyBytes = ("index-" + i).getBytes(); if (KeyValueEncodingType.INLINE.equals(encodingType)) { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index e4ed2a1303d..3089684a1f9 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -140,11 +140,11 @@ public interface Schema<T> extends Cloneable { } default T decode(String topic, ByteBuffer data, byte[] schemaId) { - return decode(data, schemaId); + throw new UnsupportedOperationException("Not implemented"); } default T decode(String topic, byte[] data, byte[] schemaId) { - return decode(data, schemaId); + throw new UnsupportedOperationException("Not implemented"); } /** diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaIdUtil.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaIdUtil.java new file mode 100644 index 00000000000..599d93dbe65 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaIdUtil.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import java.nio.ByteBuffer; + +/** + * SchemaIdUtil to fill and retrieve schema id in MessageMetadata. + */ +public class SchemaIdUtil { + + public static final byte MAGIC_BYTE_VALUE = -1; + public static final byte MAGIC_BYTE_KEY_VALUE = -2; + + public static byte[] addMagicHeader(byte[] data, boolean isKeyValue) { + if (data == null || data.length == 0) { + return null; + } + ByteBuffer buffer = ByteBuffer.allocate(1 + data.length); + buffer.put(isKeyValue ? MAGIC_BYTE_KEY_VALUE : MAGIC_BYTE_VALUE); + buffer.put(data); + return buffer.array(); + } + + public static byte[] removeMagicHeader(byte[] schemaId) { + if (schemaId == null || schemaId.length == 0) { + return null; + } + ByteBuffer buffer = ByteBuffer.wrap(schemaId); + byte magicByte = buffer.get(); + if (magicByte != MAGIC_BYTE_VALUE && magicByte != MAGIC_BYTE_KEY_VALUE) { + return schemaId; + } + byte[] id = new byte[buffer.remaining()]; + buffer.get(id); + return id; + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 2d27a2ed2c8..4d7b6cc4734 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -56,6 +56,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; import org.apache.pulsar.common.protocol.schema.SchemaHash; import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaIdUtil; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -474,9 +475,10 @@ public class MessageImpl<T> implements Message<T> { public T getValue() { SchemaInfo schemaInfo = getSchemaInfo(); var schemaIdOp = getSchemaId(); + var schemaId = schemaIdOp.map(SchemaIdUtil::removeMagicHeader).orElse(null); if (schemaInfo != null && SchemaType.KEY_VALUE == schemaInfo.getType()) { if (schemaIdOp.isPresent()) { - return getKeyValueBySchemaId(schemaIdOp.get()); + return getKeyValueBySchemaId(schemaId); } if (schema.supportSchemaVersioning()) { return getKeyValueBySchemaVersion(); @@ -488,7 +490,7 @@ public class MessageImpl<T> implements Message<T> { return null; } if (schemaIdOp.isPresent()) { - return decodeBySchemaId(schemaIdOp.get()); + return decodeBySchemaId(schemaId); } // check if the schema passed in from client supports schema versioning or not // this is an optimization to only get schema version when necessary diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index 35c72e2bc54..d7f85dad561 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -29,6 +29,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.pulsar.client.api.EncodeData; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -40,6 +41,7 @@ import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaIdUtil; import org.apache.pulsar.common.schema.SchemaType; public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> { @@ -72,7 +74,9 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> { if (value == null) { msgMetadata.setNullValue(true); } else { + AtomicBoolean isKeyValueSchema = new AtomicBoolean(false); getKeyValueSchema().map(keyValueSchema -> { + isKeyValueSchema.set(true); if (keyValueSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { setSeparateKeyValue(value, keyValueSchema); return this; @@ -83,7 +87,7 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> { EncodeData encodeData = schema.encode(getTopic(), value); content = ByteBuffer.wrap(encodeData.data()); if (encodeData.hasSchemaId()) { - msgMetadata.setSchemaId(encodeData.schemaId()); + msgMetadata.setSchemaId(SchemaIdUtil.addMagicHeader(encodeData.schemaId(), isKeyValueSchema.get())); } return this; }); @@ -334,7 +338,7 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> { keyEncoded != null && keyEncoded.hasSchemaId() ? keyEncoded.schemaId() : null, valueEncoded != null && valueEncoded.hasSchemaId() ? valueEncoded.schemaId() : null); if (isValidSchemaId(schemaId)) { - msgMetadata.setSchemaId(schemaId); + msgMetadata.setSchemaId(SchemaIdUtil.addMagicHeader(schemaId, true)); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaIdUtilTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaIdUtilTest.java new file mode 100644 index 00000000000..d62357dc2d2 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaIdUtilTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import org.apache.pulsar.common.schema.SchemaIdUtil; +import org.testng.annotations.Test; + +public class SchemaIdUtilTest { + + private static final byte[] SCHEMA_ID = new byte[]{1, 2, 3}; + private static final byte[] SCHEMA_ID_WITH_VALUE_MAGIC_BYTE = new byte[]{-1, 1, 2, 3}; + private static final byte[] SCHEMA_ID_WITH_KEY_VALUE_MAGIC_BYTE = new byte[]{-2, 1, 2, 3}; + + @Test + public void testAddMagicHeader() { + assertNull(SchemaIdUtil.addMagicHeader(null, false)); + assertNull(SchemaIdUtil.addMagicHeader(new byte[0], false)); + assertEquals(SchemaIdUtil.addMagicHeader(SCHEMA_ID, false), SCHEMA_ID_WITH_VALUE_MAGIC_BYTE); + assertEquals(SchemaIdUtil.addMagicHeader(SCHEMA_ID, true), SCHEMA_ID_WITH_KEY_VALUE_MAGIC_BYTE); + } + + @Test + public void testRemoveMagicHeader() { + assertNull(SchemaIdUtil.removeMagicHeader(null)); + assertNull(SchemaIdUtil.removeMagicHeader(new byte[0])); + assertEquals(SchemaIdUtil.removeMagicHeader(SCHEMA_ID), SCHEMA_ID); + assertEquals(SchemaIdUtil.removeMagicHeader(SCHEMA_ID_WITH_VALUE_MAGIC_BYTE), SCHEMA_ID); + assertEquals(SchemaIdUtil.removeMagicHeader(SCHEMA_ID_WITH_KEY_VALUE_MAGIC_BYTE), SCHEMA_ID); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java index e92d1c09124..f64e7a91624 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java @@ -96,7 +96,7 @@ public class SupportVersioningKeyValueSchemaTest { byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar)); KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = ((KeyValueSchemaImpl) keyValueSchema).decode( - null, fooSchema.encode(foo), encodeBytes, new byte[10]); + fooSchema.encode(foo), encodeBytes, new byte[10]); Assert.assertTrue(keyValue.getValue().isField1()); Assert.assertEquals( KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")), @@ -157,7 +157,7 @@ public class SupportVersioningKeyValueSchemaTest { byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar)); KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = ((KeyValueSchemaImpl) keyValueSchema).decode( - null, fooSchema.encode(foo), encodeBytes, new byte[10]); + fooSchema.encode(foo), encodeBytes, new byte[10]); Assert.assertTrue(keyValue.getValue().isField1()); Assert.assertEquals( KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
