Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19218#discussion_r144187454
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
    @@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
           assert(e.contains("mismatched input 'ROW'"))
         }
       }
    +
    +  private def getConvertMetastoreConfName(format: String): String = format match {
    +    case "parquet" => "spark.sql.hive.convertMetastoreParquet"
    +    case "orc" => "spark.sql.hive.convertMetastoreOrc"
    +  }
    +
    +  private def getSparkCompressionConfName(format: String): String = format match {
    +    case "parquet" => "spark.sql.parquet.compression.codec"
    +    case "orc" => "spark.sql.orc.compression.codec"
    +  }
    +
    +  private def getTableCompressPropName(format: String): String = {
    +    format.toLowerCase match {
    +      case "parquet" => "parquet.compression"
    +      case "orc" => "orc.compress"
    +    }
    +  }
    +
    +  private def getTableCompressionCodec(path: String, format: String): String = {
    +    val hadoopConf = spark.sessionState.newHadoopConf()
    +    val codecs = format match {
    +      case "parquet" => for {
    +        footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
    +        block <- footer.getParquetMetadata.getBlocks.asScala
    +        column <- block.getColumns.asScala
    +      } yield column.getCodec.name()
    +      case "orc" => new File(path).listFiles().filter{ file =>
    +        file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
    +      }.map { orcFile =>
    +        OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
    +      }.toSeq
    +    }
    +
    +    assert(codecs.distinct.length == 1)
    +    codecs.head
    +  }
    +
    +  private def writeDataToTable(
    +      rootDir: File,
    +      tableName: String,
    +      isPartitioned: Boolean,
    +      format: String,
    +      compressionCodec: Option[String]) {
    +    val tblProperties = compressionCodec match {
    +      case Some(prop) => s"TBLPROPERTIES('${getTableCompressPropName(format)}'='$prop')"
    +      case _ => ""
    +    }
    +    val partitionCreate = if (isPartitioned) "PARTITIONED BY (p int)" else ""
    +    sql(
    +      s"""
    +         |CREATE TABLE $tableName(a int)
    +         |$partitionCreate
    +         |STORED AS $format
    +         |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
    +         |$tblProperties
    +       """.stripMargin)
    +
    +    val partitionInsert = if (isPartitioned) "partition (p=10000)" else ""
    +    sql(
    +      s"""
    +         |INSERT OVERWRITE TABLE $tableName
    +         |$partitionInsert
    +         |SELECT * from table_source
    --- End diff --
    
    nit. `from` -> `FROM`
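    
    For reference, a minimal sketch of how that statement would read with the suggested keyword casing applied (keeping the `tableName` and `partitionInsert` bindings from the diff above):
    
    ```scala
    // Same INSERT as in the diff, with the SQL keyword casing fixed (`FROM`).
    sql(
      s"""
         |INSERT OVERWRITE TABLE $tableName
         |$partitionInsert
         |SELECT * FROM table_source
       """.stripMargin)
    ```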

