This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 17ab040 [tools] Pulsar Client: add ability to produce KV messages (#11303) 17ab040 is described below commit 17ab040bbd71161df3860613ddcd253b3af24f59 Author: Enrico Olivelli <eolive...@gmail.com> AuthorDate: Sat Aug 14 01:24:45 2021 +0200 [tools] Pulsar Client: add ability to produce KV messages (#11303) ### Motivation Currently (Pulsar 2.8.0) it is not easy to produce messages with KeyValue encoding, because the command line tools do not provide such support. With this change the user will be able to set the schema while using `pulsar-client produce`. We are adding three parameters: * "--key-schema": this is the schema for the Key (default: "string") * "--value-schema": this is the schema for the Value (default: "bytes") * "--key-value-encoding-type": this is the type of encoding, with values: none, separated, inline. With key-value-encoding-type=none (default behaviour) we are not using KV encoding. The command is 100% compatible with previous versions. ### Modifications Add support for the properties listed above. We are using "Schema.AUTO_PRODUCE_BYTES" in order to deal with the Schema registry. The user passes the raw value as the message and we pass it to Pulsar without modifications. 
Example command to send a KV message with JSON key and value: `bin/pulsar-client produce --key-value-encoding-type separated -k '{"a":"b"}' -m '{"a":"b"}' --key-schema 'json:{"type": "record","namespace": "com.example","name": "FullName", "fields": [{ "name": "a", "type": "string" }]} ' --value-schema 'json:{"type": "record","namespace": "com.example","name": "FullName", "fields": [{ "name": "a", "type": "string" }]} ' test` for AVRO and JSON the schema is written inline after the prefix "avro:" and "json:" ### Verifying this change This change added unit tests --- .../org/apache/pulsar/client/cli/CmdProduce.java | 106 +++++++++++++++++++-- .../apache/pulsar/client/cli/TestCmdProduce.java | 37 ++++++- 2 files changed, 133 insertions(+), 10 deletions(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java index 326e221..52d0b25 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java @@ -30,6 +30,7 @@ import com.google.common.util.concurrent.RateLimiter; import com.google.gson.JsonParseException; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; @@ -38,12 +39,12 @@ import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.ClientBuilder; @@ -51,8 +52,13 @@ import org.apache.pulsar.client.api.Producer; import 
org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.websocket.data.ProducerMessage; import org.eclipse.jetty.util.ssl.SslContextFactory; @@ -76,6 +82,9 @@ public class CmdProduce { private static final Logger LOG = LoggerFactory.getLogger(PulsarClientTool.class); private static final int MAX_MESSAGES = 1000; + static final String KEY_VALUE_ENCODING_TYPE_NOT_SET = ""; + private static final String KEY_VALUE_ENCODING_TYPE_SEPARATED = "separated"; + private static final String KEY_VALUE_ENCODING_TYPE_INLINE = "inline"; @Parameter(description = "TopicName", required = true) private List<String> mainOptions; @@ -114,6 +123,15 @@ public class CmdProduce { @Parameter(names = { "-k", "--key"}, description = "message key to add ") private String key; + @Parameter(names = { "-vs", "--value-schema"}, description = "Schema type (can be bytes,avro,json,string...)") + private String valueSchema = "bytes"; + + @Parameter(names = { "-ks", "--key-schema"}, description = "Schema type (can be bytes,avro,json,string...)") + private String keySchema = "string"; + + @Parameter(names = { "-kvet", "--key-value-encoding-type"}, description = "Key Value Encoding Type (it can be separated or inline)") + private String keyValueEncodingType = null; + @Parameter(names = { "-ekn", "--encryption-key-name" }, description = "The public key name to encrypt payload") private String encKeyName = null; @@ -190,6 +208,18 @@ public class CmdProduce { 
throw (new ParameterException("Please supply message content with either --messages or --files")); } + if (keyValueEncodingType == null) { + keyValueEncodingType = KEY_VALUE_ENCODING_TYPE_NOT_SET; + } else { + switch (keyValueEncodingType) { + case KEY_VALUE_ENCODING_TYPE_SEPARATED: + case KEY_VALUE_ENCODING_TYPE_INLINE: + break; + default: + throw (new ParameterException("--key-value-encoding-type "+keyValueEncodingType+" is not valid, only 'separated' or 'inline'")); + } + } + int totalMessages = (messages.size() + messageFileNames.size()) * numTimesProduce; if (totalMessages > MAX_MESSAGES) { String msg = "Attempting to send " + totalMessages + " messages. Please do not send more than " @@ -212,7 +242,8 @@ public class CmdProduce { try { PulsarClient client = clientBuilder.build(); - ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topic); + Schema<?> schema = buildSchema(this.keySchema, this.valueSchema, this.keyValueEncodingType); + ProducerBuilder<?> producerBuilder = client.newProducer(schema).topic(topic); if (this.chunkingAllowed) { producerBuilder.enableChunking(true); producerBuilder.enableBatching(false); @@ -221,7 +252,7 @@ public class CmdProduce { producerBuilder.addEncryptionKey(this.encKeyName); producerBuilder.defaultCryptoKeyReader(this.encKeyValue); } - Producer<byte[]> producer = producerBuilder.create(); + Producer<?> producer = producerBuilder.create(); List<byte[]> messageBodies = generateMessageBodies(this.messages, this.messageFileNames); RateLimiter limiter = (this.publishRate > 0) ? 
RateLimiter.create(this.publishRate) : null; @@ -238,17 +269,33 @@ public class CmdProduce { limiter.acquire(); } - TypedMessageBuilder<byte[]> message = producer.newMessage(); + TypedMessageBuilder message = producer.newMessage(); if (!kvMap.isEmpty()) { message.properties(kvMap); } - if (key != null && !key.isEmpty()) { - message.key(key); + switch (keyValueEncodingType) { + case KEY_VALUE_ENCODING_TYPE_NOT_SET: + if (key != null && !key.isEmpty()) { + message.key(key); + } + message.value(content); + break; + case KEY_VALUE_ENCODING_TYPE_SEPARATED: + case KEY_VALUE_ENCODING_TYPE_INLINE: + KeyValue kv = new KeyValue<>( + // TODO: support AVRO encoded key + key != null ? key.getBytes(StandardCharsets.UTF_8) : null, + content); + message.value(kv); + break; + default: + throw new IllegalStateException(); } - message.value(content).send(); + message.send(); + numMessagesSent++; } @@ -265,6 +312,51 @@ public class CmdProduce { return returnCode; } + static Schema<?> buildSchema(String keySchema, String schema, String keyValueEncodingType) { + switch (keyValueEncodingType) { + case KEY_VALUE_ENCODING_TYPE_NOT_SET: + return buildComponentSchema(schema); + case KEY_VALUE_ENCODING_TYPE_SEPARATED: + return Schema.KeyValue(buildComponentSchema(keySchema), buildComponentSchema(schema), KeyValueEncodingType.SEPARATED); + case KEY_VALUE_ENCODING_TYPE_INLINE: + return Schema.KeyValue(buildComponentSchema(keySchema), buildComponentSchema(schema), KeyValueEncodingType.INLINE); + default: + throw new IllegalArgumentException("Invalid KeyValueEncodingType "+keyValueEncodingType+", only: 'none','separated' and 'inline"); + } + } + + private static Schema<?> buildComponentSchema(String schema) { + Schema<?> base; + switch (schema) { + case "string": + base = Schema.STRING; + break; + case "bytes": + // no need for wrappers + return Schema.BYTES; + default: + if (schema.startsWith("avro:")) { + base = buildGenericSchema(SchemaType.AVRO, schema.substring(5)); + } else if 
(schema.startsWith("json:")) { + base = buildGenericSchema(SchemaType.JSON, schema.substring(5)); + } else { + throw new IllegalArgumentException("Invalid schema type: "+schema); + } + } + return Schema.AUTO_PRODUCE_BYTES(base); + } + + private static Schema<?> buildGenericSchema(SchemaType type, String definition) { + return Schema.generic(SchemaInfoImpl + .builder() + .schema(definition.getBytes(StandardCharsets.UTF_8)) + .name("client") + .properties(new HashMap<>()) + .type(type) + .build()); + + } + @SuppressWarnings("deprecation") @VisibleForTesting public String getProduceBaseEndPoint(String topic) { diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java index 173fcfc..17e1e2d 100644 --- a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java +++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java @@ -18,10 +18,16 @@ */ package org.apache.pulsar.client.cli; -import org.testng.Assert; + +import static org.testng.Assert.assertEquals; + +import org.apache.pulsar.client.api.schema.KeyValueSchema; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaType; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; + public class TestCmdProduce { CmdProduce cmdProduce; @@ -35,10 +41,35 @@ public class TestCmdProduce { @Test public void testGetProduceBaseEndPoint() { String topicNameV1 = "persistent://public/cluster/default/issue-11067"; - Assert.assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV1), + assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV1), "ws://localhost:8080/ws/producer/persistent/public/cluster/default/issue-11067"); String topicNameV2 = "persistent://public/default/issue-11067"; - Assert.assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV2), + 
assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV2), "ws://localhost:8080/ws/v2/producer/persistent/public/default/issue-11067"); } + + @Test + public void testBuildSchema() { + // default + assertEquals(SchemaType.BYTES, CmdProduce.buildSchema("string", "bytes", CmdProduce.KEY_VALUE_ENCODING_TYPE_NOT_SET).getSchemaInfo().getType()); + + // simple key value + assertEquals(SchemaType.KEY_VALUE, CmdProduce.buildSchema("string", "string", "separated").getSchemaInfo().getType()); + assertEquals(SchemaType.KEY_VALUE, CmdProduce.buildSchema("string", "string", "inline").getSchemaInfo().getType()); + + KeyValueSchema<?, ?> composite1 = (KeyValueSchema<?, ?>) CmdProduce.buildSchema("string", + "json:{\"type\": \"record\",\"namespace\": \"com.example\",\"name\": \"FullName\", \"fields\": [{ \"name\": \"a\", \"type\": \"string\" }]}", + "inline"); + assertEquals(KeyValueEncodingType.INLINE, composite1.getKeyValueEncodingType()); + assertEquals(SchemaType.STRING, composite1.getKeySchema().getSchemaInfo().getType()); + assertEquals(SchemaType.JSON, composite1.getValueSchema().getSchemaInfo().getType()); + + KeyValueSchema<?, ?> composite2 = (KeyValueSchema<?, ?>) CmdProduce.buildSchema( + "json:{\"type\": \"record\",\"namespace\": \"com.example\",\"name\": \"FullName\", \"fields\": [{ \"name\": \"a\", \"type\": \"string\" }]}", + "avro:{\"type\": \"record\",\"namespace\": \"com.example\",\"name\": \"FullName\", \"fields\": [{ \"name\": \"a\", \"type\": \"string\" }]}", + "inline"); + assertEquals(KeyValueEncodingType.INLINE, composite2.getKeyValueEncodingType()); + assertEquals(SchemaType.JSON, composite2.getKeySchema().getSchemaInfo().getType()); + assertEquals(SchemaType.AVRO, composite2.getValueSchema().getSchemaInfo().getType()); + } } \ No newline at end of file