SmirAlex commented on code in PR #20919:
URL: https://github.com/apache/flink/pull/20919#discussion_r1066988928


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -77,43 +77,46 @@ void checkCounter() {
     @ParameterizedTest
     @MethodSource("deltaNumSplits")
     void testReadWithDifferentSplits(int deltaNumSplits) throws Exception {
-        InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits);
-        cacheLoader.open(UnregisteredMetricsGroup.createCacheMetricGroup());
-        cacheLoader.run();
-        ConcurrentHashMap<RowData, Collection<RowData>> cache = 
cacheLoader.getCache();
-        assertCacheContent(cache);
-        cacheLoader.run();
-        assertThat(cacheLoader.getCache()).isNotSameAs(cache); // new instance 
of cache after reload
-        cacheLoader.close();
-        assertThat(cacheLoader.getCache().size()).isZero(); // cache is 
cleared after close
+        try (InputFormatCacheLoader cacheLoader = 
createCacheLoader(deltaNumSplits)) {
+            
cacheLoader.initializeMetrics(UnregisteredMetricsGroup.createCacheMetricGroup());
+            run(cacheLoader);
+            ConcurrentHashMap<RowData, Collection<RowData>> cache = 
cacheLoader.getCache();
+            assertCacheContent(cache);
+            run(cacheLoader);
+            // new instance of cache after reload
+            assertThat(cacheLoader.getCache()).isNotSameAs(cache);

Review Comment:
   Thanks, fixed



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -77,43 +77,46 @@ void checkCounter() {
     @ParameterizedTest
     @MethodSource("deltaNumSplits")
     void testReadWithDifferentSplits(int deltaNumSplits) throws Exception {
-        InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits);
-        cacheLoader.open(UnregisteredMetricsGroup.createCacheMetricGroup());
-        cacheLoader.run();
-        ConcurrentHashMap<RowData, Collection<RowData>> cache = 
cacheLoader.getCache();
-        assertCacheContent(cache);
-        cacheLoader.run();
-        assertThat(cacheLoader.getCache()).isNotSameAs(cache); // new instance 
of cache after reload
-        cacheLoader.close();
-        assertThat(cacheLoader.getCache().size()).isZero(); // cache is 
cleared after close
+        try (InputFormatCacheLoader cacheLoader = 
createCacheLoader(deltaNumSplits)) {
+            
cacheLoader.initializeMetrics(UnregisteredMetricsGroup.createCacheMetricGroup());
+            run(cacheLoader);
+            ConcurrentHashMap<RowData, Collection<RowData>> cache = 
cacheLoader.getCache();
+            assertCacheContent(cache);
+            run(cacheLoader);
+            // new instance of cache after reload
+            assertThat(cacheLoader.getCache()).isNotSameAs(cache);
+            cacheLoader.close();

Review Comment:
   Fixed



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java:
##########
@@ -123,46 +126,47 @@ void testExceptionDuringReload() throws Exception {
                 () -> {
                     throw exception;
                 };
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, 
reloadAction);
-        InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
-        cacheLoader.open(metricGroup);
-        assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
-        assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+        try (InputFormatCacheLoader cacheLoader =
+                createCacheLoader(DEFAULT_NUM_SPLITS, 
DEFAULT_DELTA_NUM_SPLITS, reloadAction)) {
+            InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
+            cacheLoader.initializeMetrics(metricGroup);
+            assertThatThrownBy(() -> run(cacheLoader)).hasRootCause(exception);
+            
assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1);
+        }
     }
 
-    @Test
-    void testCloseAndInterruptDuringReload() throws Exception {
-        AtomicInteger sleepCounter = new AtomicInteger(0);
-        int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to 
number of all rows
+    /**
+     * Cache loader creates additional threads in case of multiple input 
splits. In both cases cache
+     * loader must correctly react on close and interrupt all threads.
+     */
+    @ParameterizedTest
+    @MethodSource("numSplits")
+    void testCloseDuringReload(int numSplits) throws Exception {
+        OneShotLatch reloadLatch = new OneShotLatch();
         Runnable reloadAction =
-                ThrowingRunnable.unchecked(
-                        () -> {
-                            sleepCounter.incrementAndGet();
-                            Thread.sleep(1000);
-                        });
-        InputFormatCacheLoader cacheLoader = createCacheLoader(0, 
reloadAction);
+                () -> {
+                    reloadLatch.trigger();
+                    assertThatThrownBy(() -> new OneShotLatch().await())
+                            .as("Wait should be interrupted if everything 
works ok")
+                            .isInstanceOf(InterruptedException.class);
+                    Thread.currentThread().interrupt(); // restore interrupted 
status
+                };
         InterceptingCacheMetricGroup metricGroup = new 
InterceptingCacheMetricGroup();
-        cacheLoader.open(metricGroup);
-
-        // check interruption
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        Future<?> future = executorService.submit(cacheLoader);
-        executorService.shutdownNow(); // internally interrupts a thread
-        assertThatNoException().isThrownBy(future::get); // wait for the end
-        // check that we didn't process all elements, but reacted on 
interruption
-        assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
+        CompletableFuture<Void> future;
+        try (InputFormatCacheLoader cacheLoader =
+                createCacheLoader(numSplits, DEFAULT_DELTA_NUM_SPLITS, 
reloadAction)) {
+            cacheLoader.initializeMetrics(metricGroup);
+            future = cacheLoader.reloadAsync();
+            reloadLatch.await(5000, TimeUnit.MILLISECONDS);

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to