This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 399a0b10f4a [refactor][java] Improve docs and code quality about KeyValueSchema usages (#17256)
399a0b10f4a is described below
commit 399a0b10f4a9854cb1af2875b74a5ce6726e0fb8
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Nov 16 09:31:36 2022 +0800
[refactor][java] Improve docs and code quality about KeyValueSchema usages (#17256)
(cherry picked from commit 5d6a88efa78969073c9dad015b4727b59e8a76d8)
---
.../java/org/apache/pulsar/client/api/Schema.java | 6 +-
.../PulsarClientImplementationBinding.java | 2 -
.../PulsarClientImplementationBindingImpl.java | 4 -
.../client/impl/TypedMessageBuilderImpl.java | 98 +++++++++++++---------
4 files changed, 62 insertions(+), 48 deletions(-)
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 6803187521a..d23daa5defa 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -367,10 +367,12 @@ public interface Schema<T> extends Cloneable{
}
/**
- * Key Value Schema using passed in key and value schemas.
+ * Key Value Schema using passed in key and value schemas with {@link KeyValueEncodingType#INLINE} encoding type.
+ *
+ * @see Schema#KeyValue(Schema, Schema, KeyValueEncodingType)
*/
static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V>
value) {
- return
DefaultImplementation.getDefaultImplementation().newKeyValueSchema(key, value);
+ return KeyValue(key, value, KeyValueEncodingType.INLINE);
}
/**
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
index 75cd7dc1fee..acafec37a40 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
@@ -131,8 +131,6 @@ public interface PulsarClientImplementationBinding {
Schema<KeyValue<byte[], byte[]>> newKeyValueBytesSchema();
- <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchema,
Schema<V> valueSchema);
-
<K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchema,
Schema<V> valueSchema,
KeyValueEncodingType keyValueEncodingType);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
index 2747d39a735..da68d503789 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
@@ -213,10 +213,6 @@ public final class PulsarClientImplementationBindingImpl
implements PulsarClient
return KeyValueSchemaImpl.kvBytes();
}
- public <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K>
keySchema, Schema<V> valueSchema) {
- return KeyValueSchemaImpl.of(keySchema, valueSchema);
- }
-
public <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K>
keySchema, Schema<V> valueSchema,
KeyValueEncodingType
keyValueEncodingType) {
return KeyValueSchemaImpl.of(keySchema, valueSchema,
keyValueEncodingType);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 1bf4cd2eb1b..acc60cfcfe7 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -35,7 +36,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -107,14 +108,12 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
@Override
public TypedMessageBuilder<T> key(String key) {
- if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
- KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
- checkArgument(!(kvSchema.getKeyValueEncodingType() ==
KeyValueEncodingType.SEPARATED),
- "This method is not allowed to set keys when in encoding
type is SEPARATED");
- if (key == null) {
- msgMetadata.setNullPartitionKey(true);
- return this;
- }
+ getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument(
+ keyValueSchema.getKeyValueEncodingType() !=
KeyValueEncodingType.SEPARATED,
+ "This method is not allowed to set keys when in encoding type
is SEPARATED"));
+ if (key == null) {
+ msgMetadata.setNullPartitionKey(true);
+ return this;
}
msgMetadata.setPartitionKey(key);
msgMetadata.setPartitionKeyB64Encoded(false);
@@ -123,14 +122,12 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
@Override
public TypedMessageBuilder<T> keyBytes(byte[] key) {
- if (schema instanceof KeyValueSchemaImpl &&
schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
- KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
- checkArgument(!(kvSchema.getKeyValueEncodingType() ==
KeyValueEncodingType.SEPARATED),
- "This method is not allowed to set keys when in encoding
type is SEPARATED");
- if (key == null) {
- msgMetadata.setNullPartitionKey(true);
- return this;
- }
+ getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument(
+ keyValueSchema.getKeyValueEncodingType() !=
KeyValueEncodingType.SEPARATED,
+ "This method is not allowed to set keys when in encoding type
is SEPARATED"));
+ if (key == null) {
+ msgMetadata.setNullPartitionKey(true);
+ return this;
}
msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(key));
msgMetadata.setPartitionKeyB64Encoded(true);
@@ -149,31 +146,18 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
msgMetadata.setNullValue(true);
return this;
}
- if (value instanceof org.apache.pulsar.common.schema.KeyValue
- && schema.getSchemaInfo() != null &&
schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
- KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
- org.apache.pulsar.common.schema.KeyValue kv =
(org.apache.pulsar.common.schema.KeyValue) value;
- if (kvSchema.getKeyValueEncodingType() ==
KeyValueEncodingType.SEPARATED) {
- // set key as the message key
- if (kv.getKey() != null) {
- msgMetadata.setPartitionKey(
-
Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
- msgMetadata.setPartitionKeyB64Encoded(true);
- } else {
- this.msgMetadata.setNullPartitionKey(true);
- }
-
- // set value as the payload
- if (kv.getValue() != null) {
- this.content =
ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
- } else {
- this.msgMetadata.setNullValue(true);
- }
+
+ return getKeyValueSchema().map(keyValueSchema -> {
+ if (keyValueSchema.getKeyValueEncodingType() ==
KeyValueEncodingType.SEPARATED) {
+ setSeparateKeyValue(value, keyValueSchema);
return this;
+ } else {
+ return null;
}
- }
- this.content = ByteBuffer.wrap(schema.encode(value));
- return this;
+ }).orElseGet(() -> {
+ content = ByteBuffer.wrap(schema.encode(value));
+ return this;
+ });
}
@Override
@@ -302,4 +286,38 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
public ByteBuffer getContent() {
return content;
}
+
+ private Optional<KeyValueSchema<?, ?>> getKeyValueSchema() {
+ if (schema.getSchemaInfo() != null
+ && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE
+ // The schema's class could also be AutoProduceBytesSchema when its type is KEY_VALUE
+ && schema instanceof KeyValueSchema) {
+ return Optional.of((KeyValueSchema<?, ?>) schema);
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private <K, V> void setSeparateKeyValue(T value, KeyValueSchema<K, V>
keyValueSchema) {
+ checkArgument(value instanceof
org.apache.pulsar.common.schema.KeyValue);
+ org.apache.pulsar.common.schema.KeyValue<K, V> keyValue =
+ (org.apache.pulsar.common.schema.KeyValue<K, V>) value;
+
+ // set key as the message key
+ if (keyValue.getKey() != null) {
+ msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(
+ keyValueSchema.getKeySchema().encode(keyValue.getKey())));
+ msgMetadata.setPartitionKeyB64Encoded(true);
+ } else {
+ msgMetadata.setNullPartitionKey(true);
+ }
+
+ // set value as the payload
+ if (keyValue.getValue() != null) {
+ content =
ByteBuffer.wrap(keyValueSchema.getValueSchema().encode(keyValue.getValue()));
+ } else {
+ msgMetadata.setNullValue(true);
+ }
+ }
}