This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c0937b  Allow to configure TypedMessageBuilder through a Map conf 
object (#4015)
9c0937b is described below

commit 9c0937b85da38d25d6b0dbbcc2a58b0178dbf09f
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Apr 10 22:17:16 2019 -0700

    Allow to configure TypedMessageBuilder through a Map conf object (#4015)
    
    * Allow to configure TypedMessageBuilder through a Map conf object
    
    * Use constants for message confs
    
    * Reverted previous change
    
    * Use Long instead of Number
---
 .../api/SimpleTypedProducerConsumerTest.java       | 80 ++++++++++++++++++++++
 .../pulsar/client/api/TypedMessageBuilder.java     | 42 ++++++++++++
 .../client/impl/TypedMessageBuilderImpl.java       | 27 ++++++++
 .../apache/pulsar/client/util/TypeCheckUtil.java   | 33 +++++++++
 4 files changed, 182 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index 050db39..586f72b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -18,15 +18,23 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import java.time.Clock;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+
+import lombok.Cleanup;
+
 import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -623,4 +631,76 @@ public class SimpleTypedProducerConsumerTest extends 
ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
 
     }
+
+    /**
+     * Sends one message configured through {@code loadConf} and checks that the key, properties,
+     * event time and sequence id arrive on the consumer side. Then checks that a config map with
+     * an unknown key, or with a wrongly-typed value, makes the send chain fail with a
+     * {@link RuntimeException}.
+     */
+    @Test
+    public void testMessageBuilderLoadConf() throws Exception {
+        final String topicName = "my-topic-" + System.nanoTime();
+
+        // Subscribe before producing so the message published below is delivered to this consumer
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName).subscriptionName("my-subscriber-name").subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+
+        Map<String, String> userProperties = new HashMap<>();
+        userProperties.put("a", "1");
+        userProperties.put("b", "2");
+
+        // One entry for every option understood by loadConf
+        Map<String, Object> validConf = new HashMap<>();
+        validConf.put("key", "key-1");
+        validConf.put("properties", userProperties);
+        validConf.put("eventTime", 1234L);
+        validConf.put("sequenceId", 5L);
+        validConf.put("replicationClusters", Lists.newArrayList("a", "b", "c"));
+        validConf.put("disableReplication", false);
+
+        producer.newMessage().value("my-message").loadConf(validConf).send();
+
+        Message<String> received = consumer.receive();
+        Map<String, String> receivedProperties = received.getProperties();
+        assertEquals(received.getKey(), "key-1");
+        assertEquals(receivedProperties.get("a"), "1");
+        assertEquals(receivedProperties.get("b"), "2");
+        assertEquals(received.getEventTime(), 1234);
+        assertEquals(received.getSequenceId(), 5);
+
+        consumer.acknowledge(received);
+
+        // An unknown config key must make the send chain fail
+        Map<String, Object> unknownKeyConf = Collections.singletonMap("nonExistingKey", "key-1");
+        try {
+            producer.newMessage().value("my-message").loadConf(unknownKeyConf).send();
+            fail("Should have failed");
+        } catch (RuntimeException expected) {
+            // expected
+        }
+
+        // A value of the wrong type (String where a Long is required) must fail as well
+        Map<String, Object> wrongTypeConf = Collections.singletonMap("eventTime", "hello");
+        try {
+            producer.newMessage().value("my-message").loadConf(wrongTypeConf).send();
+            fail("Should have failed");
+        } catch (RuntimeException expected) {
+            // expected
+        }
+    }
+
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
index 423c080..a1e2f2d 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java
@@ -174,4 +174,46 @@ public interface TypedMessageBuilder<T> extends 
Serializable {
      * @return the message builder instance
      */
     TypedMessageBuilder<T> disableReplication();
+
+    /**
+     * Configure the {@link TypedMessageBuilder} from a config map, as an alternative to calling
+     * the individual builder methods.
+     * <p>
+     * The "value" of the message itself cannot be set on the config map.
+     * <p>
+     * Example:
+     *
+     * <pre>{@code
+     * Map<String, Object> conf = new HashMap<>();
+     * conf.put("key", "my-key");
+     * conf.put("eventTime", System.currentTimeMillis());
+     *
+     * producer.newMessage()
+     *             .value("my-message")
+     *             .loadConf(conf)
+     *             .send();
+     * }</pre>
+     *
+     * The available options are:
+     * <table border="1">
+     *  <tr><th>Constant</th><th>Name</th><th>Type</th><th>Doc</th></tr>
+     *  <tr><td>{@link #CONF_KEY}</td><td>{@code key}</td><td>{@code String}</td><td>{@link #key(String)}</td></tr>
+     *  <tr><td>{@link #CONF_PROPERTIES}</td><td>{@code properties}</td><td>{@code Map<String,String>}</td><td>{@link #properties(Map)}</td></tr>
+     *  <tr><td>{@link #CONF_EVENT_TIME}</td><td>{@code eventTime}</td><td>{@code long}</td><td>{@link #eventTime(long)}</td></tr>
+     *  <tr><td>{@link #CONF_SEQUENCE_ID}</td><td>{@code sequenceId}</td><td>{@code long}</td><td>{@link #sequenceId(long)}</td></tr>
+     *  <tr><td>{@link #CONF_REPLICATION_CLUSTERS}</td><td>{@code replicationClusters}</td><td>{@code List<String>}</td><td>{@link #replicationClusters(List)}</td></tr>
+     *  <tr><td>{@link #CONF_DISABLE_REPLICATION}</td><td>{@code disableReplication}</td><td>{@code boolean}</td><td>{@link #disableReplication()}</td></tr>
+     * </table>
+     * <p>
+     * Notes, based on the {@code TypedMessageBuilderImpl} implementation:
+     * <ul>
+     *  <li>{@code eventTime} and {@code sequenceId} values must be {@link Long} instances; other
+     *      numeric types such as {@code Integer} are rejected.</li>
+     *  <li>{@code disableReplication} only has an effect when the value is {@code true}; a
+     *      {@code false} value leaves the builder unchanged.</li>
+     *  <li>A key that is not listed in the table, or a value of the wrong type, results in a
+     *      {@link RuntimeException}.</li>
+     * </ul>
+     *
+     * @param config a map with the configuration options for the message
+     * @return the message builder instance
+     */
+    TypedMessageBuilder<T> loadConf(Map<String, Object> config);
+
+    static final String CONF_KEY = "key";
+    static final String CONF_PROPERTIES = "properties";
+    static final String CONF_EVENT_TIME = "eventTime";
+    static final String CONF_SEQUENCE_ID = "sequenceId";
+    static final String CONF_REPLICATION_CLUSTERS = "replicationClusters";
+    static final String CONF_DISABLE_REPLICATION = "disableReplication";
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index b13423e..fb18c0e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.client.util.TypeCheckUtil.checkType;
 
 import com.google.common.base.Preconditions;
 
@@ -130,6 +131,32 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
         return this;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public TypedMessageBuilder<T> loadConf(Map<String, Object> config) {
+        config.forEach((key, value) -> {
+            if (key.equals(CONF_KEY)) {
+                this.key(checkType(value, String.class));
+            } else if (key.equals(CONF_PROPERTIES)) {
+                this.properties(checkType(value, Map.class));
+            } else if (key.equals(CONF_EVENT_TIME)) {
+                this.eventTime(checkType(value, Long.class));
+            } else if (key.equals(CONF_SEQUENCE_ID)) {
+                this.sequenceId(checkType(value, Long.class));
+            } else if (key.equals(CONF_REPLICATION_CLUSTERS)) {
+                this.replicationClusters(checkType(value, List.class));
+            } else if (key.equals(CONF_DISABLE_REPLICATION)) {
+                boolean disableReplication = checkType(value, Boolean.class);
+                if (disableReplication) {
+                    this.disableReplication();
+                }
+            } else {
+                throw new RuntimeException("Invalid message config key '" + 
key + "'");
+            }
+        });
+        return this;
+    }
+
     public MessageMetadata.Builder getMetadataBuilder() {
         return msgMetadataBuilder;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java
new file mode 100644
index 0000000..cbabdfe
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import lombok.experimental.UtilityClass;
+
+/**
+ * Helper for validating the runtime type of loosely-typed values before using them.
+ */
+@UtilityClass
+public class TypeCheckUtil {
+    /**
+     * Returns {@code o} typed as {@code T} after verifying that it is an instance of {@code clazz}.
+     *
+     * @param o the value to check; {@code null} is not an instance of any class and is rejected
+     * @param clazz the type the value is required to have
+     * @param <T> the required type
+     * @return {@code o}, cast to {@code T}
+     * @throws RuntimeException if {@code o} is {@code null} or is not an instance of {@code clazz}
+     */
+    public static <T> T checkType(Object o, Class<T> clazz) {
+        if (!clazz.isInstance(o)) {
+            // isInstance(null) is false, so o can be null here: do not call o.getClass() on it,
+            // otherwise the caller gets a bare NullPointerException instead of this message
+            String actualType = (o == null) ? "null" : o.getClass().getName();
+            throw new RuntimeException(
+                    String.format("Invalid object type '%s' when expecting '%s'", actualType, clazz.getName()));
+        }
+        // Class.cast is a checked cast, so no @SuppressWarnings("unchecked") is needed
+        return clazz.cast(o);
+    }
+}

Reply via email to