zentol commented on code in PR #20229:
URL: https://github.com/apache/flink/pull/20229#discussion_r918012877


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java:
##########
@@ -18,26 +18,42 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeoutException;
+
 /**
  * A thread that additionally catches exceptions and offers a joining method 
that re-throws the
  * exceptions.
  *
- * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link 
Runnable}), one needs to
- * extends this class and implement the {@link #go()} method. That method may 
throw exceptions.
+ * <p>This class needs to supply a {@link RunnableWithException} that may 
throw exceptions or
+ * override {@link #go()} method.
+ *
+ * <p>you can use it as the same way of using threads like: {@code new 
Thread(Runnable runnable)} or
+ * {@Code new Thread()} and then override {@link Thread#run()} method. Just 
change it to {@code new
+ * CheckedThread(RunnableWithException runnableWithException)} or {@Code new 
CheckedThread()} and
+ * then override {@link CheckedThread#go()} method.
  *
- * <p>Exception from the {@link #go()} method are caught and re-thrown when 
joining this thread via
- * the {@link #sync()} method.
+ * <p>Exception from the {@link #runnable} or the override {@link #go()} are 
caught and re-thrown
+ * when joining this thread via the {@link #sync()} method.
  */
-public abstract class CheckedThread extends Thread {
+public class CheckedThread extends Thread {

Review Comment:
   The BlockingWriterThread case can be solved quite easily with a factory 
method, creating the latch before thread (with it being used directly for 
waiting) while passing the latch via the constructor for the await method.
   
   ```
       static BlockingWriterThread createBlockingWriterThread(
               LimitedConnectionsFileSystem fs,
               Path path,
               int maxConcurrentOutputStreams,
               int maxConcurrentStreamsTotal) {
           final OneShotLatch waiter = new OneShotLatch();
   
           return new BlockingThread(
                   () -> {
                       try (FSDataOutputStream stream = fs.create(path, 
WriteMode.OVERWRITE)) {
                           assertTrue(fs.getNumberOfOpenOutputStreams() <= 
maxConcurrentOutputStreams);
                           assertTrue(fs.getTotalNumberOfOpenStreams() <= 
maxConcurrentStreamsTotal);
   
                           final Random rnd = new Random();
                           final byte[] data = new byte[rnd.nextInt(10000) + 1];
                           rnd.nextBytes(data);
                           stream.write(data);
   
                           waiter.await();
   
                           // try to write one more thing, which might/should 
fail with an I/O
                           // exception
                           stream.write(rnd.nextInt());
                       }
                   },
                   waiter);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to