Amraneze commented on PR #24973: URL: https://github.com/apache/beam/pull/24973#issuecomment-1386663454
> The issue is that JmsIO doesn't retry at all when publishing a message. Any message that fails to publish is pushed to the failed-message output, as you can see in the [JmsIO code](https://github.com/apache/beam/blob/master/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java#L940):

````java
@ProcessElement
public void processElement(ProcessContext ctx) {
  Destination destinationToSendTo = destination;
  try {
    Message message = spec.getValueMapper().apply(ctx.element(), session);
    if (spec.getTopicNameMapper() != null) {
      destinationToSendTo = session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
    }
    producer.send(destinationToSendTo, message);
  } catch (Exception ex) {
    LOG.error("Error sending message on topic {}", destinationToSendTo);
    ctx.output(failedMessageTag, ctx.element());
  }
}
````

Any exception is caught, and the message is pushed to the failed-message tag's output. This means we need to collect the failed messages and retry them ourselves N times, like this:

````java
public PDone expand(PCollection<Message> messages) {
  // Retry 3 times when the session is closed
  messages.apply(getJmsWriter(sinkOptions))
      .getFailedMessages()
      .apply(REPUBLISH_FAILED_MESSAGE_NAME, getJmsWriter(sinkOptions))
      .getFailedMessages()
      .apply(REPUBLISH_FAILED_MESSAGE_NAME, getJmsWriter(sinkOptions))
      .getFailedMessages()
      .apply(REPUBLISH_FAILED_MESSAGE_NAME, getJmsWriter(sinkOptions));
  return PDone.in(messages.getPipeline());
}

private static JmsIO.Write<Message> getJmsWriter(SinkOptions sinkOptions) {
  return JmsIO.<Message>write()
      .withConnectionFactory(SinkOptions.getConnectionFactory())
      .withValueMapper(getValueMapper())
      .withTopicNameMapper(getTopicNameMapper());
}
````

When the session is closed, the first PTransform that publishes the messages logs `Error sending message on topic` errors. It does not reconnect; it just sends all messages on to the second step, as you can see in the screenshot:

<img width="1280" alt="image" src="https://user-images.githubusercontent.com/28459763/213120382-941c1769-805d-4685-aff0-94a508b94f3d.png">

>Note: The graph of the first PTransform is the same as that of the second PTransform after the session is closed, which means the first step doesn't publish at all and just sends all messages to the second step.
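For comparison with chaining writers, here is a minimal sketch of what a bounded retry inside the send step could look like. It is not the code from this PR or from JmsIO: `RetrySketch`, `SendAction`, `sendWithRetry`, and the `reconnect` hook are hypothetical names, and the simulated sender only stands in for a call such as `producer.send(...)` on a closed session.

````java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: retry a send a bounded number of times before giving up. */
class RetrySketch {

  /** An action that may throw, e.g. a wrapper around producer.send(...). */
  @FunctionalInterface
  interface SendAction {
    void run() throws Exception;
  }

  /**
   * Runs the send action up to maxAttempts times. After each failed attempt
   * that is not the last one, calls reconnect (assumed to recreate a closed
   * session) before trying again. Returns true if an attempt succeeded and
   * false if every attempt failed.
   */
  static boolean sendWithRetry(SendAction send, int maxAttempts, Runnable reconnect) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        send.run();
        return true;
      } catch (Exception ex) {
        if (attempt < maxAttempts) {
          reconnect.run();
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    // Simulated sender: fails twice (as if the session were closed), then succeeds.
    SendAction flaky = () -> {
      if (calls.incrementAndGet() < 3) {
        throw new IllegalStateException("session closed");
      }
    };
    boolean published = sendWithRetry(flaky, 3, () -> {});
    // Prints "true after 3 attempts"
    System.out.println(published + " after " + calls.get() + " attempts");
  }
}
````

With a helper of this shape, only the messages for which `sendWithRetry` returns false would need to go to the failed-message output, so a single write step could cover the three chained writers shown above.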
