Repository: spark Updated Branches: refs/heads/master 2c8274568 -> fa09d9192
[SPARK-24919][BUILD] New linter rule for sparkContext.hadoopConfiguration ## What changes were proposed in this pull request? In most cases, we should use `spark.sessionState.newHadoopConf()` instead of `sparkContext.hadoopConfiguration`, so that the hadoop configurations specified in Spark session configuration will come into effect. Add a rule matching `spark.sparkContext.hadoopConfiguration` or `spark.sqlContext.sparkContext.hadoopConfiguration` to prevent the usage. ## How was this patch tested? Unit test Author: Gengliang Wang <gengliang.w...@databricks.com> Closes #21873 from gengliangwang/linterRule. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa09d919 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa09d919 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa09d919 Branch: refs/heads/master Commit: fa09d91925c07a58dea285d6cf85a751664f89ff Parents: 2c82745 Author: Gengliang Wang <gengliang.w...@databricks.com> Authored: Thu Jul 26 16:50:59 2018 -0700 Committer: Xiao Li <gatorsm...@gmail.com> Committed: Thu Jul 26 16:50:59 2018 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/avro/AvroSuite.scala | 26 +++++--------------- .../org/apache/spark/ml/image/HadoopUtils.scala | 4 +++ .../apache/spark/ml/clustering/LDASuite.scala | 2 +- scalastyle-config.xml | 13 ++++++++++ .../HadoopFileLinesReaderSuite.scala | 22 ++++++++--------- .../spark/sql/hive/execution/HiveDDLSuite.scala | 2 +- .../sql/hive/execution/HiveQuerySuite.scala | 11 ++++++--- .../sql/hive/execution/SQLQuerySuite.scala | 2 +- 8 files changed, 45 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/fa09d919/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 865a145..a93309e 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -638,12 +638,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { intercept[FileNotFoundException] { withTempPath { dir => FileUtils.touch(new File(dir, "test")) - val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration - try { - hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true") + withSQLConf(AvroFileFormat.IgnoreFilesWithoutExtensionProperty -> "true") { spark.read.format("avro").load(dir.toString) - } finally { - hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) } } } @@ -717,15 +713,10 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { Files.createFile(new File(tempSaveDir, "non-avro").toPath) - val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration - val count = try { - hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true") + withSQLConf(AvroFileFormat.IgnoreFilesWithoutExtensionProperty -> "true") { val newDf = spark.read.format("avro").load(tempSaveDir) - newDf.count() - } finally { - hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) + assert(newDf.count() == 8) } - assert(count == 8) } } @@ -888,20 +879,15 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { Paths.get(new URL(episodesAvro).toURI), Paths.get(dir.getCanonicalPath, "episodes")) - val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration - val count = try { - hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true") + val hadoopConf = spark.sessionState.newHadoopConf() + withSQLConf(AvroFileFormat.IgnoreFilesWithoutExtensionProperty -> "true") { val newDf = spark .read .option("ignoreExtension", "true") .format("avro") .load(s"${dir.getCanonicalPath}/episodes") - newDf.count() - } finally { - hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) + assert(newDf.count() == 8) } - - assert(count == 8) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/fa09d919/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala index f1579ec..1fae1dc 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala @@ -38,7 +38,9 @@ private object RecursiveFlag { */ def withRecursiveFlag[T](value: Boolean, spark: SparkSession)(f: => T): T = { val flagName = FileInputFormat.INPUT_DIR_RECURSIVE + // scalastyle:off hadoopconfiguration val hadoopConf = spark.sparkContext.hadoopConfiguration + // scalastyle:on hadoopconfiguration val old = Option(hadoopConf.get(flagName)) hadoopConf.set(flagName, value.toString) try f finally { @@ -98,7 +100,9 @@ private object SamplePathFilter { val sampleImages = sampleRatio < 1 if (sampleImages) { val flagName = FileInputFormat.PATHFILTER_CLASS + // scalastyle:off hadoopconfiguration val hadoopConf = spark.sparkContext.hadoopConfiguration + // scalastyle:on hadoopconfiguration val old = Option(hadoopConf.getClass(flagName, null)) hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio) hadoopConf.setLong(SamplePathFilter.seedParam, seed) http://git-wip-us.apache.org/repos/asf/spark/blob/fa09d919/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala index db92132..bbd5408 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala @@ -285,7 +285,7 @@ class LDASuite extends MLTest with DefaultReadWriteTest { // There should be 1 checkpoint remaining. assert(model.getCheckpointFiles.length === 1) val checkpointFile = new Path(model.getCheckpointFiles.head) - val fs = checkpointFile.getFileSystem(spark.sparkContext.hadoopConfiguration) + val fs = checkpointFile.getFileSystem(spark.sessionState.newHadoopConf()) assert(fs.exists(checkpointFile)) model.deleteCheckpointFiles() assert(model.getCheckpointFiles.isEmpty) http://git-wip-us.apache.org/repos/asf/spark/blob/fa09d919/scalastyle-config.xml ---------------------------------------------------------------------- diff --git a/scalastyle-config.xml b/scalastyle-config.xml index e65e3aa..da5c3f2 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -150,6 +150,19 @@ This file is divided into 3 sections: // scalastyle:on println]]></customMessage> </check> + <check customId="hadoopconfiguration" level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> + <parameters><parameter name="regex">spark(.sqlContext)?.sparkContext.hadoopConfiguration</parameter></parameters> + <customMessage><![CDATA[ + Are you sure that you want to use sparkContext.hadoopConfiguration? In most cases, you should use + spark.sessionState.newHadoopConf() instead, so that the hadoop configurations specified in Spark session + configuration will come into effect. + If you must use sparkContext.hadoopConfiguration, wrap the code block with + // scalastyle:off hadoopconfiguration + spark.sparkContext.hadoopConfiguration... + // scalastyle:on hadoopconfiguration + ]]></customMessage> + </check> + <check customId="visiblefortesting" level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> <parameters><parameter name="regex">@VisibleForTesting</parameter></parameters> <customMessage><![CDATA[ http://git-wip-us.apache.org/repos/asf/spark/blob/fa09d919/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala index a39a25b..508614a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala @@ -38,7 +38,7 @@ class HadoopFileLinesReaderSuite extends SharedSQLContext { val lines = ranges.map { case (start, length) => val file = PartitionedFile(InternalRow.empty, path.getCanonicalPath, start, length) - val hadoopConf = conf.getOrElse(spark.sparkContext.hadoopConfiguration) + val hadoopConf = conf.getOrElse(spark.sessionState.newHadoopConf()) val reader = new HadoopFileLinesReader(file, delimOpt, hadoopConf) reader.map(_.toString) @@ -111,20 +111,20 @@ class HadoopFileLinesReaderSuite extends SharedSQLContext { } test("io.file.buffer.size is less than line length") { - val conf = spark.sparkContext.hadoopConfiguration - conf.set("io.file.buffer.size", "2") - withTempPath { path => - val lines = getLines(path, text = "abcdef\n123456", ranges = Seq((4, 4), (8, 5))) - assert(lines == Seq("123456")) + withSQLConf("io.file.buffer.size" -> "2") { + withTempPath { path => + val lines = getLines(path, text = "abcdef\n123456", ranges = Seq((4, 4), (8, 5))) + assert(lines == Seq("123456")) + } } } test("line cannot be longer than line.maxlength") { - val conf = spark.sparkContext.hadoopConfiguration - conf.set("mapreduce.input.linerecordreader.line.maxlength", "5") - withTempPath { path => - val lines = getLines(path, text = "abcdef\n1234", ranges = Seq((0, 15))) - assert(lines == Seq("1234")) + withSQLConf("mapreduce.input.linerecordreader.line.maxlength" -> "5") { + withTempPath { path => + val lines = getLines(path, text = "abcdef\n1234", ranges = Seq((0, 15))) + assert(lines == Seq("1234")) + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/fa09d919/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 0b3de3d..7288177 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -783,7 +783,7 @@ class HiveDDLSuite val part1 = Map("a" -> "1", "b" -> "5") val part2 = Map("a" -> "2", "b" -> "6") val root = new Path(catalog.getTableMetadata(tableIdent).location) - val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration) + val fs = root.getFileSystem(spark.sessionState.newHadoopConf()) // valid fs.mkdirs(new Path(new Path(root, "a=1"), "b=5")) fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file http://git-wip-us.apache.org/repos/asf/spark/blob/fa09d919/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 2ea5179..741b012 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1177,13 +1177,18 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd assert(spark.table("with_parts").filter($"p" === 2).collect().head == Row(1, 2)) } - val originalValue = spark.sparkContext.hadoopConfiguration.get(modeConfKey, "nonstrict") + // Turn off style check since the following test is to modify hadoop configuration on purpose. + // scalastyle:off hadoopconfiguration + val hadoopConf = spark.sparkContext.hadoopConfiguration + // scalastyle:on hadoopconfiguration + + val originalValue = hadoopConf.get(modeConfKey, "nonstrict") try { - spark.sparkContext.hadoopConfiguration.set(modeConfKey, "nonstrict") + hadoopConf.set(modeConfKey, "nonstrict") sql("INSERT OVERWRITE TABLE with_parts partition(p) select 3, 4") assert(spark.table("with_parts").filter($"p" === 4).collect().head == Row(3, 4)) } finally { - spark.sparkContext.hadoopConfiguration.set(modeConfKey, originalValue) + hadoopConf.set(modeConfKey, originalValue) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/fa09d919/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 828c18a..1a91682 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -2053,7 +2053,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit") deleteOnExitField.setAccessible(true) - val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) + val fs = FileSystem.get(spark.sessionState.newHadoopConf()) val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]] val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org