Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19218#discussion_r142553845
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
    @@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
           assert(e.contains("mismatched input 'ROW'"))
         }
       }
    +
    +  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
    +    "and 'spark.sql.orc.compression.codec' taking effect on hive table 
writing") {
    +
    +    val hadoopConf = spark.sessionState.newHadoopConf()
    +
    +    val partitionStr = "p=10000"
    +
    +    case class TableCompressionConf(name: String, codeC: String)
    +
    +    case class TableDefine(tableName: String, isPartitioned: Boolean, format: String,
    +      compressionConf: Option[TableCompressionConf]) {
    +      def createTable(rootDir: File): Unit = {
    +        val compression = compressionConf.map(cf => s"'${cf.name}'='${cf.codeC}'")
    +        sql(
    +          s"""
    +          |CREATE TABLE $tableName(a int)
    +          |${if (isPartitioned) "PARTITIONED BY (p int)" else ""}
    +          |STORED AS $format
    +          |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
    +          |${if (compressionConf.nonEmpty) s"TBLPROPERTIES(${compression.get})" else ""}
    +        """.stripMargin)
    +      }
    +
    +      def insertOverwriteTable(): Unit = {
    +        sql(
    +          s"""
    +          |INSERT OVERWRITE TABLE $tableName
    +          |${if (isPartitioned) s"partition ($partitionStr)" else ""}
    +          |SELECT * from table_source
    +        """.stripMargin)
    +      }
    +    }
    +
    +    def getTableCompressionCodec(path: String, format: String): String = {
    +      val codecs = format match {
    +        case "parquet" => for {
    +          footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
    +          block <- footer.getParquetMetadata.getBlocks.asScala
    +          column <- block.getColumns.asScala
    +        } yield column.getCodec.name()
    +        case "orc" => new File(path).listFiles().filter{ file =>
    +          file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
    +        }.map { orcFile =>
    +            
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
    +        }.toSeq
    +      }
    +
    +      assert(codecs.distinct.length == 1)
    +      codecs.head
    +    }
    +
    +    def checkCompressionCodecForTable(format: String, isPartitioned: Boolean,
    +      compressionConf: Option[TableCompressionConf])(assertion: String => Unit): Unit = {
    +      val table = TableDefine(s"tbl_$format${isPartitioned}",
    +        isPartitioned, format, compressionConf)
    +      withTempDir { tmpDir =>
    +        withTable(table.tableName) {
    +          table.createTable(tmpDir)
    +          table.insertOverwriteTable()
    +          val partition = if (table.isPartitioned) partitionStr else ""
    +          val path = s"${tmpDir.getPath.stripSuffix("/")}/${table.tableName}/$partition"
    +          assertion(getTableCompressionCodec(path, table.format))
    +        }
    +      }
    +    }
    +
    +    def getConvertMetastoreConfName(format: String): String = format match {
    +      case "parquet" => "spark.sql.hive.convertMetastoreParquet"
    +      case "orc" => "spark.sql.hive.convertMetastoreOrc"
    +    }
    +
    +    def getSparkCompressionConfName(format: String): String = format match {
    +      case "parquet" => "spark.sql.parquet.compression.codec"
    +      case "orc" => "spark.sql.orc.compression.codec"
    +    }
    +
    +    def checkTableCompressionCodecForCodecs(format: String, isPartitioned: Boolean,
    +      convertMetastore: Boolean, compressionCodecs: List[String],
    +      tableCompressionConf: List[TableCompressionConf])
    --- End diff --
    
    Could you update the indents for all of them in this PR? See the link: https://github.com/databricks/scala-style-guide#indent
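    
    For reference, a minimal sketch of what the linked guide asks for when a method signature does not fit on one line: each parameter on its own line with a 4-space continuation indent, and the method body indented 2 spaces. The `: Unit` return type and empty body below are placeholders, since the rest of this declaration is not shown in the quoted diff:
    
        def checkTableCompressionCodecForCodecs(
            format: String,
            isPartitioned: Boolean,
            convertMetastore: Boolean,
            compressionCodecs: List[String],
            tableCompressionConf: List[TableCompressionConf]): Unit = {
          // 2-space body indent, 4-space parameter indent, per the style guide
        }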


---
