mxm commented on a change in pull request #12184:
URL: https://github.com/apache/beam/pull/12184#discussion_r450789800
##########
File path:
sdks/java/io/rabbitmq/src/test/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIOTest.java
##########
@@ -196,59 +200,74 @@ private void doExchangeTest(ExchangeTestPlan testPlan,
boolean simulateIncompati
exchangeType = "fanout";
}
}
+ final String finalExchangeType = exchangeType;
+ final CountDownLatch waitForExchangeToBeDeclared = new CountDownLatch(1);
+ final BlockingQueue<byte[]> recordsToPublish = new LinkedBlockingQueue<>();
+
recordsToPublish.addAll(RabbitMqTestUtils.generateRecords(testPlan.getNumRecordsToPublish()));
+ Thread publisher =
+ new Thread(
+ () -> {
+ Connection connection = null;
+ Channel channel = null;
+ try {
+ ConnectionFactory connectionFactory = new ConnectionFactory();
+ connectionFactory.setAutomaticRecoveryEnabled(false);
+ connectionFactory.setUri(uri);
+ connection = connectionFactory.newConnection();
+ channel = connection.createChannel();
+ channel.exchangeDeclare(exchange, finalExchangeType);
+ // We are relying on the pipeline to declare the queue and
messages that are
+ // published without a queue being declared are "unroutable".
Since there is a race
+ // between when the pipeline declares and when we can start
publishing, we add a
+ // handler to republish messages that are returned to us.
+ channel.addReturnListener(
+ (replyCode, replyText, exchange1, routingKey, properties,
body) -> {
+ try {
+ recordsToPublish.put(body);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ waitForExchangeToBeDeclared.countDown();
+ while (true) {
+ byte[] record = recordsToPublish.take();
+ if (record == terminalRecord) {
+ return;
+ }
+ channel.basicPublish(
+ exchange,
+ testPlan.publishRoutingKeyGen().get(),
+ true, // ensure that messages are returned to sender
+ testPlan.getPublishProperties(),
+ record);
+ }
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setAutomaticRecoveryEnabled(false);
- connectionFactory.setUri(uri);
- Connection connection = null;
- Channel channel = null;
-
- try {
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- channel.exchangeDeclare(exchange, exchangeType);
- final Channel finalChannel = channel;
- Thread publisher =
- new Thread(
- () -> {
- try {
- Thread.sleep(5000);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (channel != null) {
+ // channel may have already been closed automatically due to
protocol failure
+ try {
+ channel.close();
+ } catch (Exception e) {
+ /* ignored */
+ }
}
- for (int i = 0; i < testPlan.getNumRecordsToPublish(); i++) {
+ if (connection != null) {
+ // connection may have already been closed automatically due
to protocol failure
try {
- finalChannel.basicPublish(
- exchange,
- testPlan.publishRoutingKeyGen().get(),
- testPlan.getPublishProperties(),
- RabbitMqTestUtils.generateRecord(i));
+ connection.close();
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+ /* ignored */
}
}
- });
- publisher.start();
- p.run();
- publisher.join();
- } finally {
- if (channel != null) {
- // channel may have already been closed automatically due to protocol
failure
- try {
- channel.close();
- } catch (Exception e) {
- /* ignored */
- }
- }
- if (connection != null) {
- // connection may have already been closed automatically due to
protocol failure
- try {
- connection.close();
- } catch (Exception e) {
- /* ignored */
- }
- }
- }
+ }
+ });
+ publisher.start();
+ waitForExchangeToBeDeclared.countDown();
Review comment:
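I think this has to be `await()` rather than `countDown()`. The publisher thread
already calls `countDown()` once it has declared the exchange, so calling
`countDown()` again here never blocks and the test thread does not actually wait
for the exchange to exist: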
```suggestion
waitForExchangeToBeDeclared.await();
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]