Repository: spark
Updated Branches:
  refs/heads/branch-2.1 bca7ce285 -> 6f715c01d


[SPARK-20243][TESTS] DebugFilesystem.assertNoOpenStreams thread race

## What changes were proposed in this pull request?

Synchronize access to openStreams map.

## How was this patch tested?

Existing tests.

Author: Bogdan Raducanu <bog...@databricks.com>

Closes #17592 from bogdanrdc/SPARK-20243.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f715c01
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f715c01
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f715c01

Branch: refs/heads/branch-2.1
Commit: 6f715c01dd09db52866fd93ff49eb206d157f8c3
Parents: bca7ce2
Author: Bogdan Raducanu <bog...@databricks.com>
Authored: Mon Apr 10 17:34:15 2017 +0200
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Fri Apr 14 15:49:02 2017 +0200

----------------------------------------------------------------------
 .../org/apache/spark/DebugFilesystem.scala      | 26 ++++++++++++--------
 1 file changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6f715c01/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
index 72aea84..91355f7 100644
--- a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
+++ b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
@@ -20,7 +20,6 @@ package org.apache.spark
 import java.io.{FileDescriptor, InputStream}
 import java.lang
 import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -31,21 +30,29 @@ import org.apache.spark.internal.Logging
 
 object DebugFilesystem extends Logging {
   // Stores the set of active streams and their creation sites.
-  private val openStreams = new ConcurrentHashMap[FSDataInputStream, Throwable]()
+  private val openStreams = mutable.Map.empty[FSDataInputStream, Throwable]
 
-  def clearOpenStreams(): Unit = {
+  def addOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized {
+    openStreams.put(stream, new Throwable())
+  }
+
+  def clearOpenStreams(): Unit = openStreams.synchronized {
     openStreams.clear()
   }
 
-  def assertNoOpenStreams(): Unit = {
-    val numOpen = openStreams.size()
+  def removeOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized {
+    openStreams.remove(stream)
+  }
+
+  def assertNoOpenStreams(): Unit = openStreams.synchronized {
+    val numOpen = openStreams.values.size
     if (numOpen > 0) {
-      for (exc <- openStreams.values().asScala) {
+      for (exc <- openStreams.values) {
         logWarning("Leaked filesystem connection created at:")
         exc.printStackTrace()
       }
       throw new IllegalStateException(s"There are $numOpen possibly leaked file streams.",
-        openStreams.values().asScala.head)
+        openStreams.values.head)
     }
   }
 }
@@ -60,8 +67,7 @@ class DebugFilesystem extends LocalFileSystem {
 
   override def open(f: Path, bufferSize: Int): FSDataInputStream = {
     val wrapped: FSDataInputStream = super.open(f, bufferSize)
-    openStreams.put(wrapped, new Throwable())
-
+    addOpenStream(wrapped)
     new FSDataInputStream(wrapped.getWrappedStream) {
       override def setDropBehind(dropBehind: lang.Boolean): Unit = wrapped.setDropBehind(dropBehind)
 
@@ -98,7 +104,7 @@ class DebugFilesystem extends LocalFileSystem {
 
       override def close(): Unit = {
         wrapped.close()
-        openStreams.remove(wrapped)
+        removeOpenStream(wrapped)
       }
 
       override def read(): Int = wrapped.read()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to