[ 
https://issues.apache.org/jira/browse/BEAM-13608?focusedWorklogId=762061&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762061
 ]

ASF GitHub Bot logged work on BEAM-13608:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Apr/22 22:56
            Start Date: 25/Apr/22 22:56
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on code in PR #17163:
URL: https://github.com/apache/beam/pull/17163#discussion_r858075089


##########
CHANGES.md:
##########
@@ -49,6 +49,62 @@
 
 * ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 -->
+
+## I/Os
+
+* JMSIO gains the ability to map any kind of input to any subclass of 
javax.jms.Message

Review Comment:
   ```suggestion
   * JmsIO gains the ability to map any kind of input to any subclass of 
`javax.jms.Message` 
([BEAM-16308](https://issues.apache.org/jira/browse/BEAM-16308)).
   ```



##########
CHANGES.md:
##########
@@ -49,6 +49,62 @@
 
 * ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 -->
+
+## I/Os
+
+* JMSIO gains the ability to map any kind of input to any subclass of 
javax.jms.Message
+  * To have the same behavior as before, a valueMapper with a 
TextMessageMapper must be set at JmsIO.Write initialization like this:
+```java
+  JmsIO.<String>write()
+        .withConnectionFactory(jmsConnectionFactory)
+        .withCoder(SerializableCoder.of(String.class))
+        .withValueMapper(new TextMessageMapper());
+```
+* JMSIO introduces the ability to write to dynamic topics.
+  * A topicNameMapper must be set to extract the topic name from the input,
+  * A valueMapper must be set to extract the value from the input.
+
+1. For instance let's create a topicNameMapper. Employee Event is a Map.
+
+```java
+        String topic = "company/%s/employee/%s";
+        // Get a topic name formatted like this:
+        // company/%s/employee/%s
+        // Vehicle name is a map containing the vehicle brand and the VIN
+        SerializableFunction<CompanyEvent, String> topicNameMapper =
+        (e ->
+        String.format(
+        topic,
+        e.get(COMPANY_NAME.fieldName()).toString().toLowerCase(),
+        e.get(EMPLOYEE.fieldName()).toString().toLowerCase()));
+}
+```
+
+2. Let's create a valueMapper that will return a javax.jms.Message:
+
+```java
+      SerializableBiFunction<EmployeeEvent, Session, Message> valueMapper = 
(e, s) -> {
+        try {
+          TextMessage msg = s.createTextMessage();
+          msg.setText(Mapper.MAPPER.toJson(e));
+          return msg;
+        } catch (JMSException ex) {
+          throw new JmsIOException("Error!!", ex);
+        }
+    };
+```
+
+3. We now have to inject those two functions into the JmsIO.Write class:
+
+```java
+  JmsIO.<EmployeeEvent>write()
+        .withConnectionFactory(jmsConnectionFactory)
+        .withTopicNameMapper(topicNameMapper)
+        .withCoder(SerializableCoder.of(EmployeeEvent.class))
+        .withValueMapper(valueMapper);
+
+```

Review Comment:
   ```suggestion
   ```



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -690,47 +709,128 @@ public Write withQueue(String queue) {
      * @param topic The JMS topic name.
      * @return The corresponding {@link JmsIO.Read}.
      */
-    public Write withTopic(String topic) {
+    public Write<EventT> withTopic(String topic) {
       checkArgument(topic != null, "topic can not be null");
       return builder().setTopic(topic).build();
     }
 
     /** Define the username to connect to the JMS broker (authenticated). */
-    public Write withUsername(String username) {
+    public Write<EventT> withUsername(String username) {
       checkArgument(username != null, "username can not be null");
       return builder().setUsername(username).build();
     }
 
     /** Define the password to connect to the JMS broker (authenticated). */
-    public Write withPassword(String password) {
+    public Write<EventT> withPassword(String password) {
       checkArgument(password != null, "password can not be null");
       return builder().setPassword(password).build();
     }
 
+    /**
+     * Specify the JMS topic destination name where to send messages to 
dynamically. The {@link
+     * JmsIO.Write} acts as a publisher on the topic.
+     *
+     * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String) 
and
+     * {@link JmsIO.Write#withTopic(String)}. The user has to specify a {@link 
SerializableFunction}
+     * that takes {@code EventT} object as a parameter, and returns the topic 
name depending of the content
+     * of the event object.
+     *
+     * <p>For instance:
+     * <pre>{@code
+     * SerializableFunction<SomeEventObject, String> topicNameMapper = (e -> 
return "the topic name");
+     *
+     * }</pre>
+     *
+     * <pre>{@code
+     * .apply(JmsIO.write().withTopicNameMapper(topicNameNapper)
+     * }</pre>
+     *
+     * @param topicNameMapper The function returning the dynamic topic name.
+     * @return The corresponding {@link JmsIO.Write}.
+     */
+    public Write<EventT> withTopicNameMapper(SerializableFunction<EventT, 
String> topicNameMapper) {
+      checkArgument(topicNameMapper != null, "topicNameMapper can not be 
null");
+      return builder().setTopicNameMapper(topicNameMapper).build();
+    }
+
+    /**
+     * Map the {@code EventT} object to a {@link javax.jms.Message}.
+     *
+     * <p>For instance:
+     *
+     * <pre>{@code
+     * SerializableBiFunction<SomeEventObject, Session, Message> valueMapper = 
(e, s) -> {
+     *
+     *       try {
+     *         TextMessage msg = s.createTextMessage();
+     *         msg.setText(Mapper.MAPPER.toJson(e));
+     *         return msg;
+     *       } catch (JMSException ex) {
+     *         throw new JmsIOException("Error!!", ex);
+     *       }
+     *     };
+     *
+     * }</pre>

Review Comment:
   ```suggestion
        * SerializableBiFunction<SomeEventObject, Session, Message> valueMapper 
= (event, session) -> {
        *
        *       try {
        *         TextMessage msg = session.createTextMessage();
        *         msg.setText(event.toJson());
        *         return msg;
        *       } catch (JMSException ex) {
        *         throw new JmsIOException("Error!", ex);
        *       }
        *     };
        * }</pre>
   ```



##########
CHANGES.md:
##########
@@ -49,6 +49,62 @@
 
 * ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 -->
+
+## I/Os
+
+* JMSIO gains the ability to map any kind of input to any subclass of 
javax.jms.Message
+  * To have the same behavior as before, a valueMapper with a 
TextMessageMapper must be set at JmsIO.Write initialization like this:
+```java
+  JmsIO.<String>write()
+        .withConnectionFactory(jmsConnectionFactory)
+        .withCoder(SerializableCoder.of(String.class))

Review Comment:
   ```suggestion
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -255,6 +259,90 @@ public void testWriteMessage() throws Exception {
     assertEquals(100, count);
   }
 
+  @Test
+  public void testWriteMessageWithError() throws Exception {
+    ArrayList<String> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      data.add("Message " + i);
+    }
+    String messageOne = "Message 1";
+    String messageTwo = "Message 2";
+
+    WriteJmsResult<String> output =
+        pipeline
+            .apply(Create.of(data))
+            .apply(
+                JmsIO.<String>write()
+                    .withConnectionFactory(connectionFactory)
+                    .withValueMapper(new TextMessageMapperWithError())
+                    .withQueue(QUEUE)
+                    .withUsername(USERNAME)
+                    .withPassword(PASSWORD));
+
+    PAssert.that(output.getFailedMessages()).containsInAnyOrder(messageOne, 
messageTwo);
+
+    pipeline.run();
+
+    Connection connection = connectionFactory.createConnection(USERNAME, 
PASSWORD);
+    connection.start();
+    Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+    MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE));
+    int count = 0;
+    while (consumer.receive(1000) != null) {
+      count++;
+    }
+    assertEquals(98, count);
+    assertTrue(output.getPipeline().equals(pipeline));
+  }
+
+  @Test
+  public void testWriteDynamicMessage() throws Exception {
+    Connection connection = connectionFactory.createConnection(USERNAME, 
PASSWORD);
+    connection.start();
+    Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+    MessageConsumer consumerOne = 
session.createConsumer(session.createTopic("Topic_One"));
+    MessageConsumer consumerTwo = 
session.createConsumer(session.createTopic("Topic_Two"));
+    ArrayList<TestEvent> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {

Review Comment:
   ```suggestion
       for (int i = 0; i < 50; i++) {
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -255,6 +259,90 @@ public void testWriteMessage() throws Exception {
     assertEquals(100, count);
   }
 
+  @Test
+  public void testWriteMessageWithError() throws Exception {
+    ArrayList<String> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      data.add("Message " + i);
+    }
+    String messageOne = "Message 1";
+    String messageTwo = "Message 2";
+
+    WriteJmsResult<String> output =
+        pipeline
+            .apply(Create.of(data))
+            .apply(
+                JmsIO.<String>write()
+                    .withConnectionFactory(connectionFactory)
+                    .withValueMapper(new TextMessageMapperWithError())
+                    .withQueue(QUEUE)
+                    .withUsername(USERNAME)
+                    .withPassword(PASSWORD));
+
+    PAssert.that(output.getFailedMessages()).containsInAnyOrder(messageOne, 
messageTwo);
+
+    pipeline.run();
+
+    Connection connection = connectionFactory.createConnection(USERNAME, 
PASSWORD);
+    connection.start();
+    Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+    MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE));
+    int count = 0;
+    while (consumer.receive(1000) != null) {
+      count++;
+    }
+    assertEquals(98, count);
+    assertTrue(output.getPipeline().equals(pipeline));

Review Comment:
   ```suggestion
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,4 +643,39 @@ private <T, MethodArgT, MethodResultT> T proxyMethod(
               return result;
             });
   }
+
+  private static class TestEvent implements Serializable {
+    private final String topicName;
+    private final String value;
+
+    private TestEvent(String topicName, String value) {
+      this.topicName = topicName;
+      this.value = value;
+    }
+
+    private String getTopicName() {
+      return this.topicName;
+    }
+
+    private String getValue() {
+      return this.value;
+    }
+  }
+
+  private static class TextMessageMapperWithError
+      implements SerializableBiFunction<String, Session, Message> {
+    @Override
+    public Message apply(String value, Session session) {
+      try {
+        if (value.equals("Message " + 1) || value.equals("Message " + 2)) {

Review Comment:
   ```suggestion
           if (value.equals("Message 1") || value.equals("Message 2")) {
   ```



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -656,7 +675,7 @@ public Write withConnectionFactory(ConnectionFactory 
connectionFactory) {
      * acts as a producer on the queue.
      *
      * <p>This method is exclusive with {@link JmsIO.Write#withTopic(String)}. 
The user has to
-     * specify a destination: queue or topic.
+     * specify a destination: queue or topic or topicNameMapper.

Review Comment:
   ```suggestion
        * specify a destination: queue, topic, or topicNameMapper.
   ```



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -678,7 +697,7 @@ public Write withQueue(String queue) {
      * as a publisher on the topic.
      *
      * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)}. 
The user has to
-     * specify a destination: queue or topic.
+     * specify a destination: queue or topic or topicNameMapper.

Review Comment:
   ```suggestion
        * specify a destination: queue, topic, or topicNameMapper.
   ```



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -690,47 +709,128 @@ public Write withQueue(String queue) {
      * @param topic The JMS topic name.
      * @return The corresponding {@link JmsIO.Read}.
      */
-    public Write withTopic(String topic) {
+    public Write<EventT> withTopic(String topic) {
       checkArgument(topic != null, "topic can not be null");
       return builder().setTopic(topic).build();
     }
 
     /** Define the username to connect to the JMS broker (authenticated). */
-    public Write withUsername(String username) {
+    public Write<EventT> withUsername(String username) {
       checkArgument(username != null, "username can not be null");
       return builder().setUsername(username).build();
     }
 
     /** Define the password to connect to the JMS broker (authenticated). */
-    public Write withPassword(String password) {
+    public Write<EventT> withPassword(String password) {
       checkArgument(password != null, "password can not be null");
       return builder().setPassword(password).build();
     }
 
+    /**
+     * Specify the JMS topic destination name where to send messages to 
dynamically. The {@link
+     * JmsIO.Write} acts as a publisher on the topic.
+     *
+     * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String) 
and
+     * {@link JmsIO.Write#withTopic(String)}. The user has to specify a {@link 
SerializableFunction}
+     * that takes {@code EventT} object as a parameter, and returns the topic 
name depending of the content
+     * of the event object.
+     *
+     * <p>For instance:
+     * <pre>{@code
+     * SerializableFunction<SomeEventObject, String> topicNameMapper = (e -> 
return "the topic name");
+     *
+     * }</pre>
+     *
+     * <pre>{@code
+     * .apply(JmsIO.write().withTopicNameMapper(topicNameNapper)
+     * }</pre>
+     *
+     * @param topicNameMapper The function returning the dynamic topic name.
+     * @return The corresponding {@link JmsIO.Write}.
+     */
+    public Write<EventT> withTopicNameMapper(SerializableFunction<EventT, 
String> topicNameMapper) {
+      checkArgument(topicNameMapper != null, "topicNameMapper can not be 
null");
+      return builder().setTopicNameMapper(topicNameMapper).build();
+    }
+
+    /**
+     * Map the {@code EventT} object to a {@link javax.jms.Message}.
+     *
+     * <p>For instance:
+     *
+     * <pre>{@code
+     * SerializableBiFunction<SomeEventObject, Session, Message> valueMapper = 
(e, s) -> {
+     *
+     *       try {
+     *         TextMessage msg = s.createTextMessage();
+     *         msg.setText(Mapper.MAPPER.toJson(e));
+     *         return msg;
+     *       } catch (JMSException ex) {
+     *         throw new JmsIOException("Error!!", ex);
+     *       }
+     *     };
+     *
+     * }</pre>
+     *
+     * <pre>{@code
+     * .apply(JmsIO.write().withValueMapper(valueNapper)
+     *

Review Comment:
   ```suggestion
   ```



##########
CHANGES.md:
##########
@@ -49,6 +49,62 @@
 
 * ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 -->
+
+## I/Os
+
+* JMSIO gains the ability to map any kind of input to any subclass of 
javax.jms.Message
+  * To have the same behavior as before, a valueMapper with a 
TextMessageMapper must be set at JmsIO.Write initialization like this:
+```java
+  JmsIO.<String>write()
+        .withConnectionFactory(jmsConnectionFactory)
+        .withCoder(SerializableCoder.of(String.class))
+        .withValueMapper(new TextMessageMapper());
+```
+* JMSIO introduces the ability to write to dynamic topics.
+  * A topicNameMapper must be set to extract the topic name from the input,
+  * A valueMapper must be set to extract the value from the input.

Review Comment:
   ```suggestion
   * JmsIO introduces the ability to write to dynamic topics (Java) 
([BEAM-16308](https://issues.apache.org/jira/browse/BEAM-16308)).
     * A `topicNameMapper` must be set to extract the topic name from the input 
value.
     * A `valueMapper` must be set to convert the input value to JMS message.
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -255,6 +259,90 @@ public void testWriteMessage() throws Exception {
     assertEquals(100, count);
   }
 
+  @Test
+  public void testWriteMessageWithError() throws Exception {
+    ArrayList<String> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      data.add("Message " + i);
+    }
+    String messageOne = "Message 1";
+    String messageTwo = "Message 2";

Review Comment:
   ```suggestion
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -255,6 +259,90 @@ public void testWriteMessage() throws Exception {
     assertEquals(100, count);
   }
 
+  @Test
+  public void testWriteMessageWithError() throws Exception {
+    ArrayList<String> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      data.add("Message " + i);
+    }
+    String messageOne = "Message 1";
+    String messageTwo = "Message 2";
+
+    WriteJmsResult<String> output =
+        pipeline
+            .apply(Create.of(data))
+            .apply(
+                JmsIO.<String>write()
+                    .withConnectionFactory(connectionFactory)
+                    .withValueMapper(new TextMessageMapperWithError())
+                    .withQueue(QUEUE)
+                    .withUsername(USERNAME)
+                    .withPassword(PASSWORD));
+
+    PAssert.that(output.getFailedMessages()).containsInAnyOrder(messageOne, 
messageTwo);
+
+    pipeline.run();
+
+    Connection connection = connectionFactory.createConnection(USERNAME, 
PASSWORD);
+    connection.start();
+    Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+    MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE));
+    int count = 0;
+    while (consumer.receive(1000) != null) {
+      count++;
+    }
+    assertEquals(98, count);
+    assertTrue(output.getPipeline().equals(pipeline));
+  }
+
+  @Test
+  public void testWriteDynamicMessage() throws Exception {
+    Connection connection = connectionFactory.createConnection(USERNAME, 
PASSWORD);
+    connection.start();
+    Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+    MessageConsumer consumerOne = 
session.createConsumer(session.createTopic("Topic_One"));
+    MessageConsumer consumerTwo = 
session.createConsumer(session.createTopic("Topic_Two"));
+    ArrayList<TestEvent> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      data.add(new TestEvent("Topic_One", "Message One " + i));
+    }
+    for (int i = 0; i < 100; i++) {
+      data.add(new TestEvent("Topic_Two", "Message Two " + i));
+    }
+    pipeline
+        .apply(Create.of(data))
+        .apply(
+            JmsIO.<TestEvent>write()
+                .withConnectionFactory(connectionFactory)
+                .withUsername(USERNAME)
+                .withPassword(PASSWORD)
+                .withTopicNameMapper(e -> e.getTopicName())
+                .withValueMapper(
+                    (e, s) -> {
+                      try {
+                        TextMessage msg = s.createTextMessage();
+                        msg.setText(e.getValue());
+                        return msg;
+                      } catch (JMSException ex) {
+                        throw new JmsIOException("Error writing TextMessage", 
ex);
+                      }
+                    }));
+
+    pipeline.run();
+
+    int count = 0;
+    while (consumerOne.receive(1000) != null) {
+      count++;
+    }
+    assertEquals(100, count);

Review Comment:
   ```suggestion
       assertEquals(50, count);
   ```



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -255,6 +259,90 @@ public void testWriteMessage() throws Exception {
     assertEquals(100, count);
   }
 
+  @Test
+  public void testWriteMessageWithError() throws Exception {
+    ArrayList<String> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      data.add("Message " + i);
+    }
+    String messageOne = "Message 1";
+    String messageTwo = "Message 2";
+
+    WriteJmsResult<String> output =
+        pipeline
+            .apply(Create.of(data))
+            .apply(
+                JmsIO.<String>write()
+                    .withConnectionFactory(connectionFactory)
+                    .withValueMapper(new TextMessageMapperWithError())
+                    .withQueue(QUEUE)
+                    .withUsername(USERNAME)
+                    .withPassword(PASSWORD));
+
+    PAssert.that(output.getFailedMessages()).containsInAnyOrder(messageOne, 
messageTwo);

Review Comment:
   ```suggestion
       PAssert.that(output.getFailedMessages()).containsInAnyOrder("Message 1", 
"Message 2");
   ```



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOException.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+public class JmsIOException extends RuntimeException {
+  public JmsIOException(String message, Throwable cause) {
+    super(message, cause);
+  }

Review Comment:
   ```suggestion
     public JmsIOException(String message) {
       super(message);
     }
   
     public JmsIOException(String message, Throwable cause) {
       super(message, cause);
     }
   ```



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -690,47 +709,128 @@ public Write withQueue(String queue) {
      * @param topic The JMS topic name.
      * @return The corresponding {@link JmsIO.Read}.
      */
-    public Write withTopic(String topic) {
+    public Write<EventT> withTopic(String topic) {
       checkArgument(topic != null, "topic can not be null");
       return builder().setTopic(topic).build();
     }
 
     /** Define the username to connect to the JMS broker (authenticated). */
-    public Write withUsername(String username) {
+    public Write<EventT> withUsername(String username) {
       checkArgument(username != null, "username can not be null");
       return builder().setUsername(username).build();
     }
 
     /** Define the password to connect to the JMS broker (authenticated). */
-    public Write withPassword(String password) {
+    public Write<EventT> withPassword(String password) {
       checkArgument(password != null, "password can not be null");
       return builder().setPassword(password).build();
     }
 
+    /**
+     * Specify the JMS topic destination name where to send messages to 
dynamically. The {@link
+     * JmsIO.Write} acts as a publisher on the topic.
+     *
+     * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String) 
and
+     * {@link JmsIO.Write#withTopic(String)}. The user has to specify a {@link 
SerializableFunction}
+     * that takes {@code EventT} object as a parameter, and returns the topic 
name depending of the content
+     * of the event object.
+     *
+     * <p>For instance:
+     * <pre>{@code
+     * SerializableFunction<SomeEventObject, String> topicNameMapper = (e -> 
return "the topic name");
+     *
+     * }</pre>

Review Comment:
   ```suggestion
        * <p>For example:
        * <pre>{@code
        * SerializableFunction<CompanyEvent, String> topicNameMapper =
        *   (event ->
        *    String.format(
        *    "company/%s/employee/%s",
        *    event.getCompanyName(),
        *    event.getEmployeeId()));
        * }</pre>
   ```





Issue Time Tracking
-------------------

    Worklog Id:     (was: 762061)
    Time Spent: 2h  (was: 1h 50m)

> Dynamic Topics management
> -------------------------
>
>                 Key: BEAM-13608
>                 URL: https://issues.apache.org/jira/browse/BEAM-13608
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-jms
>            Reporter: Vincent BALLADA
>            Assignee: Vincent BALLADA
>            Priority: P2
>              Labels: assigned:
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> JmsIO write function is able to publish messages to topics with static names:
> company/employee/id/1234567.
> Some AMQP/JMS broker provides the ability to publish to dynamic topics like:
> company/employee/id/\{employeeId}
> If we want to handle that with Apache Beam JmsIO, we must create a branch per 
> employeeId, which is not suitable for a company with thousand of employee, or 
> other similat use cases.
> The JmsIO write function should provide the ability to handle dynamic topics.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to