This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new db3e746  [SPARK-27875][CORE][SQL][ML][K8S] Wrap all PrintWriter with Utils.tryWithResource
db3e746 is described below

commit db3e746b64a3f78ce60bcfd6f372735f574da95a
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Thu May 30 19:54:32 2019 +0900

    [SPARK-27875][CORE][SQL][ML][K8S] Wrap all PrintWriter with Utils.tryWithResource

    ## What changes were proposed in this pull request?

    This PR wraps all `PrintWriter` usages with `Utils.tryWithResource` to prevent resource leaks.

    ## How was this patch tested?

    Existing tests.

    Closes #24739 from wangyum/SPARK-27875.

    Authored-by: Yuming Wang <yumw...@ebay.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../spark/api/python/PythonBroadcastSuite.scala    |  6 ++---
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 28 +++++++++++----------
 .../deploy/history/FsHistoryProviderSuite.scala    | 10 +++----
 .../spark/metrics/InputOutputMetricsSuite.scala    | 12 ++++-----
 .../spark/scheduler/ReplayListenerSuite.scala      | 26 +++++++++---------
 .../ml/param/shared/SharedParamsCodeGen.scala      |  8 +++---
 .../features/PodTemplateConfigMapStepSuite.scala   |  7 ++---
 .../apache/spark/sql/catalyst/util/package.scala   | 13 ++++-----
 .../spark/sql/execution/command/DDLSuite.scala     | 12 ++++-----
 .../apache/spark/sql/hive/StatisticsSuite.scala    | 31 +++++++++++-----------
 .../spark/sql/hive/client/VersionsSuite.scala      |  6 ++---
 .../spark/sql/hive/execution/HiveUDFSuite.scala    | 12 ++++-----
 12 files changed, 88 insertions(+), 83 deletions(-)
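For readers unfamiliar with the helper this patch relies on: `Utils.tryWithResource` in `org.apache.spark.util.Utils` is a loan-pattern wrapper that creates a `Closeable`, lends it to a function, and closes it in a `finally` block, so the writer is released even when the body throws. The sketch below is a minimal, self-contained illustration of that pattern and of the before/after shape repeated throughout the diff; the object name `TryWithResourceSketch`, the `main` driver, and the temp file are hypothetical, and the real Spark signature may differ in minor details.

```scala
import java.io.{Closeable, File, PrintWriter}

object TryWithResourceSketch {

  // Loan pattern: build the resource, lend it to `f`, and close it in a
  // finally block so it is released on both the normal and the error path.
  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
    val resource = createResource
    try f(resource) finally resource.close()
  }

  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("try-with-resource-demo", ".txt")

    // Before: if write() throws, close() is never reached and the handle leaks.
    val leakProne = new PrintWriter(file)
    leakProne.write("leak-prone")
    leakProne.close()

    // After: the shape used across this patch; the writer is always closed.
    tryWithResource(new PrintWriter(file)) { writer =>
      writer.write("always closed")
    }
  }
}
```

Closing a `PrintWriter` deterministically matters beyond the file handle: `PrintWriter` buffers output and does not throw `IOException` itself, so a leaked, unflushed writer can also silently drop data.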
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
index 24004de..dffdd96 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
@@ -42,9 +42,9 @@ class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkC
     withTempDir { tempDir =>
       val broadcastDataFile: File = {
         val file = new File(tempDir, "broadcastData")
-        val printWriter = new PrintWriter(file)
-        printWriter.write(broadcastedString)
-        printWriter.close()
+        Utils.tryWithResource(new PrintWriter(file)) { printWriter =>
+          printWriter.write(broadcastedString)
+        }
         file
       }
       val broadcast = new PythonBroadcast(broadcastDataFile.getAbsolutePath)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 65c9cb9..385f549 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -749,10 +749,10 @@ class SparkSubmitSuite
     withTempDir { tmpDir =>
       // Test jars and files
       val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
-      val writer1 = new PrintWriter(f1)
-      writer1.println("spark.jars " + jars)
-      writer1.println("spark.files " + files)
-      writer1.close()
+      Utils.tryWithResource(new PrintWriter(f1)) { writer =>
+        writer.println("spark.jars " + jars)
+        writer.println("spark.files " + files)
+      }
       val clArgs = Seq(
         "--master", "local",
         "--class", "org.SomeClass",
@@ -766,10 +766,10 @@ class SparkSubmitSuite
 
       // Test files and archives (Yarn)
       val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
-      val writer2 = new PrintWriter(f2)
-      writer2.println("spark.yarn.dist.files " + files)
-      writer2.println("spark.yarn.dist.archives " + archives)
-      writer2.close()
+      Utils.tryWithResource(new PrintWriter(f2)) { writer =>
+        writer.println("spark.yarn.dist.files " + files)
+        writer.println("spark.yarn.dist.archives " + archives)
+      }
       val clArgs2 = Seq(
         "--master", "yarn",
         "--class", "org.SomeClass",
@@ -783,9 +783,9 @@ class SparkSubmitSuite
 
       // Test python files
       val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
-      val writer3 = new PrintWriter(f3)
-      writer3.println("spark.submit.pyFiles " + pyFiles)
-      writer3.close()
+      Utils.tryWithResource(new PrintWriter(f3)) { writer =>
+        writer.println("spark.submit.pyFiles " + pyFiles)
+      }
       val clArgs3 = Seq(
         "--master", "local",
         "--properties-file", f3.getPath,
@@ -802,10 +802,10 @@ class SparkSubmitSuite
       val f4 = File.createTempFile("test-submit-remote-python-files", "", tmpDir)
       val pyFile1 = File.createTempFile("file1", ".py", tmpDir)
       val pyFile2 = File.createTempFile("file2", ".py", tmpDir)
-      val writer4 = new PrintWriter(f4)
       val remotePyFiles = s"s3a://${pyFile1.getAbsolutePath},s3a://${pyFile2.getAbsolutePath}"
-      writer4.println("spark.submit.pyFiles " + remotePyFiles)
-      writer4.close()
+      Utils.tryWithResource(new PrintWriter(f4)) { writer =>
+        writer.println("spark.submit.pyFiles " + remotePyFiles)
+      }
       val clArgs4 = Seq(
         "--master", "yarn",
         "--deploy-mode", "cluster",
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 86575b1..791814b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -770,11 +770,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
 
     // write out one totally bogus hidden file
     val hiddenGarbageFile = new File(testDir, ".garbage")
-    val out = new PrintWriter(hiddenGarbageFile)
-    // scalastyle:off println
-    out.println("GARBAGE")
-    // scalastyle:on println
-    out.close()
+    Utils.tryWithResource(new PrintWriter(hiddenGarbageFile)) { out =>
+      // scalastyle:off println
+      out.println("GARBAGE")
+      // scalastyle:on println
+    }
 
     // also write out one real event log file, but since its a hidden file, we shouldn't read it
     val tmpNewAppFile = newLogFile("hidden", None, inProgress = false)
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 6f4203d..c7bd0c9 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -51,13 +51,13 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     testTempDir.mkdir()
 
     tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
-    val pw = new PrintWriter(new FileWriter(tmpFile))
-    for (x <- 1 to numRecords) {
-      // scalastyle:off println
-      pw.println(RandomUtils.nextInt(0, numBuckets))
-      // scalastyle:on println
+    Utils.tryWithResource(new PrintWriter(tmpFile)) { pw =>
+      for (x <- 1 to numRecords) {
+        // scalastyle:off println
+        pw.println(RandomUtils.nextInt(0, numBuckets))
+        // scalastyle:on println
+      }
     }
-    pw.close()
 
     // Path to tmpFile
     tmpFilePath = tmpFile.toURI.toString
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index d1113c7..7d0712b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -50,15 +50,15 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
   test("Simple replay") {
     val logFilePath = getFilePath(testDir, "events.txt")
     val fstream = fileSystem.create(logFilePath)
-    val writer = new PrintWriter(fstream)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
-    // scalastyle:off println
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
-    // scalastyle:on println
-    writer.close()
+    Utils.tryWithResource(new PrintWriter(fstream)) { writer =>
+      // scalastyle:off println
+      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+      // scalastyle:on println
+    }
 
     val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
     val logData = fileSystem.open(logFilePath)
@@ -132,16 +132,16 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
   test("Replay incompatible event log") {
     val logFilePath = getFilePath(testDir, "incompatible.txt")
     val fstream = fileSystem.create(logFilePath)
-    val writer = new PrintWriter(fstream)
     val applicationStart = SparkListenerApplicationStart("Incompatible App", None,
       125L, "UserUsingIncompatibleVersion", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
-    // scalastyle:off println
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
-    writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
-    // scalastyle:on println
-    writer.close()
+    Utils.tryWithResource(new PrintWriter(fstream)) { writer =>
+      // scalastyle:off println
+      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+      writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
+      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+      // scalastyle:on println
+    }
 
     val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
     val logData = fileSystem.open(logFilePath)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index 7e08675..1afcf1b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -22,6 +22,8 @@ import java.io.PrintWriter
 import scala.reflect.ClassTag
 import scala.xml.Utility
 
+import org.apache.spark.util.Utils
+
 /**
  * Code generator for shared params (sharedParams.scala). Run under the Spark folder with
  * {{{
@@ -103,9 +105,9 @@ private[shared] object SharedParamsCodeGen {
 
     val code = genSharedParams(params)
     val file = "src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala"
-    val writer = new PrintWriter(file)
-    writer.write(code)
-    writer.close()
+    Utils.tryWithResource(new PrintWriter(file)) { writer =>
+      writer.write(code)
+    }
   }
 
   /** Description of a param. */
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
index 5e7388d..051320f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model.ConfigMap
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
+import org.apache.spark.util.Utils
 
 class PodTemplateConfigMapStepSuite extends SparkFunSuite {
@@ -46,9 +47,9 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite {
       .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, templateFile.getAbsolutePath)
     val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
 
-    val writer = new PrintWriter(templateFile)
-    writer.write("pod-template-contents")
-    writer.close()
+    Utils.tryWithResource(new PrintWriter(templateFile)) { writer =>
+      writer.write("pod-template-contents")
+    }
 
     val step = new PodTemplateConfigMapStep(kubernetesConf)
     val configuredPod = step.configurePod(SparkPod.initialPod())
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 12e8d02..eefabbf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -93,9 +93,9 @@ package object util extends Logging {
   }
 
   def stringToFile(file: File, str: String): File = {
-    val out = new PrintWriter(file)
-    out.write(str)
-    out.close()
+    Utils.tryWithResource(new PrintWriter(file)) { out =>
+      out.write(str)
+    }
     file
   }
 
@@ -115,9 +115,10 @@
   def stackTraceToString(t: Throwable): String = {
     val out = new java.io.ByteArrayOutputStream
-    val writer = new PrintWriter(out)
-    t.printStackTrace(writer)
-    writer.flush()
+    Utils.tryWithResource(new PrintWriter(out)) { writer =>
+      t.printStackTrace(writer)
+      writer.flush()
+    }
     new String(out.toByteArray, StandardCharsets.UTF_8)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 4859bde..0124f28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2720,14 +2720,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   test("Refresh table before drop database cascade") {
     withTempDir { tempDir =>
       val file1 = new File(tempDir + "/first.csv")
-      val writer1 = new PrintWriter(file1)
-      writer1.write("first")
-      writer1.close()
+      Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+        writer.write("first")
+      }
 
       val file2 = new File(tempDir + "/second.csv")
-      val writer2 = new PrintWriter(file2)
-      writer2.write("second")
-      writer2.close()
+      Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+        writer.write("second")
+      }
 
       withDatabase("foo") {
         withTable("foo.first") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 44b1362..483bd37 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
@@ -77,14 +78,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     withTempDir { tempDir =>
       // EXTERNAL OpenCSVSerde table pointing to LOCATION
       val file1 = new File(tempDir + "/data1")
-      val writer1 = new PrintWriter(file1)
-      writer1.write("1,2")
-      writer1.close()
+      Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+        writer.write("1,2")
+      }
 
       val file2 = new File(tempDir + "/data2")
-      val writer2 = new PrintWriter(file2)
-      writer2.write("1,2")
-      writer2.close()
+      Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+        writer.write("1,2")
+      }
 
       sql(
         s"""
@@ -957,9 +958,9 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       withTempDir { loadPath =>
        // load data command
        val file = new File(loadPath + "/data")
-        val writer = new PrintWriter(file)
-        writer.write("2,xyz")
-        writer.close()
+        Utils.tryWithResource(new PrintWriter(file)) { writer =>
+          writer.write("2,xyz")
+        }
        sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE $table")
        if (autoUpdate) {
          val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
@@ -994,14 +995,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
      withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
        val file1 = new File(dir1 + "/data")
-        val writer1 = new PrintWriter(file1)
-        writer1.write("1,a")
-        writer1.close()
+        Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+          writer.write("1,a")
+        }
 
        val file2 = new File(dir2 + "/data")
-        val writer2 = new PrintWriter(file2)
-        writer2.write("1,a")
-        writer2.close()
+        Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+          writer.write("1,a")
+        }
 
        // add partition command
        sql(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 3284579..9861a0a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -949,9 +949,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
          |}
        """.stripMargin
      val schemaFile = new File(dir, "avroDecimal.avsc")
-      val writer = new PrintWriter(schemaFile)
-      writer.write(avroSchema)
-      writer.close()
+      Utils.tryWithResource(new PrintWriter(schemaFile)) { writer =>
+        writer.write(avroSchema)
+      }
      val schemaPath = schemaFile.toURI.toString
      val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index a6fc744..446267d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -454,14 +454,14 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
 
      // EXTERNAL OpenCSVSerde table pointing to LOCATION
      val file1 = new File(tempDir + "/data1")
-      val writer1 = new PrintWriter(file1)
-      writer1.write("1,2")
-      writer1.close()
+      Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+        writer.write("1,2")
+      }
 
      val file2 = new File(tempDir + "/data2")
-      val writer2 = new PrintWriter(file2)
-      writer2.write("1,2")
-      writer2.close()
+      Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+        writer.write("1,2")
+      }
 
      sql(
        s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org