zentol commented on code in PR #20229: URL: https://github.com/apache/flink/pull/20229#discussion_r918012877
##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 package org.apache.flink.core.testutils;

+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
The BlockingWriterThread case can be solved quite easily with a factory method: create the latch before the thread, so the runnable can wait on it directly, and pass the same latch via the constructor for the await method.
```
// Returns the generic BlockingThread that the body constructs, so the sketch type-checks.
static BlockingThread createBlockingWriterThread(
        LimitedConnectionsFileSystem fs,
        Path path,
        int maxConcurrentOutputStreams,
        int maxConcurrentStreamsTotal) {
    // The latch is created first so the lambda below can capture it.
    final OneShotLatch waiter = new OneShotLatch();
    return new BlockingThread(
            () -> {
                try (FSDataOutputStream stream = fs.create(path, WriteMode.OVERWRITE)) {
                    assertTrue(fs.getNumberOfOpenOutputStreams() <= maxConcurrentOutputStreams);
                    assertTrue(fs.getTotalNumberOfOpenStreams() <= maxConcurrentStreamsTotal);

                    final Random rnd = new Random();
                    final byte[] data = new byte[rnd.nextInt(10000) + 1];
                    rnd.nextBytes(data);
                    stream.write(data);

                    waiter.await();

                    // try to write one more thing, which might/should fail with an I/O
                    // exception
                    stream.write(rnd.nextInt());
                }
            },
            waiter);
}
```
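The factory-method idea can also be tried out without any Flink classes. The following is only a minimal sketch under stated assumptions: `ThrowingRunnable`, `SketchCheckedThread`, `SketchBlockingThread`, and `FactoryLatchSketch` are hypothetical stand-ins invented for illustration (they are not the classes in the PR), and `java.util.concurrent.CountDownLatch` replaces `OneShotLatch`. It shows the latch being created before the thread, captured by the lambda, and handed to the constructor so the owner can release it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a runnable that may throw checked exceptions. */
@FunctionalInterface
interface ThrowingRunnable {
    void run() throws Exception;
}

/** Simplified stand-in for a checked thread: records a failure and re-throws it in sync(). */
class SketchCheckedThread extends Thread {
    private final ThrowingRunnable runnable;
    private volatile Throwable error;

    SketchCheckedThread(ThrowingRunnable runnable) {
        this.runnable = runnable;
    }

    @Override
    public final void run() {
        try {
            runnable.run();
        } catch (Throwable t) {
            error = t; // remembered so that sync() can re-throw it
        }
    }

    /** Joins the thread and re-throws anything the runnable threw. */
    void sync() throws Exception {
        join();
        if (error instanceof Exception) {
            throw (Exception) error;
        }
        if (error instanceof Error) {
            throw (Error) error;
        }
    }
}

/** Thread whose body blocks on a latch that the owner releases through wakeup(). */
class SketchBlockingThread extends SketchCheckedThread {
    private final CountDownLatch waiter;

    SketchBlockingThread(ThrowingRunnable runnable, CountDownLatch waiter) {
        super(runnable);
        this.waiter = waiter;
    }

    void wakeup() {
        waiter.countDown();
    }
}

class FactoryLatchSketch {
    /** Factory method: the latch exists before the thread, so the lambda can capture it. */
    static SketchBlockingThread createBlockingThread(AtomicInteger steps) {
        final CountDownLatch waiter = new CountDownLatch(1);
        return new SketchBlockingThread(
                () -> {
                    steps.incrementAndGet(); // work done before blocking
                    waiter.await(); // blocks until wakeup() is called
                    steps.incrementAndGet(); // work done after the release
                },
                waiter);
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger steps = new AtomicInteger();
        SketchBlockingThread thread = createBlockingThread(steps);
        thread.start();
        thread.wakeup();
        thread.sync(); // would re-throw if the lambda had failed
        System.out.println("steps = " + steps.get());
    }
}
```

Because the lambda and the thread object share one latch instance, no subclass needs to override a `go()`-style method just to reach the latch field; whether that trade-off suits the actual test classes is for the PR to decide.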