Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20937#discussion_r178427011
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
    @@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
         }
       }
     
    -  def testLineSeparator(lineSep: String): Unit = {
    -    test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
    -      // Read
    -      val data =
    -        s"""
    -          |  {"f":
    -          |"a", "f0": 1}$lineSep{"f":
    -          |
    -          |"c",  "f0": 2}$lineSep{"f": "d",  "f0": 3}
    -        """.stripMargin
    -      val dataWithTrailingLineSep = s"$data$lineSep"
    -
    -      Seq(data, dataWithTrailingLineSep).foreach { lines =>
    -        withTempPath { path =>
    -          Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
    -          val df = spark.read.option("lineSep", 
lineSep).json(path.getAbsolutePath)
    -          val expectedSchema =
    -            StructType(StructField("f", StringType) :: StructField("f0", 
LongType) :: Nil)
    -          checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
    -          assert(df.schema === expectedSchema)
    +  def testFile(fileName: String): String = {
    +    
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
    +  }
    +
    +  test("SPARK-23723: json in UTF-16 with BOM") {
    +    val fileName = "json-tests/utf16WithBOM.json"
    +    val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
    +    val jsonDF = spark.read.schema(schema)
    +      // This option will be replaced by .option("lineSep", "x00 0a")
    +      // as soon as lineSep allows to specify sequence of bytes in 
hexadecimal format.
    +      .option("mode", "DROPMALFORMED")
    +      .json(testFile(fileName))
    +
    +    checkAnswer(jsonDF, Seq(
    +      Row("Chris", "Baird"), Row("Doug", "Rood")
    +    ))
    +  }
    +
    +  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
    +    val fileName = "json-tests/utf32BEWithBOM.json"
    +    val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
    +    val jsonDF = spark.read.schema(schema)
    +      .option("multiline", "true")
    +      .json(testFile(fileName))
    +
    +    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
    +  }
    +
    +  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
    +    val fileName = "json-tests/utf16LE.json"
    +    val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
    +    val jsonDF = spark.read.schema(schema)
    +      .option("multiline", "true")
    +      .options(Map("encoding" -> "UTF-16LE"))
    +      .json(testFile(fileName))
    +
    +    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
    +  }
    +
    +  test("SPARK-23723: Unsupported charset name") {
    +    val invalidCharset = "UTF-128"
    +    val exception = intercept[java.io.UnsupportedEncodingException] {
    +      spark.read
    +        .options(Map("charset" -> invalidCharset, "lineSep" -> "\n"))
    +        .json(testFile("json-tests/utf16LE.json"))
    +        .count()
    +    }
    +
    +    assert(exception.getMessage.contains(invalidCharset))
    +  }
    +
    +  test("SPARK-23723: checking that the charset option is case agnostic") {
    +    val fileName = "json-tests/utf16LE.json"
    +    val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
    +    val jsonDF = spark.read.schema(schema)
    +      .option("multiline", "true")
    +      .options(Map("charset" -> "uTf-16lE"))
    +      .json(testFile(fileName))
    +
    +    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
    +  }
    +
    +
    +  test("SPARK-23723: specified charset is not matched to actual charset") {
    +    val fileName = "json-tests/utf16LE.json"
    +    val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
    +    val exception = intercept[SparkException] {
    +      spark.read.schema(schema)
    +        .option("mode", "FAILFAST")
    +        .option("multiline", "true")
    +        .options(Map("charset" -> "UTF-16BE"))
    +        .json(testFile(fileName))
    +        .count()
    +    }
    +    val errMsg = exception.getMessage
    +
    +    assert(errMsg.contains("Malformed records are detected in record 
parsing"))
    +  }
    +
    +  def checkCharset(
    +    expectedCharset: String,
    +    pathToJsonFiles: String,
    +    expectedContent: String
    +  ): Unit = {
    +    val jsonFiles = new File(pathToJsonFiles)
    +      .listFiles()
    +      .filter(_.isFile)
    +      .filter(_.getName.endsWith("json"))
    +    val jsonContent = jsonFiles.map { file =>
    +      scala.io.Source.fromFile(file, expectedCharset).mkString
    +    }
    +    val cleanedContent = jsonContent
    +      .mkString
    +      .trim
    +      .replaceAll(" ", "")
    +
    +    assert(cleanedContent == expectedContent)
    +  }
    +
    +  test("SPARK-23723: save json in UTF-32BE") {
    +    val encoding = "UTF-32BE"
    +    withTempPath { path =>
    +      val df = spark.createDataset(Seq(("Dog", 42)))
    +      df.write
    +        .options(Map("charset" -> encoding, "lineSep" -> "\n"))
    +        .format("json").mode("overwrite")
    +        .save(path.getCanonicalPath)
    +
    +      checkCharset(
    +        expectedCharset = encoding,
    +        pathToJsonFiles = path.getCanonicalPath,
    +        expectedContent = """{"_1":"Dog","_2":42}"""
    +      )
    +    }
    +  }
    +
    +  test("SPARK-23723: save json in default charset - UTF-8") {
    +    withTempPath { path =>
    +      val df = spark.createDataset(Seq(("Dog", 42)))
    +      df.write
    +        .format("json").mode("overwrite")
    +        .save(path.getCanonicalPath)
    +
    +      checkCharset(
    +        expectedCharset = "UTF-8",
    +        pathToJsonFiles = path.getCanonicalPath,
    +        expectedContent = """{"_1":"Dog","_2":42}"""
    +      )
    +    }
    +  }
    +
    +  test("SPARK-23723: wrong output charset") {
    +    val charset = "UTF-128"
    +    val exception = intercept[java.io.UnsupportedEncodingException] {
    +      withTempPath { path =>
    +        val df = spark.createDataset(Seq((0)))
    +        df.write
    +          .options(Map("charset" -> charset, "lineSep" -> "\n"))
    +          .format("json").mode("overwrite")
    +          .save(path.getCanonicalPath)
    +      }
    +    }
    +
    +    assert(exception.getMessage == charset)
    +  }
    +
    +  test("SPARK-23723: read written json in UTF-16") {
    +    val charset = "UTF-16"
    +    case class Rec(f1: String, f2: Int)
    +    withTempPath { path =>
    +      val ds = spark.createDataset(Seq(
    +        ("a", 1), ("b", 2), ("c", 3))
    +      ).repartition(2)
    +      ds.write
    +        .options(Map("charset" -> charset, "lineSep" -> "\n"))
    +        .format("json").mode("overwrite")
    +        .save(path.getCanonicalPath)
    +      val savedDf = spark
    +        .read
    +        .schema(ds.schema)
    +        // This option will be replaced by .option("lineSep", "x00 0a")
    +        // as soon as lineSep allows to specify sequence of bytes in 
hexadecimal format.
    +        .option("mode", "DROPMALFORMED")
    +        .json(path.getCanonicalPath)
    +
    +      checkAnswer(savedDf.toDF(), ds.toDF())
    +    }
    +  }
    +
    +  def checkReadJson(
    +    lineSep: String,
    +    charsetOption: String,
    +    charset: String,
    +    inferSchema: Boolean,
    +    runId: Int
    +  ): Unit = {
    +    test(s"SPARK-23724: checks reading json in ${charset} #${runId}") {
    +      val delimInBytes = {
    +        if (lineSep.startsWith("x")) {
    +          lineSep.replaceAll("[^0-9A-Fa-f]", "")
    +            .sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)
    +        } else {
    +          lineSep.getBytes(charset)
             }
           }
    +      case class Rec(f1: String, f2: Int) {
    +        def json = s"""{"f1":"${f1}", "f2":$f2}"""
    +        def bytes = json.getBytes(charset)
    +        def row = Row(f1, f2)
    +      }
    +      val schema = new StructType().add("f1", StringType).add("f2", 
IntegerType)
    +      withTempPath { path =>
    +        val records = List(Rec("a", 1), Rec("b", 2))
    +        val data = records.map(_.bytes).reduce((a1, a2) => a1 ++ 
delimInBytes ++ a2)
    +        val os = new FileOutputStream(path)
    +        os.write(data)
    +        os.close()
    +        val reader = if (inferSchema) {
    +          spark.read
    +        } else {
    +          spark.read.schema(schema)
    +        }
    +        val savedDf = reader
    +          .option(charsetOption, charset)
    +          .option("lineSep", lineSep)
    +          .json(path.getCanonicalPath)
    +        checkAnswer(savedDf, records.map(_.row))
    +      }
    +    }
    +  }
     
    +  // scalastyle:off nonascii
    +  List(
    +    ("|", "encoding", "UTF-8", false),
    +    ("^", "charset", "UTF-16BE", true),
    --- End diff --
    
    Shall we have one test for `charset` and use `encoding` everywhere?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to