Amraneze commented on code in PR #24973:
URL: https://github.com/apache/beam/pull/24973#discussion_r1103098370


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -932,36 +1036,158 @@ public void setup() throws Exception {
           } else if (spec.getTopic() != null) {
             this.destination = session.createTopic(spec.getTopic());
           }
-
-          this.producer = this.session.createProducer(null);
+          this.producer = this.session.createProducer(this.destination);
+          this.isProducerNeedsToBeCreated = false;
         }
       }
 
-      @ProcessElement
-      public void processElement(ProcessContext ctx) {
+      public void publishMessage(T input) throws JMSException, JmsIOException {
         Destination destinationToSendTo = destination;
         try {
-          Message message = spec.getValueMapper().apply(ctx.element(), 
session);
+          Message message = spec.getValueMapper().apply(input, session);
           if (spec.getTopicNameMapper() != null) {
-            destinationToSendTo =
-                
session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
+            destinationToSendTo = 
session.createTopic(spec.getTopicNameMapper().apply(input));
           }
           producer.send(destinationToSendTo, message);
-        } catch (Exception ex) {
-          LOG.error("Error sending message on topic {}", destinationToSendTo);
-          ctx.output(failedMessageTag, ctx.element());
+        } catch (JMSException | JmsIOException | NullPointerException 
exception) {
+          // Handle NPE in case of getValueMapper or getTopicNameMapper 
returns NPE
+          if (exception instanceof NullPointerException) {
+            throw new JmsIOException("An error occurred", exception);
+          }
+          throw exception;
+        }
+      }
+
+      public void close() throws JMSException {
+        isProducerNeedsToBeCreated = true;
+        if (producer != null) {
+          producer.close();
+          producer = null;
+        }
+        if (session != null) {
+          session.close();
+          session = null;
         }
+        if (connection != null) {
+          try {
+            // If the connection failed, stopping the connection will throw a 
JMSException
+            connection.stop();
+          } catch (JMSException exception) {
+            LOG.warn("The connection couldn't be closed", exception);
+          }
+          connection.close();
+          connection = null;
+        }
+      }
+    }
+
+    private static class JmsIOProducerFn<T> extends DoFn<T, T> {
+
+      private final @Initialized JmsConnection<T> jmsConnection;
+      private final TupleTag<T> failedMessagesTag;
+
+      JmsIOProducerFn(JmsIO.Write<T> spec, TupleTag<T> failedMessagesTag) {
+        this.failedMessagesTag = failedMessagesTag;
+        this.jmsConnection = new JmsConnection<>(spec);
+      }
+
+      @StartBundle
+      public void startBundle() throws JMSException {
+        this.jmsConnection.start();
+      }
+
+      @ProcessElement
+      public void processElement(@Element T input, ProcessContext context) {
+        try {
+          this.jmsConnection.publishMessage(input);
+        } catch (JMSException | JmsIOException exception) {
+          context.output(this.failedMessagesTag, input);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() throws JMSException {
+        this.jmsConnection.close();
+      }
+
+      @Teardown
+      public void tearDown() throws JMSException {
+        this.jmsConnection.close();
+      }
+    }
+
+    static class JmsIOProduceRetryFn<T> extends DoFn<T, T> {
+
+      private transient @Initialized FluentBackoff retryBackOff;
+
+      private final JmsIO.Write<T> spec;
+      private final TupleTag<T> failedMessagesTags;
+      private final @Initialized JmsConnection<T> jmsConnection;
+      private final Counter publicationRetries =
+          Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, 
PUBLICATION_RETRIES_METRIC_NAME);
+
+      JmsIOProduceRetryFn(JmsIO.Write<T> spec, TupleTag<T> failedMessagesTags) 
{
+        this.spec = spec;
+        this.failedMessagesTags = failedMessagesTags;
+        this.jmsConnection = new JmsConnection<>(spec);
+      }
+
+      @Setup
+      public void setup() {
+        RetryConfiguration retryConfiguration = 
checkStateNotNull(spec.getRetryConfiguration());
+        retryBackOff =
+            FluentBackoff.DEFAULT
+                
.withInitialBackoff(checkStateNotNull(retryConfiguration.getInitialDuration()))
+                
.withMaxCumulativeBackoff(checkStateNotNull(retryConfiguration.getMaxDuration()))
+                .withMaxRetries(retryConfiguration.getMaxAttempts());
+      }
+
+      @StartBundle
+      public void startBundle() throws JMSException {
+        this.jmsConnection.start();
+      }
+
+      @ProcessElement
+      public void processElement(@Element T input, ProcessContext context) {
+        try {
+          publishMessage(input, context);
+        } catch (IOException | InterruptedException exception) {
+          LOG.error("Error while publishing the message", exception);
+          context.output(this.failedMessagesTags, input);
+          if (exception instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+
+      private void publishMessage(T input, ProcessContext context)
+          throws IOException, InterruptedException {
+        Sleeper sleeper = Sleeper.DEFAULT;
+        BackOff backoff = checkStateNotNull(retryBackOff).backoff();
+        while (true) {
+          try {
+            this.jmsConnection.publishMessage(input);
+            break;
+          } catch (JMSException | JmsIOException exception) {
+            if (!BackOffUtils.next(sleeper, backoff)) {
+              LOG.error("Error sending message", exception);
+              context.output(this.failedMessagesTags, input);

Review Comment:
   Yeah, I was thinking about it, I will remove it we know that the message is 
sent to the output, I wanted just to log that the message was failed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to