This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b2b8323ccd9 [FLINK-33641][test] Suppress the 
DirectoryNotEmptyException in StreamingWithStateTestBase to prevent test 
failures (#23914)
b2b8323ccd9 is described below

commit b2b8323ccd931be85dbaa6542552c26e6a29a105
Author: Jiabao Sun <jiabao....@xtransfer.cn>
AuthorDate: Thu Dec 14 17:30:18 2023 +0800

    [FLINK-33641][test] Suppress the DirectoryNotEmptyException in 
StreamingWithStateTestBase to prevent test failures (#23914)
---
 .../runtime/utils/StreamingWithStateTestBase.scala | 27 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
index 249d42dc197..0c9a29f061e 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala
@@ -27,9 +27,9 @@ import org.apache.flink.streaming.api.CheckpointingMode
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
-import org.apache.flink.table.data.{RowData, StringData}
 import org.apache.flink.table.data.binary.BinaryRowData
 import org.apache.flink.table.data.writer.BinaryRowWriter
+import org.apache.flink.table.data.{RowData, StringData}
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND,
 ROCKSDB_BACKEND, StateBackendMode}
 import org.apache.flink.table.planner.utils.TableTestUtil
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
@@ -37,13 +37,13 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.flink.table.types.logical.RowType
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters
 import org.apache.flink.testutils.junit.utils.TempDirUtils
-
+import org.apache.flink.util.FileUtils
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.slf4j.LoggerFactory
 
-import java.io.File
+import java.io.{File, IOException}
 import java.util
-
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -55,6 +55,8 @@ class StreamingWithStateTestBase(state: StateBackendMode) 
extends StreamingTestB
     case ROCKSDB_BACKEND => true
   }
 
+  private val log = 
LoggerFactory.getLogger(classOf[StreamingWithStateTestBase])
+
   private val classLoader = Thread.currentThread.getContextClassLoader
 
   var baseCheckpointPath: File = _
@@ -63,7 +65,9 @@ class StreamingWithStateTestBase(state: StateBackendMode) 
extends StreamingTestB
   override def before(): Unit = {
     super.before()
     // set state backend
-    baseCheckpointPath = tempFolder.toFile
+
+    // subfolders are managed here because the tests could fail during cleanup 
when concurrently executed (see FLINK-33820)
+    baseCheckpointPath = TempDirUtils.newFolder(tempFolder)
     state match {
       case HEAP_BACKEND =>
         val conf = new Configuration()
@@ -82,6 +86,19 @@ class StreamingWithStateTestBase(state: StateBackendMode) 
extends StreamingTestB
   @AfterEach
   override def after(): Unit = {
     super.after()
+    try {
+      FileUtils.deleteDirectory(baseCheckpointPath)
+    } catch {
+      case e: IOException =>
+        if (baseCheckpointPath.exists) {
+          log.error(
+            s"The temporary files are not being deleted gracefully, remaining 
files " +
+              s"${FileUtils.listFilesInDirectory(baseCheckpointPath.toPath, _ 
=> true)}.",
+            e)
+        } else {
+          log.error("The temporary files are not being deleted gracefully.", e)
+        }
+    }
     assertThat(FailingCollectionSource.failedBefore).isTrue
   }
 

Reply via email to