Abacn commented on code in PR #24973:
URL: https://github.com/apache/beam/pull/24973#discussion_r1103237197


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -932,36 +1036,158 @@ public void setup() throws Exception {
           } else if (spec.getTopic() != null) {
             this.destination = session.createTopic(spec.getTopic());
           }
-
-          this.producer = this.session.createProducer(null);
+          this.producer = this.session.createProducer(this.destination);
+          this.isProducerNeedsToBeCreated = false;
         }
       }
 
-      @ProcessElement
-      public void processElement(ProcessContext ctx) {
+      public void publishMessage(T input) throws JMSException, JmsIOException {
         Destination destinationToSendTo = destination;
         try {
-          Message message = spec.getValueMapper().apply(ctx.element(), 
session);
+          Message message = spec.getValueMapper().apply(input, session);
           if (spec.getTopicNameMapper() != null) {
-            destinationToSendTo =
-                
session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
+            destinationToSendTo = 
session.createTopic(spec.getTopicNameMapper().apply(input));
           }
           producer.send(destinationToSendTo, message);
-        } catch (Exception ex) {
-          LOG.error("Error sending message on topic {}", destinationToSendTo);
-          ctx.output(failedMessageTag, ctx.element());
+        } catch (JMSException | JmsIOException | NullPointerException 
exception) {
+          // Handle NPE in case of getValueMapper or getTopicNameMapper 
returns NPE
+          if (exception instanceof NullPointerException) {
+            throw new JmsIOException("An error occurred", exception);
+          }
+          throw exception;
+        }
+      }
+
+      public void close() throws JMSException {
+        isProducerNeedsToBeCreated = true;
+        if (producer != null) {
+          producer.close();
+          producer = null;
+        }
+        if (session != null) {
+          session.close();
+          session = null;
         }
+        if (connection != null) {
+          try {
+            // If the connection failed, stopping the connection will throw a 
JMSException
+            connection.stop();
+          } catch (JMSException exception) {
+            LOG.warn("The connection couldn't be closed", exception);
+          }
+          connection.close();
+          connection = null;
+        }
+      }
+    }
+
+    private static class JmsIOProducerFn<T> extends DoFn<T, T> {

Review Comment:
   For now I would simply use `JmsIOProduceRetryFn`. For succeeded send, the 
while (true) loop just breaks before checking backOff, it should has same 
performance as `JmsIOProducerFn`.
   
   There are of course way of improvement. Currently I understand the `.send()` 
call is synchroneous (otherwise backoff won't work). Switch to cached, 
asynchronous API call would be more performant. An example in Beam's code base 
is KafkaWriter: 
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to