spark git commit: [SPARK-20243][TESTS] DebugFilesystem.assertNoOpenStreams thread race
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 bca7ce285 -> 6f715c01d


[SPARK-20243][TESTS] DebugFilesystem.assertNoOpenStreams thread race

## What changes were proposed in this pull request?

Synchronize access to openStreams map.

## How was this patch tested?

Existing tests.

Author: Bogdan Raducanu

Closes #17592 from bogdanrdc/SPARK-20243.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f715c01
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f715c01
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f715c01

Branch: refs/heads/branch-2.1
Commit: 6f715c01dd09db52866fd93ff49eb206d157f8c3
Parents: bca7ce2
Author: Bogdan Raducanu
Authored: Mon Apr 10 17:34:15 2017 +0200
Committer: Herman van Hovell
Committed: Fri Apr 14 15:49:02 2017 +0200

--
 .../org/apache/spark/DebugFilesystem.scala | 26
 1 file changed, 16 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/6f715c01/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
--
diff --git a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
index 72aea84..91355f7 100644
--- a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
+++ b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
@@ -20,7 +20,6 @@ package org.apache.spark
 import java.io.{FileDescriptor, InputStream}
 import java.lang
 import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -31,21 +30,29 @@ import org.apache.spark.internal.Logging
 
 object DebugFilesystem extends Logging {
   // Stores the set of active streams and their creation sites.
-  private val openStreams = new ConcurrentHashMap[FSDataInputStream, Throwable]()
+  private val openStreams = mutable.Map.empty[FSDataInputStream, Throwable]
 
-  def clearOpenStreams(): Unit = {
+  def addOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized {
+    openStreams.put(stream, new Throwable())
+  }
+
+  def clearOpenStreams(): Unit = openStreams.synchronized {
     openStreams.clear()
   }
 
-  def assertNoOpenStreams(): Unit = {
-    val numOpen = openStreams.size()
+  def removeOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized {
+    openStreams.remove(stream)
+  }
+
+  def assertNoOpenStreams(): Unit = openStreams.synchronized {
+    val numOpen = openStreams.values.size
     if (numOpen > 0) {
-      for (exc <- openStreams.values().asScala) {
+      for (exc <- openStreams.values) {
         logWarning("Leaked filesystem connection created at:")
         exc.printStackTrace()
       }
       throw new IllegalStateException(s"There are $numOpen possibly leaked file streams.",
-        openStreams.values().asScala.head)
+        openStreams.values.head)
     }
   }
 }
@@ -60,8 +67,7 @@ class DebugFilesystem extends LocalFileSystem {
 
   override def open(f: Path, bufferSize: Int): FSDataInputStream = {
     val wrapped: FSDataInputStream = super.open(f, bufferSize)
-    openStreams.put(wrapped, new Throwable())
-
+    addOpenStream(wrapped)
     new FSDataInputStream(wrapped.getWrappedStream) {
       override def setDropBehind(dropBehind: lang.Boolean): Unit = wrapped.setDropBehind(dropBehind)
 
@@ -98,7 +104,7 @@ class DebugFilesystem extends LocalFileSystem {
 
       override def close(): Unit = {
         wrapped.close()
-        openStreams.remove(wrapped)
+        removeOpenStream(wrapped)
       }
 
       override def read(): Int = wrapped.read()

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20243][TESTS] DebugFilesystem.assertNoOpenStreams thread race
Repository: spark
Updated Branches:
  refs/heads/master 3d7f201f2 -> 4f7d49b95


[SPARK-20243][TESTS] DebugFilesystem.assertNoOpenStreams thread race

## What changes were proposed in this pull request?

Synchronize access to openStreams map.

## How was this patch tested?

Existing tests.

Author: Bogdan Raducanu

Closes #17592 from bogdanrdc/SPARK-20243.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f7d49b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f7d49b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f7d49b9

Branch: refs/heads/master
Commit: 4f7d49b955b8c362da29a2540697240f4564d3ee
Parents: 3d7f201
Author: Bogdan Raducanu
Authored: Mon Apr 10 17:34:15 2017 +0200
Committer: Herman van Hovell
Committed: Mon Apr 10 17:34:15 2017 +0200

--
 .../org/apache/spark/DebugFilesystem.scala | 26
 1 file changed, 16 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7d49b9/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
--
diff --git a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
index 72aea84..91355f7 100644
--- a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
+++ b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
@@ -20,7 +20,6 @@ package org.apache.spark
 import java.io.{FileDescriptor, InputStream}
 import java.lang
 import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -31,21 +30,29 @@ import org.apache.spark.internal.Logging
 
 object DebugFilesystem extends Logging {
   // Stores the set of active streams and their creation sites.
-  private val openStreams = new ConcurrentHashMap[FSDataInputStream, Throwable]()
+  private val openStreams = mutable.Map.empty[FSDataInputStream, Throwable]
 
-  def clearOpenStreams(): Unit = {
+  def addOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized {
+    openStreams.put(stream, new Throwable())
+  }
+
+  def clearOpenStreams(): Unit = openStreams.synchronized {
     openStreams.clear()
   }
 
-  def assertNoOpenStreams(): Unit = {
-    val numOpen = openStreams.size()
+  def removeOpenStream(stream: FSDataInputStream): Unit = openStreams.synchronized {
+    openStreams.remove(stream)
+  }
+
+  def assertNoOpenStreams(): Unit = openStreams.synchronized {
+    val numOpen = openStreams.values.size
     if (numOpen > 0) {
-      for (exc <- openStreams.values().asScala) {
+      for (exc <- openStreams.values) {
         logWarning("Leaked filesystem connection created at:")
         exc.printStackTrace()
       }
       throw new IllegalStateException(s"There are $numOpen possibly leaked file streams.",
-        openStreams.values().asScala.head)
+        openStreams.values.head)
     }
   }
 }
@@ -60,8 +67,7 @@ class DebugFilesystem extends LocalFileSystem {
 
   override def open(f: Path, bufferSize: Int): FSDataInputStream = {
     val wrapped: FSDataInputStream = super.open(f, bufferSize)
-    openStreams.put(wrapped, new Throwable())
-
+    addOpenStream(wrapped)
     new FSDataInputStream(wrapped.getWrappedStream) {
       override def setDropBehind(dropBehind: lang.Boolean): Unit = wrapped.setDropBehind(dropBehind)
 
@@ -98,7 +104,7 @@ class DebugFilesystem extends LocalFileSystem {
 
       override def close(): Unit = {
         wrapped.close()
-        openStreams.remove(wrapped)
+        removeOpenStream(wrapped)
       }
 
       override def read(): Int = wrapped.read()

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org