lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r998671565
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -36,6 +37,9 @@
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
+import java.util.concurrent.Executors;
Review Comment:
```suggestion
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +560,108 @@ public void testCustomAutoscaler() throws IOException {
verify(autoScaler, times(1)).stop();
}
+ @Test
+ public void testCloseWithTimeout() throws IOException {
+ Duration closeTimeout = Duration.millis(2000L);
+ long waitTimeout = closeTimeout.getMillis() + 1000L;
+ JmsIO.Read spec =
+ JmsIO.read()
+ .withConnectionFactory(connectionFactory)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withQueue(QUEUE)
+ .withCloseTimeout(closeTimeout);
+
+ JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+
+ ExecutorOptions options = createExecutorOptions();
+
+ JmsIO.UnboundedJmsReader reader = source.createReader(options, null);
+
+ reader.start();
+ reader.close();
+
+ assertFalse(getDiscardedValue(reader));
+ try {
+ options.getScheduledExecutorService().awaitTermination(waitTimeout,
TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ignored) {
+ }
+ assertTrue(getDiscardedValue(reader));
+ }
+
+ private ExecutorOptions createExecutorOptions() {
+ ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+ ExecutorOptions options =
PipelineOptionsFactory.create().as(ExecutorOptions.class);
+ options.setScheduledExecutorService(executorService);
+ return options;
+ }
Review Comment:
```suggestion
ScheduledExecutorService mockScheduledExecutorService =
Mockito.mock(ScheduledExecutorService.class);
ExecutorOptions options =
PipelineOptionsFactory.as(ExecutorOptions.class);
options.setScheduledExecutorService(mockScheduledExecutorService);
ArgumentCaptor<Runnable> runnableArgumentCaptor =
ArgumentCaptor.forClass(Runnable.class);
when(mockScheduledExecutorService.schedule(
runnableArgumentCaptor.capture(), anyLong(),
any(TimeUnit.class)))
.thenReturn(null /* unused */);
JmsIO.UnboundedJmsReader reader = source.createReader(options, null);
reader.start();
assertFalse(getDiscardedValue(reader));
reader.close();
assertFalse(getDiscardedValue(reader));
verify(mockScheduledExecutorService)
.schedule(any(Runnable.class), eq(closeTimeout.getMillis()),
eq(TimeUnit.MILLISECONDS));
runnableArgumentCaptor.getValue().run();
assertTrue(getDiscardedValue(reader));
verifyNoMoreInteractions(mockScheduledExecutorService);
}
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -67,6 +72,7 @@
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.values.PCollection;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.joda.time.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Review Comment:
```suggestion
import org.junit.Rule;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +560,108 @@ public void testCustomAutoscaler() throws IOException {
verify(autoScaler, times(1)).stop();
}
+ @Test
+ public void testCloseWithTimeout() throws IOException {
+ Duration closeTimeout = Duration.millis(2000L);
+ long waitTimeout = closeTimeout.getMillis() + 1000L;
Review Comment:
```suggestion
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -21,6 +21,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Review Comment:
```suggestion
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verifyNoMoreInteractions;
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -92,7 +98,7 @@ public class JmsIOTest {
private ConnectionFactory connectionFactory;
private ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch;
- @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+ @Rule public final transient TestPipeline pipeline =
TestPipeline.fromOptions();
Review Comment:
```suggestion
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]