SmirAlex commented on code in PR #20919: URL: https://github.com/apache/flink/pull/20919#discussion_r1066988928
########## flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java: ########## @@ -77,43 +77,46 @@ void checkCounter() { @ParameterizedTest @MethodSource("deltaNumSplits") void testReadWithDifferentSplits(int deltaNumSplits) throws Exception { - InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits); - cacheLoader.open(UnregisteredMetricsGroup.createCacheMetricGroup()); - cacheLoader.run(); - ConcurrentHashMap<RowData, Collection<RowData>> cache = cacheLoader.getCache(); - assertCacheContent(cache); - cacheLoader.run(); - assertThat(cacheLoader.getCache()).isNotSameAs(cache); // new instance of cache after reload - cacheLoader.close(); - assertThat(cacheLoader.getCache().size()).isZero(); // cache is cleared after close + try (InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits)) { + cacheLoader.initializeMetrics(UnregisteredMetricsGroup.createCacheMetricGroup()); + run(cacheLoader); + ConcurrentHashMap<RowData, Collection<RowData>> cache = cacheLoader.getCache(); + assertCacheContent(cache); + run(cacheLoader); + // new instance of cache after reload + assertThat(cacheLoader.getCache()).isNotSameAs(cache); Review Comment: Thanks, fixed ########## flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java: ########## @@ -77,43 +77,46 @@ void checkCounter() { @ParameterizedTest @MethodSource("deltaNumSplits") void testReadWithDifferentSplits(int deltaNumSplits) throws Exception { - InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits); - cacheLoader.open(UnregisteredMetricsGroup.createCacheMetricGroup()); - cacheLoader.run(); - ConcurrentHashMap<RowData, Collection<RowData>> cache = cacheLoader.getCache(); - assertCacheContent(cache); - cacheLoader.run(); - assertThat(cacheLoader.getCache()).isNotSameAs(cache); // new instance of cache after reload - cacheLoader.close(); - assertThat(cacheLoader.getCache().size()).isZero(); // cache is cleared after close + try (InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits)) { + cacheLoader.initializeMetrics(UnregisteredMetricsGroup.createCacheMetricGroup()); + run(cacheLoader); + ConcurrentHashMap<RowData, Collection<RowData>> cache = cacheLoader.getCache(); + assertCacheContent(cache); + run(cacheLoader); + // new instance of cache after reload + assertThat(cacheLoader.getCache()).isNotSameAs(cache); + cacheLoader.close(); Review Comment: Fixed ########## flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/table/fullcache/inputformat/InputFormatCacheLoaderTest.java: ########## @@ -123,46 +126,47 @@ void testExceptionDuringReload() throws Exception { () -> { throw exception; }; - InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction); - InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup(); - cacheLoader.open(metricGroup); - assertThatThrownBy(cacheLoader::run).hasRootCause(exception); - assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1); + try (InputFormatCacheLoader cacheLoader = + createCacheLoader(DEFAULT_NUM_SPLITS, DEFAULT_DELTA_NUM_SPLITS, reloadAction)) { + InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup(); + cacheLoader.initializeMetrics(metricGroup); + assertThatThrownBy(() -> run(cacheLoader)).hasRootCause(exception); + assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1); + } } - @Test - void testCloseAndInterruptDuringReload() throws Exception { - AtomicInteger sleepCounter = new AtomicInteger(0); - int totalSleepCount = TestCacheLoader.DATA.size() + 1; // equals to number of all rows + /** + * Cache loader creates additional threads in case of multiple input splits. In both cases cache + * loader must correctly react on close and interrupt all threads. + */ + @ParameterizedTest + @MethodSource("numSplits") + void testCloseDuringReload(int numSplits) throws Exception { + OneShotLatch reloadLatch = new OneShotLatch(); Runnable reloadAction = - ThrowingRunnable.unchecked( - () -> { - sleepCounter.incrementAndGet(); - Thread.sleep(1000); - }); - InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction); + () -> { + reloadLatch.trigger(); + assertThatThrownBy(() -> new OneShotLatch().await()) + .as("Wait should be interrupted if everything works ok") + .isInstanceOf(InterruptedException.class); + Thread.currentThread().interrupt(); // restore interrupted status + }; InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup(); - cacheLoader.open(metricGroup); - - // check interruption - ExecutorService executorService = Executors.newSingleThreadExecutor(); - Future<?> future = executorService.submit(cacheLoader); - executorService.shutdownNow(); // internally interrupts a thread - assertThatNoException().isThrownBy(future::get); // wait for the end - // check that we didn't process all elements, but reacted on interruption - assertThat(sleepCounter).hasValueLessThan(totalSleepCount); + CompletableFuture<Void> future; + try (InputFormatCacheLoader cacheLoader = + createCacheLoader(numSplits, DEFAULT_DELTA_NUM_SPLITS, reloadAction)) { + cacheLoader.initializeMetrics(metricGroup); + future = cacheLoader.reloadAsync(); + reloadLatch.await(5000, TimeUnit.MILLISECONDS); Review Comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org