Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142553845 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala --- @@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter assert(e.contains("mismatched input 'ROW'")) } } + + test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " + + "and 'spark.sql.orc.compression.codec' taking effect on hive table writing") { + + val hadoopConf = spark.sessionState.newHadoopConf() + + val partitionStr = "p=10000" + + case class TableCompressionConf(name: String, codeC: String) + + case class TableDefine(tableName: String, isPartitioned: Boolean, format: String, + compressionConf: Option[TableCompressionConf]) { + def createTable(rootDir: File): Unit = { + val compression = compressionConf.map(cf => s"'${cf.name}'='${cf.codeC}'") + sql( + s""" + |CREATE TABLE $tableName(a int) + |${if (isPartitioned) "PARTITIONED BY (p int)" else ""} + |STORED AS $format + |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName' + |${if (compressionConf.nonEmpty) s"TBLPROPERTIES(${compression.get})" else ""} + """.stripMargin) + } + + def insertOverwriteTable(): Unit = { + sql( + s""" + |INSERT OVERWRITE TABLE $tableName + |${if (isPartitioned) s"partition ($partitionStr)" else ""} + |SELECT * from table_source + """.stripMargin) + } + } + + def getTableCompressionCodec(path: String, format: String): String = { + val codecs = format match { + case "parquet" => for { + footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf) + block <- footer.getParquetMetadata.getBlocks.asScala + column <- block.getColumns.asScala + } yield column.getCodec.name() + case "orc" => new File(path).listFiles().filter{ file => + file.isFile && !file.getName.endsWith(".crc") && file.getName != "_SUCCESS" + }.map { orcFile => + 
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString + }.toSeq + } + + assert(codecs.distinct.length == 1) + codecs.head + } + + def checkCompressionCodecForTable(format: String, isPartitioned: Boolean, + compressionConf: Option[TableCompressionConf])(assertion: String => Unit): Unit = { + val table = TableDefine(s"tbl_$format${isPartitioned}", + isPartitioned, format, compressionConf) + withTempDir { tmpDir => + withTable(table.tableName) { + table.createTable(tmpDir) + table.insertOverwriteTable() + val partition = if (table.isPartitioned) partitionStr else "" + val path = s"${tmpDir.getPath.stripSuffix("/")}/${table.tableName}/$partition" + assertion(getTableCompressionCodec(path, table.format)) + } + } + } + + def getConvertMetastoreConfName(format: String): String = format match { + case "parquet" => "spark.sql.hive.convertMetastoreParquet" + case "orc" => "spark.sql.hive.convertMetastoreOrc" + } + + def getSparkCompressionConfName(format: String): String = format match { + case "parquet" => "spark.sql.parquet.compression.codec" + case "orc" => "spark.sql.orc.compression.codec" + } + + def checkTableCompressionCodecForCodecs(format: String, isPartitioned: Boolean, + convertMetastore: Boolean, compressionCodecs: List[String], + tableCompressionConf: List[TableCompressionConf]) --- End diff -- Could you update the indents for all of them in this PR? See the link: https://github.com/databricks/scala-style-guide#indent
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org