Abacn commented on code in PR #24973:
URL: https://github.com/apache/beam/pull/24973#discussion_r1103017664


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -932,36 +1036,158 @@ public void setup() throws Exception {
           } else if (spec.getTopic() != null) {
             this.destination = session.createTopic(spec.getTopic());
           }
-
-          this.producer = this.session.createProducer(null);
+          this.producer = this.session.createProducer(this.destination);
+          this.isProducerNeedsToBeCreated = false;
         }
       }
 
-      @ProcessElement
-      public void processElement(ProcessContext ctx) {
+      public void publishMessage(T input) throws JMSException, JmsIOException {
         Destination destinationToSendTo = destination;
         try {
-          Message message = spec.getValueMapper().apply(ctx.element(), session);
+          Message message = spec.getValueMapper().apply(input, session);
           if (spec.getTopicNameMapper() != null) {
-            destinationToSendTo =
-                session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
+            destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input));
           }
           producer.send(destinationToSendTo, message);
-        } catch (Exception ex) {
-          LOG.error("Error sending message on topic {}", destinationToSendTo);
-          ctx.output(failedMessageTag, ctx.element());
+        } catch (JMSException | JmsIOException | NullPointerException exception) {
+          // Handle NPE in case of getValueMapper or getTopicNameMapper returns NPE
+          if (exception instanceof NullPointerException) {
+            throw new JmsIOException("An error occurred", exception);
+          }
+          throw exception;
+        }
+      }
+
+      public void close() throws JMSException {
+        isProducerNeedsToBeCreated = true;
+        if (producer != null) {
+          producer.close();
+          producer = null;
+        }
+        if (session != null) {
+          session.close();
+          session = null;
         }
+        if (connection != null) {
+          try {
+            // If the connection failed, stopping the connection will throw a JMSException
+            connection.stop();
+          } catch (JMSException exception) {
+            LOG.warn("The connection couldn't be closed", exception);
+          }
+          connection.close();
+          connection = null;
+        }
+      }
+    }
+
+    private static class JmsIOProducerFn<T> extends DoFn<T, T> {

Review Comment:
   If I understand correctly, it should just work using `JmsIOProduceRetryFn` alone. Why is another `JmsIOProducerFn` needed before it?
   
   I personally think a two-step retry-write-on-failure is not a good pattern: it is unnecessarily complicated and, more importantly, hard to test (though I see it is covered by a unit test). It also requires maintaining two classes that do the same thing and keeping them consistent for any future addition, which is error-prone.
   
   



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -897,32 +944,89 @@ private boolean isExclusiveTopicQueue() {
               == 1;
       return exclusiveTopicQueue;
     }
+  }
+
+  static class Writer<T> extends PTransform<PCollection<T>, WriteJmsResult<T>> {
+
+    public static final String CONNECTION_ERRORS_METRIC_NAME = "connectionErrors";
+    public static final String PUBLICATION_RETRIES_METRIC_NAME = "publicationRetries";
+    public static final String JMS_IO_PRODUCER_METRIC_NAME = Writer.class.getCanonicalName();
+
+    private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
+    private static final String PUBLISH_TO_JMS_STEP_NAME = "Publish to JMS";
+    private static final String REPUBLISH_TO_JMS_STEP_NAME = "Republish to JMS";
+
+    private final JmsIO.Write<T> spec;
+    private final TupleTag<T> messagesTag;
+    private final TupleTag<T> failedMessagesTag;
+    private final TupleTag<T> failedRepublishedMessagesTag;
+
+    Writer(JmsIO.Write<T> spec) {
+      this.spec = spec;
+      this.messagesTag = new TupleTag<>();
+      this.failedMessagesTag = new TupleTag<>();
+      this.failedRepublishedMessagesTag = new TupleTag<>();
+    }
+
+    @Override
+    public WriteJmsResult<T> expand(PCollection<T> input) {
+      PCollectionTuple failedPublishedMessagesTuple =
+          input.apply(
+              PUBLISH_TO_JMS_STEP_NAME,
+              ParDo.of(new JmsIOProducerFn<>(spec, failedMessagesTag))
+                  .withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag)));
+      PCollection<T> failedPublishedMessages =
+          failedPublishedMessagesTuple.get(failedMessagesTag).setCoder(input.getCoder());
+      failedPublishedMessagesTuple.get(messagesTag).setCoder(input.getCoder());
+
+      // Republishing failed messages
+      PCollectionTuple failedRepublishedMessagesTuple =
+          failedPublishedMessages.apply(
+              REPUBLISH_TO_JMS_STEP_NAME,
+              ParDo.of(new JmsIOProduceRetryFn<>(spec, failedRepublishedMessagesTag))
+                  .withOutputTags(messagesTag, TupleTagList.of(failedRepublishedMessagesTag)));
+      PCollection<T> failedRepublishedMessages =
+          failedRepublishedMessagesTuple
+              .get(failedRepublishedMessagesTag)
+              .setCoder(input.getCoder());
+      failedRepublishedMessagesTuple.get(messagesTag).setCoder(input.getCoder());
+
+      return WriteJmsResult.in(
+          input.getPipeline(), failedRepublishedMessagesTag, failedRepublishedMessages);
+    }
 
-    private static class WriterFn<EventT> extends DoFn<EventT, EventT> {
+    private static class JmsConnection<T> implements Serializable {

Review Comment:
   Thanks, a helper class really reduces the complexity of the DoFns.



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -932,36 +1036,158 @@ public void setup() throws Exception {
           } else if (spec.getTopic() != null) {
             this.destination = session.createTopic(spec.getTopic());
           }
-
-          this.producer = this.session.createProducer(null);
+          this.producer = this.session.createProducer(this.destination);
+          this.isProducerNeedsToBeCreated = false;
         }
       }
 
-      @ProcessElement
-      public void processElement(ProcessContext ctx) {
+      public void publishMessage(T input) throws JMSException, JmsIOException {
         Destination destinationToSendTo = destination;
         try {
-          Message message = spec.getValueMapper().apply(ctx.element(), session);
+          Message message = spec.getValueMapper().apply(input, session);
           if (spec.getTopicNameMapper() != null) {
-            destinationToSendTo =
-                session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
+            destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input));
           }
           producer.send(destinationToSendTo, message);
-        } catch (Exception ex) {
-          LOG.error("Error sending message on topic {}", destinationToSendTo);
-          ctx.output(failedMessageTag, ctx.element());
+        } catch (JMSException | JmsIOException | NullPointerException exception) {
+          // Handle NPE in case of getValueMapper or getTopicNameMapper returns NPE
+          if (exception instanceof NullPointerException) {
+            throw new JmsIOException("An error occurred", exception);
+          }
+          throw exception;
+        }
+      }
+
+      public void close() throws JMSException {
+        isProducerNeedsToBeCreated = true;
+        if (producer != null) {
+          producer.close();
+          producer = null;
+        }
+        if (session != null) {
+          session.close();
+          session = null;
         }
+        if (connection != null) {
+          try {
+            // If the connection failed, stopping the connection will throw a JMSException
+            connection.stop();
+          } catch (JMSException exception) {
+            LOG.warn("The connection couldn't be closed", exception);
+          }
+          connection.close();
+          connection = null;
+        }
+      }
+    }
+
+    private static class JmsIOProducerFn<T> extends DoFn<T, T> {
+
+      private final @Initialized JmsConnection<T> jmsConnection;
+      private final TupleTag<T> failedMessagesTag;
+
+      JmsIOProducerFn(JmsIO.Write<T> spec, TupleTag<T> failedMessagesTag) {
+        this.failedMessagesTag = failedMessagesTag;
+        this.jmsConnection = new JmsConnection<>(spec);
+      }
+
+      @StartBundle
+      public void startBundle() throws JMSException {
+        this.jmsConnection.start();
+      }
+
+      @ProcessElement
+      public void processElement(@Element T input, ProcessContext context) {
+        try {
+          this.jmsConnection.publishMessage(input);
+        } catch (JMSException | JmsIOException exception) {
+          context.output(this.failedMessagesTag, input);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() throws JMSException {
+        this.jmsConnection.close();
+      }
+
+      @Teardown
+      public void tearDown() throws JMSException {
+        this.jmsConnection.close();
+      }
+    }
+
+    static class JmsIOProduceRetryFn<T> extends DoFn<T, T> {
+
+      private transient @Initialized FluentBackoff retryBackOff;
+
+      private final JmsIO.Write<T> spec;
+      private final TupleTag<T> failedMessagesTags;
+      private final @Initialized JmsConnection<T> jmsConnection;
+      private final Counter publicationRetries =
+          Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME);
+
+      JmsIOProduceRetryFn(JmsIO.Write<T> spec, TupleTag<T> failedMessagesTags) {
+        this.spec = spec;
+        this.failedMessagesTags = failedMessagesTags;
+        this.jmsConnection = new JmsConnection<>(spec);
+      }
+
+      @Setup
+      public void setup() {
+        RetryConfiguration retryConfiguration = checkStateNotNull(spec.getRetryConfiguration());
+        retryBackOff =
+            FluentBackoff.DEFAULT
+                .withInitialBackoff(checkStateNotNull(retryConfiguration.getInitialDuration()))
+                .withMaxCumulativeBackoff(checkStateNotNull(retryConfiguration.getMaxDuration()))
+                .withMaxRetries(retryConfiguration.getMaxAttempts());
+      }
+
+      @StartBundle
+      public void startBundle() throws JMSException {
+        this.jmsConnection.start();
+      }
+
+      @ProcessElement
+      public void processElement(@Element T input, ProcessContext context) {
+        try {
+          publishMessage(input, context);
+        } catch (IOException | InterruptedException exception) {
+          LOG.error("Error while publishing the message", exception);
+          context.output(this.failedMessagesTags, input);
+          if (exception instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+
+      private void publishMessage(T input, ProcessContext context)
+          throws IOException, InterruptedException {
+        Sleeper sleeper = Sleeper.DEFAULT;
+        BackOff backoff = checkStateNotNull(retryBackOff).backoff();
+        while (true) {
+          try {
+            this.jmsConnection.publishMessage(input);
+            break;
+          } catch (JMSException | JmsIOException exception) {
+            if (!BackOffUtils.next(sleeper, backoff)) {
+              LOG.error("Error sending message", exception);
+              context.output(this.failedMessagesTags, input);

Review Comment:
   This branch already calls `LOG.error` and outputs the element to the failed-messages tag.



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -932,36 +1036,158 @@ public void setup() throws Exception {
           } else if (spec.getTopic() != null) {
             this.destination = session.createTopic(spec.getTopic());
           }
-
-          this.producer = this.session.createProducer(null);
+          this.producer = this.session.createProducer(this.destination);
+          this.isProducerNeedsToBeCreated = false;
         }
       }
 
-      @ProcessElement
-      public void processElement(ProcessContext ctx) {
+      public void publishMessage(T input) throws JMSException, JmsIOException {
         Destination destinationToSendTo = destination;
         try {
-          Message message = spec.getValueMapper().apply(ctx.element(), session);
+          Message message = spec.getValueMapper().apply(input, session);
           if (spec.getTopicNameMapper() != null) {
-            destinationToSendTo =
-                session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
+            destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(input));
           }
           producer.send(destinationToSendTo, message);
-        } catch (Exception ex) {
-          LOG.error("Error sending message on topic {}", destinationToSendTo);
-          ctx.output(failedMessageTag, ctx.element());
+        } catch (JMSException | JmsIOException | NullPointerException exception) {
+          // Handle NPE in case of getValueMapper or getTopicNameMapper returns NPE
+          if (exception instanceof NullPointerException) {
+            throw new JmsIOException("An error occurred", exception);
+          }
+          throw exception;
+        }
+      }
+
+      public void close() throws JMSException {
+        isProducerNeedsToBeCreated = true;
+        if (producer != null) {
+          producer.close();
+          producer = null;
+        }
+        if (session != null) {
+          session.close();
+          session = null;
         }
+        if (connection != null) {
+          try {
+            // If the connection failed, stopping the connection will throw a JMSException
+            connection.stop();
+          } catch (JMSException exception) {
+            LOG.warn("The connection couldn't be closed", exception);
+          }
+          connection.close();
+          connection = null;
+        }
+      }
+    }
+
+    private static class JmsIOProducerFn<T> extends DoFn<T, T> {
+
+      private final @Initialized JmsConnection<T> jmsConnection;
+      private final TupleTag<T> failedMessagesTag;
+
+      JmsIOProducerFn(JmsIO.Write<T> spec, TupleTag<T> failedMessagesTag) {
+        this.failedMessagesTag = failedMessagesTag;
+        this.jmsConnection = new JmsConnection<>(spec);
+      }
+
+      @StartBundle
+      public void startBundle() throws JMSException {
+        this.jmsConnection.start();
+      }
+
+      @ProcessElement
+      public void processElement(@Element T input, ProcessContext context) {
+        try {
+          this.jmsConnection.publishMessage(input);
+        } catch (JMSException | JmsIOException exception) {
+          context.output(this.failedMessagesTag, input);
+        }
+      }
+
+      @FinishBundle
+      public void finishBundle() throws JMSException {
+        this.jmsConnection.close();
+      }
+
+      @Teardown
+      public void tearDown() throws JMSException {
+        this.jmsConnection.close();
+      }
+    }
+
+    static class JmsIOProduceRetryFn<T> extends DoFn<T, T> {
+
+      private transient @Initialized FluentBackoff retryBackOff;
+
+      private final JmsIO.Write<T> spec;
+      private final TupleTag<T> failedMessagesTags;
+      private final @Initialized JmsConnection<T> jmsConnection;
+      private final Counter publicationRetries =
+          Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, PUBLICATION_RETRIES_METRIC_NAME);
+
+      JmsIOProduceRetryFn(JmsIO.Write<T> spec, TupleTag<T> failedMessagesTags) {
+        this.spec = spec;
+        this.failedMessagesTags = failedMessagesTags;
+        this.jmsConnection = new JmsConnection<>(spec);
+      }
+
+      @Setup
+      public void setup() {
+        RetryConfiguration retryConfiguration = checkStateNotNull(spec.getRetryConfiguration());
+        retryBackOff =
+            FluentBackoff.DEFAULT
+                .withInitialBackoff(checkStateNotNull(retryConfiguration.getInitialDuration()))
+                .withMaxCumulativeBackoff(checkStateNotNull(retryConfiguration.getMaxDuration()))
+                .withMaxRetries(retryConfiguration.getMaxAttempts());
+      }
+
+      @StartBundle
+      public void startBundle() throws JMSException {
+        this.jmsConnection.start();
+      }
+
+      @ProcessElement
+      public void processElement(@Element T input, ProcessContext context) {
+        try {
+          publishMessage(input, context);
+        } catch (IOException | InterruptedException exception) {
+          LOG.error("Error while publishing the message", exception);

Review Comment:
   The element is output twice and logged twice when `publishMessage` fails (see below).
   
   `publishMessage` could only publish the message (no `ProcessContext` parameter is needed there) and throw an exception when the retry count is used up. The failure would then be handled in `processElement` here.
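
   A minimal plain-Java sketch of that split, to make the suggestion concrete. It is not the PR's code: it uses no Beam or JMS types, omits the backoff sleep, and every name in it (`RetryPublishSketch`, `Publisher`, `RetriesExhaustedException`, `publishWithRetry`, the two lists standing in for the failure output and the logger) is hypothetical.

```java
import java.util.List;

/** Plain-Java sketch; all names are hypothetical and no Beam or JMS types are used. */
class RetryPublishSketch {

  /** Stand-in for the JMS send call; may fail on any attempt. */
  interface Publisher<T> {
    void publish(T message) throws Exception;
  }

  /** Thrown once the retry budget is used up; the last failure is kept as the cause. */
  static class RetriesExhaustedException extends Exception {
    RetriesExhaustedException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Only publishes. It never logs or emits to a failure output; it throws instead. */
  static <T> void publishWithRetry(Publisher<T> publisher, T message, int maxRetries)
      throws RetriesExhaustedException {
    int attempt = 0;
    while (true) {
      try {
        publisher.publish(message);
        return;
      } catch (Exception e) {
        if (attempt >= maxRetries) {
          throw new RetriesExhaustedException(
              "Gave up after " + (attempt + 1) + " attempts", e);
        }
        attempt++; // a real implementation would sleep according to its backoff here
      }
    }
  }

  /** Plays the role of processElement: the single place that records a failed element. */
  static <T> void processElement(
      Publisher<T> publisher, T message, int maxRetries, List<T> failed, List<String> log) {
    try {
      publishWithRetry(publisher, message, maxRetries);
    } catch (RetriesExhaustedException e) {
      log.add("Error while publishing the message: " + e.getMessage());
      failed.add(message);
    }
  }
}
```

   With this shape a failed element is logged once and emitted once, because only the caller touches the failure output. Setting `maxRetries` to 0 gives a single attempt, so the same function also covers the no-retry case.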



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
