[
https://issues.apache.org/jira/browse/BEAM-13608?focusedWorklogId=762061&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762061
]
ASF GitHub Bot logged work on BEAM-13608:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 25/Apr/22 22:56
Start Date: 25/Apr/22 22:56
Worklog Time Spent: 10m
Work Description: lukecwik commented on code in PR #17163:
URL: https://github.com/apache/beam/pull/17163#discussion_r858075089
##########
CHANGES.md:
##########
@@ -49,6 +49,62 @@
* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-->
+
+## I/Os
+
+* JMSIO gains the ability to map any kind of input to any subclass of javax.jms.Message
Review Comment:
```suggestion
* JmsIO gains the ability to map any kind of input to any subclass of
`javax.jms.Message`
([BEAM-13608](https://issues.apache.org/jira/browse/BEAM-13608)).
```
##########
CHANGES.md:
##########
@@ -49,6 +49,62 @@
* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-->
+
+## I/Os
+
+* JMSIO gains the ability to map any kind of input to any subclass of
javax.jms.Message
+ * To have the same behavior as before, a valueMapper with a
TextMessageMapper must be set at JmsIO.Write initialization like this:
+```java
+ JmsIO.<String>write()
+ .withConnectionFactory(jmsConnectionFactory)
+ .withCoder(SerializableCoder.of(String.class))
+ .withValueMapper(new TextMessageMapper());
+```
+* JMSIO introduces the ability to write to dynamic topics.
+ * A topicNameMapper must be set to extract the topic name from the input,
+ * A valueMapper must be set to extract the value from the input.
+
+1. For instance let's create a topicNameMapper. Employee Event is a Map.
+
+```java
+ String topic = "company/%s/employee/%s";
+ // Get a topic name formatted like this:
+ // company/%s/employee/%s
+ // Vehicle name is a map containing the vehicle brand and the VIN
+ SerializableFunction<CompanyEvent, String> topicNameMapper =
+ (e ->
+ String.format(
+ topic,
+ e.get(COMPANY_NAME.fieldName()).toString().toLowerCase(),
+ e.get(EMPLOYEE.fieldName()).toString().toLowerCase()));
+}
+```
+
+2. Let's create a valueMapper that will return a javax.jms.Message:
+
+```java
+ SerializableBiFunction<EmployeeEvent, Session, Message> valueMapper =
(e, s) -> {
+ try {
+ TextMessage msg = s.createTextMessage();
+ msg.setText(Mapper.MAPPER.toJson(e));
+ return msg;
+ } catch (JMSException ex) {
+ throw new JmsIOException("Error!!", ex);
+ }
+ };
+```
+
+3. We now have to inject those two functions into the JmsIO.Write class:
+
+```java
+ JmsIO.<EmployeeEvent>write()
+ .withConnectionFactory(jmsConnectionFactory)
+ .withTopicNameMapper(topicNameMapper)
+ .withCoder(SerializableCoder.of(EmployeeEvent.class))
+ .withValueMapper(valueMapper);
+
+```
Review Comment:
```suggestion
```
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -690,47 +709,128 @@ public Write withQueue(String queue) {
* @param topic The JMS topic name.
* @return The corresponding {@link JmsIO.Read}.
*/
- public Write withTopic(String topic) {
+ public Write<EventT> withTopic(String topic) {
checkArgument(topic != null, "topic can not be null");
return builder().setTopic(topic).build();
}
/** Define the username to connect to the JMS broker (authenticated). */
- public Write withUsername(String username) {
+ public Write<EventT> withUsername(String username) {
checkArgument(username != null, "username can not be null");
return builder().setUsername(username).build();
}
/** Define the password to connect to the JMS broker (authenticated). */
- public Write withPassword(String password) {
+ public Write<EventT> withPassword(String password) {
checkArgument(password != null, "password can not be null");
return builder().setPassword(password).build();
}
+ /**
+ * Specify the JMS topic destination name where to send messages to
dynamically. The {@link
+ * JmsIO.Write} acts as a publisher on the topic.
+ *
+ * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)
and
+ * {@link JmsIO.Write#withTopic(String)}. The user has to specify a {@link
SerializableFunction}
+ * that takes {@code EventT} object as a parameter, and returns the topic
name depending of the content
+ * of the event object.
+ *
+ * <p>For instance:
+ * <pre>{@code
+ * SerializableFunction<SomeEventObject, String> topicNameMapper = (e ->
return "the topic name");
+ *
+ * }</pre>
+ *
+ * <pre>{@code
+ * .apply(JmsIO.write().withTopicNameMapper(topicNameNapper)
+ * }</pre>
+ *
+ * @param topicNameMapper The function returning the dynamic topic name.
+ * @return The corresponding {@link JmsIO.Write}.
+ */
+ public Write<EventT> withTopicNameMapper(SerializableFunction<EventT,
String> topicNameMapper) {
+ checkArgument(topicNameMapper != null, "topicNameMapper can not be
null");
+ return builder().setTopicNameMapper(topicNameMapper).build();
+ }
+
+ /**
+ * Map the {@code EventT} object to a {@link javax.jms.Message}.
+ *
+ * <p>For instance:
+ *
+ * <pre>{@code
+ * SerializableBiFunction<SomeEventObject, Session, Message> valueMapper =
(e, s) -> {
+ *
+ * try {
+ * TextMessage msg = s.createTextMessage();
+ * msg.setText(Mapper.MAPPER.toJson(e));
+ * return msg;
+ * } catch (JMSException ex) {
+ * throw new JmsIOException("Error!!", ex);
+ * }
+ * };
+ *
+ * }</pre>
Review Comment:
```suggestion
* SerializableBiFunction<SomeEventObject, Session, Message> valueMapper = (event, session) -> {
*
* try {
* TextMessage msg = session.createTextMessage();
* msg.setText(event.toJson());
* return msg;
* } catch (JMSException ex) {
* throw new JmsIOException("Error!", ex);
* }
* };
* }</pre>
```
##########
CHANGES.md:
##########
@@ -49,6 +49,62 @@
* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-->
+
+## I/Os
+
+* JMSIO gains the ability to map any kind of input to any subclass of
javax.jms.Message
+ * To have the same behavior as before, a valueMapper with a
TextMessageMapper must be set at JmsIO.Write initialization like this:
+```java
+ JmsIO.<String>write()
+ .withConnectionFactory(jmsConnectionFactory)
+ .withCoder(SerializableCoder.of(String.class))
Review Comment:
```suggestion
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -255,6 +259,90 @@ public void testWriteMessage() throws Exception {
assertEquals(100, count);
}
+ @Test
+ public void testWriteMessageWithError() throws Exception {
+ ArrayList<String> data = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ data.add("Message " + i);
+ }
+ String messageOne = "Message 1";
+ String messageTwo = "Message 2";
+
+ WriteJmsResult<String> output =
+ pipeline
+ .apply(Create.of(data))
+ .apply(
+ JmsIO.<String>write()
+ .withConnectionFactory(connectionFactory)
+ .withValueMapper(new TextMessageMapperWithError())
+ .withQueue(QUEUE)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD));
+
+ PAssert.that(output.getFailedMessages()).containsInAnyOrder(messageOne,
messageTwo);
+
+ pipeline.run();
+
+ Connection connection = connectionFactory.createConnection(USERNAME,
PASSWORD);
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(QUEUE));
+ int count = 0;
+ while (consumer.receive(1000) != null) {
+ count++;
+ }
+ assertEquals(98, count);
+ assertTrue(output.getPipeline().equals(pipeline));
+ }
+
+ @Test
+ public void testWriteDynamicMessage() throws Exception {
+ Connection connection = connectionFactory.createConnection(USERNAME,
PASSWORD);
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerOne =
session.createConsumer(session.createTopic("Topic_One"));
+ MessageConsumer consumerTwo =
session.createConsumer(session.createTopic("Topic_Two"));
+ ArrayList<TestEvent> data = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
Review Comment:
```suggestion
for (int i = 0; i < 50; i++) {
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -255,6 +259,90 @@ public void testWriteMessage() throws Exception {
assertEquals(100, count);
}
+ @Test
+ public void testWriteMessageWithError() throws Exception {
+ ArrayList<String> data = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ data.add("Message " + i);
+ }
+ String messageOne = "Message 1";
+ String messageTwo = "Message 2";
+
+ WriteJmsResult<String> output =
+ pipeline
+ .apply(Create.of(data))
+ .apply(
+ JmsIO.<String>write()
+ .withConnectionFactory(connectionFactory)
+ .withValueMapper(new TextMessageMapperWithError())
+ .withQueue(QUEUE)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD));
+
+ PAssert.that(output.getFailedMessages()).containsInAnyOrder(messageOne,
messageTwo);
+
+ pipeline.run();
+
+ Connection connection = connectionFactory.createConnection(USERNAME,
PASSWORD);
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(QUEUE));
+ int count = 0;
+ while (consumer.receive(1000) != null) {
+ count++;
+ }
+ assertEquals(98, count);
+ assertTrue(output.getPipeline().equals(pipeline));
Review Comment:
```suggestion
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,4 +643,39 @@ private <T, MethodArgT, MethodResultT> T proxyMethod(
return result;
});
}
+
+ private static class TestEvent implements Serializable {
+ private final String topicName;
+ private final String value;
+
+ private TestEvent(String topicName, String value) {
+ this.topicName = topicName;
+ this.value = value;
+ }
+
+ private String getTopicName() {
+ return this.topicName;
+ }
+
+ private String getValue() {
+ return this.value;
+ }
+ }
+
+ private static class TextMessageMapperWithError
+ implements SerializableBiFunction<String, Session, Message> {
+ @Override
+ public Message apply(String value, Session session) {
+ try {
+ if (value.equals("Message " + 1) || value.equals("Message " + 2)) {
Review Comment:
```suggestion
if (value.equals("Message 1") || value.equals("Message 2")) {
```
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -656,7 +675,7 @@ public Write withConnectionFactory(ConnectionFactory
connectionFactory) {
* acts as a producer on the queue.
*
* <p>This method is exclusive with {@link JmsIO.Write#withTopic(String)}.
The user has to
- * specify a destination: queue or topic.
+ * specify a destination: queue or topic or topicNameMapper.
Review Comment:
```suggestion
* specify a destination: queue, topic, or topicNameMapper.
```
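To illustrate the "exactly one destination" rule in the Javadoc above, here is a minimal standalone sketch. The class name, method name, and parameters are hypothetical, and this is not necessarily how `JmsIO.Write` validates its configuration; it only shows one way to express the rule.

```java
import java.util.Objects;
import java.util.stream.Stream;

final class DestinationCheckSketch {
  /**
   * Returns normally when exactly one of the three destination settings is non-null,
   * and throws IllegalArgumentException otherwise. All names are illustrative.
   */
  static void checkExactlyOneDestination(String queue, String topic, Object topicNameMapper) {
    // Count how many of the mutually exclusive settings were provided.
    long configured = Stream.of(queue, topic, topicNameMapper).filter(Objects::nonNull).count();
    if (configured != 1) {
      throw new IllegalArgumentException(
          "Exactly one of queue, topic, or topicNameMapper must be set, but found " + configured);
    }
  }
}
```

A check like this rejects both the "nothing set" and the "two destinations set" cases with the same message.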
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -678,7 +697,7 @@ public Write withQueue(String queue) {
* as a publisher on the topic.
*
* <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)}.
The user has to
- * specify a destination: queue or topic.
+ * specify a destination: queue or topic or topicNameMapper.
Review Comment:
```suggestion
* specify a destination: queue, topic, or topicNameMapper.
```
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -690,47 +709,128 @@ public Write withQueue(String queue) {
* @param topic The JMS topic name.
* @return The corresponding {@link JmsIO.Read}.
*/
- public Write withTopic(String topic) {
+ public Write<EventT> withTopic(String topic) {
checkArgument(topic != null, "topic can not be null");
return builder().setTopic(topic).build();
}
/** Define the username to connect to the JMS broker (authenticated). */
- public Write withUsername(String username) {
+ public Write<EventT> withUsername(String username) {
checkArgument(username != null, "username can not be null");
return builder().setUsername(username).build();
}
/** Define the password to connect to the JMS broker (authenticated). */
- public Write withPassword(String password) {
+ public Write<EventT> withPassword(String password) {
checkArgument(password != null, "password can not be null");
return builder().setPassword(password).build();
}
+ /**
+ * Specify the JMS topic destination name where to send messages to
dynamically. The {@link
+ * JmsIO.Write} acts as a publisher on the topic.
+ *
+ * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)
and
+ * {@link JmsIO.Write#withTopic(String)}. The user has to specify a {@link
SerializableFunction}
+ * that takes {@code EventT} object as a parameter, and returns the topic
name depending of the content
+ * of the event object.
+ *
+ * <p>For instance:
+ * <pre>{@code
+ * SerializableFunction<SomeEventObject, String> topicNameMapper = (e ->
return "the topic name");
+ *
+ * }</pre>
+ *
+ * <pre>{@code
+ * .apply(JmsIO.write().withTopicNameMapper(topicNameNapper)
+ * }</pre>
+ *
+ * @param topicNameMapper The function returning the dynamic topic name.
+ * @return The corresponding {@link JmsIO.Write}.
+ */
+ public Write<EventT> withTopicNameMapper(SerializableFunction<EventT,
String> topicNameMapper) {
+ checkArgument(topicNameMapper != null, "topicNameMapper can not be
null");
+ return builder().setTopicNameMapper(topicNameMapper).build();
+ }
+
+ /**
+ * Map the {@code EventT} object to a {@link javax.jms.Message}.
+ *
+ * <p>For instance:
+ *
+ * <pre>{@code
+ * SerializableBiFunction<SomeEventObject, Session, Message> valueMapper =
(e, s) -> {
+ *
+ * try {
+ * TextMessage msg = s.createTextMessage();
+ * msg.setText(Mapper.MAPPER.toJson(e));
+ * return msg;
+ * } catch (JMSException ex) {
+ * throw new JmsIOException("Error!!", ex);
+ * }
+ * };
+ *
+ * }</pre>
+ *
+ * <pre>{@code
+ * .apply(JmsIO.write().withValueMapper(valueNapper)
+ *
Review Comment:
```suggestion
```
##########
CHANGES.md:
##########
@@ -49,6 +49,62 @@
* ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-->
+
+## I/Os
+
+* JMSIO gains the ability to map any kind of input to any subclass of
javax.jms.Message
+ * To have the same behavior as before, a valueMapper with a
TextMessageMapper must be set at JmsIO.Write initialization like this:
+```java
+ JmsIO.<String>write()
+ .withConnectionFactory(jmsConnectionFactory)
+ .withCoder(SerializableCoder.of(String.class))
+ .withValueMapper(new TextMessageMapper());
+```
+* JMSIO introduces the ability to write to dynamic topics.
+ * A topicNameMapper must be set to extract the topic name from the input,
+ * A valueMapper must be set to extract the value from the input.
Review Comment:
```suggestion
* JmsIO introduces the ability to write to dynamic topics (Java)
([BEAM-13608](https://issues.apache.org/jira/browse/BEAM-13608)).
* A `topicNameMapper` must be set to extract the topic name from the input
value.
* A `valueMapper` must be set to convert the input value to a JMS message.
```
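As a rough picture of how the two mappers cooperate, the sketch below routes events in memory: one function picks the topic, the other produces the payload. It uses plain `java.util.function.Function` and a hypothetical `Event` class, with no broker and no Beam types, so it only approximates what the write transform does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class DynamicTopicRoutingSketch {
  /** Hypothetical event type carrying its own destination topic and payload. */
  static final class Event {
    final String topicName;
    final String value;

    Event(String topicName, String value) {
      this.topicName = topicName;
      this.value = value;
    }
  }

  /** Applies both mappers to each event and collects the payloads per topic, in arrival order. */
  static <T> Map<String, List<String>> route(
      List<T> events, Function<T, String> topicNameMapper, Function<T, String> valueMapper) {
    Map<String, List<String>> byTopic = new LinkedHashMap<>();
    for (T event : events) {
      // The topic is decided per element, so no per-topic pipeline branch is needed.
      byTopic
          .computeIfAbsent(topicNameMapper.apply(event), topic -> new ArrayList<>())
          .add(valueMapper.apply(event));
    }
    return byTopic;
  }
}
```

Calling `route(events, e -> e.topicName, e -> e.value)` groups the payloads under `Topic_One`, `Topic_Two`, and so on, which mirrors what `testWriteDynamicMessage` checks against real consumers.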
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -255,6 +259,90 @@ public void testWriteMessage() throws Exception {
assertEquals(100, count);
}
+ @Test
+ public void testWriteMessageWithError() throws Exception {
+ ArrayList<String> data = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ data.add("Message " + i);
+ }
+ String messageOne = "Message 1";
+ String messageTwo = "Message 2";
Review Comment:
```suggestion
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -255,6 +259,90 @@ public void testWriteMessage() throws Exception {
assertEquals(100, count);
}
+ @Test
+ public void testWriteMessageWithError() throws Exception {
+ ArrayList<String> data = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ data.add("Message " + i);
+ }
+ String messageOne = "Message 1";
+ String messageTwo = "Message 2";
+
+ WriteJmsResult<String> output =
+ pipeline
+ .apply(Create.of(data))
+ .apply(
+ JmsIO.<String>write()
+ .withConnectionFactory(connectionFactory)
+ .withValueMapper(new TextMessageMapperWithError())
+ .withQueue(QUEUE)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD));
+
+ PAssert.that(output.getFailedMessages()).containsInAnyOrder(messageOne,
messageTwo);
+
+ pipeline.run();
+
+ Connection connection = connectionFactory.createConnection(USERNAME,
PASSWORD);
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(QUEUE));
+ int count = 0;
+ while (consumer.receive(1000) != null) {
+ count++;
+ }
+ assertEquals(98, count);
+ assertTrue(output.getPipeline().equals(pipeline));
+ }
+
+ @Test
+ public void testWriteDynamicMessage() throws Exception {
+ Connection connection = connectionFactory.createConnection(USERNAME,
PASSWORD);
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumerOne =
session.createConsumer(session.createTopic("Topic_One"));
+ MessageConsumer consumerTwo =
session.createConsumer(session.createTopic("Topic_Two"));
+ ArrayList<TestEvent> data = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ data.add(new TestEvent("Topic_One", "Message One " + i));
+ }
+ for (int i = 0; i < 100; i++) {
+ data.add(new TestEvent("Topic_Two", "Message Two " + i));
+ }
+ pipeline
+ .apply(Create.of(data))
+ .apply(
+ JmsIO.<TestEvent>write()
+ .withConnectionFactory(connectionFactory)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withTopicNameMapper(e -> e.getTopicName())
+ .withValueMapper(
+ (e, s) -> {
+ try {
+ TextMessage msg = s.createTextMessage();
+ msg.setText(e.getValue());
+ return msg;
+ } catch (JMSException ex) {
+ throw new JmsIOException("Error writing TextMessage",
ex);
+ }
+ }));
+
+ pipeline.run();
+
+ int count = 0;
+ while (consumerOne.receive(1000) != null) {
+ count++;
+ }
+ assertEquals(100, count);
Review Comment:
```suggestion
assertEquals(50, count);
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -255,6 +259,90 @@ public void testWriteMessage() throws Exception {
assertEquals(100, count);
}
+ @Test
+ public void testWriteMessageWithError() throws Exception {
+ ArrayList<String> data = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ data.add("Message " + i);
+ }
+ String messageOne = "Message 1";
+ String messageTwo = "Message 2";
+
+ WriteJmsResult<String> output =
+ pipeline
+ .apply(Create.of(data))
+ .apply(
+ JmsIO.<String>write()
+ .withConnectionFactory(connectionFactory)
+ .withValueMapper(new TextMessageMapperWithError())
+ .withQueue(QUEUE)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD));
+
+ PAssert.that(output.getFailedMessages()).containsInAnyOrder(messageOne,
messageTwo);
Review Comment:
```suggestion
PAssert.that(output.getFailedMessages()).containsInAnyOrder("Message 1", "Message 2");
```
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIOException.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+public class JmsIOException extends RuntimeException {
+ public JmsIOException(String message, Throwable cause) {
+ super(message, cause);
+ }
Review Comment:
```suggestion
public JmsIOException(String message) {
super(message);
}
public JmsIOException(String message, Throwable cause) {
super(message, cause);
}
```
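To show when each of the two constructors is useful, here is a small standalone sketch. `MappingException` and `parsePositive` are hypothetical names standing in for `JmsIOException` and a mapper's failure paths; nothing here is taken from the PR itself.

```java
final class ExceptionConstructorsSketch {
  /** Hypothetical unchecked exception with the same two-constructor shape as suggested above. */
  static class MappingException extends RuntimeException {
    MappingException(String message) {
      super(message);
    }

    MappingException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Parses a positive integer, using whichever constructor fits each failure. */
  static int parsePositive(String text) {
    int parsed;
    try {
      parsed = Integer.parseInt(text);
    } catch (NumberFormatException ex) {
      // An underlying failure exists, so keep it as the cause.
      throw new MappingException("Not a number: " + text, ex);
    }
    if (parsed <= 0) {
      // No underlying exception here, so the message-only constructor is enough.
      throw new MappingException("Not positive: " + parsed);
    }
    return parsed;
  }
}
```

The message-only constructor avoids passing `null` as a cause when the failure is detected by the mapper's own validation.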
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -690,47 +709,128 @@ public Write withQueue(String queue) {
* @param topic The JMS topic name.
* @return The corresponding {@link JmsIO.Read}.
*/
- public Write withTopic(String topic) {
+ public Write<EventT> withTopic(String topic) {
checkArgument(topic != null, "topic can not be null");
return builder().setTopic(topic).build();
}
/** Define the username to connect to the JMS broker (authenticated). */
- public Write withUsername(String username) {
+ public Write<EventT> withUsername(String username) {
checkArgument(username != null, "username can not be null");
return builder().setUsername(username).build();
}
/** Define the password to connect to the JMS broker (authenticated). */
- public Write withPassword(String password) {
+ public Write<EventT> withPassword(String password) {
checkArgument(password != null, "password can not be null");
return builder().setPassword(password).build();
}
+ /**
+ * Specify the JMS topic destination name where to send messages to
dynamically. The {@link
+ * JmsIO.Write} acts as a publisher on the topic.
+ *
+ * <p>This method is exclusive with {@link JmsIO.Write#withQueue(String)
and
+ * {@link JmsIO.Write#withTopic(String)}. The user has to specify a {@link
SerializableFunction}
+ * that takes {@code EventT} object as a parameter, and returns the topic
name depending of the content
+ * of the event object.
+ *
+ * <p>For instance:
+ * <pre>{@code
+ * SerializableFunction<SomeEventObject, String> topicNameMapper = (e ->
return "the topic name");
+ *
+ * }</pre>
Review Comment:
```suggestion
* <p>For example:
* <pre>{@code
* SerializableFunction<CompanyEvent, String> topicNameMapper =
* (event ->
* String.format(
* "company/%s/employee/%s",
* event.getCompanyName(),
* event.getEmployeeId()));
* }</pre>
```
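For reference, the suggested mapper can be exercised outside Beam. The sketch below swaps `SerializableFunction` for `java.util.function.Function` and assumes a hypothetical `CompanyEvent` with two string fields; lower-casing the company name is an illustrative choice borrowed from the CHANGES.md example, not a requirement.

```java
import java.util.Locale;
import java.util.function.Function;

final class TopicNameMapperSketch {
  /** Hypothetical event type; the real event class is whatever the pipeline uses. */
  static final class CompanyEvent {
    final String companyName;
    final String employeeId;

    CompanyEvent(String companyName, String employeeId) {
      this.companyName = companyName;
      this.employeeId = employeeId;
    }
  }

  /** Builds a topic name of the form company/<name>/employee/<id> from the event content. */
  static final Function<CompanyEvent, String> TOPIC_NAME_MAPPER =
      event ->
          String.format(
              "company/%s/employee/%s",
              event.companyName.toLowerCase(Locale.ROOT), event.employeeId);
}
```

Applying the mapper to an event for company `Acme` and employee `1234567` yields `company/acme/employee/1234567`.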
Issue Time Tracking
-------------------
Worklog Id: (was: 762061)
Time Spent: 2h (was: 1h 50m)
> Dynamic Topics management
> -------------------------
>
> Key: BEAM-13608
> URL: https://issues.apache.org/jira/browse/BEAM-13608
> Project: Beam
> Issue Type: Improvement
> Components: io-java-jms
> Reporter: Vincent BALLADA
> Assignee: Vincent BALLADA
> Priority: P2
> Labels: assigned:
> Time Spent: 2h
> Remaining Estimate: 0h
>
> The JmsIO write function can publish messages to topics with static names, such as:
> company/employee/id/1234567.
> Some AMQP/JMS brokers provide the ability to publish to dynamic topics like:
> company/employee/id/{employeeId}
> If we want to handle that with Apache Beam JmsIO, we must create a branch per
> employeeId, which is not suitable for a company with thousands of employees, or
> other similar use cases.
> The JmsIO write function should provide the ability to handle dynamic topics.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)