Hi,

Thank you for your response.
I finally found the cause of this


When multiLine option is set, input file is read by
UnivocityParser.parseStream() method.
This method, in turn, calls convertStream() that initializes tokenizer with
tokenizer.beginParsing(inputStream) and parses records using
tokenizer.parseNext().

The problem is that beginParsing() method uses UTF-8 as its default
char-encoding.
As a result, user provided "encoding" option will be ignored.


When multiLine option is NOT set, on the other hand, input file is first
read and decoded from TextInputCSVDataSource.readFile() method.
Then, it is sent to UnivocityParser.parseIterator() method.
Therefore, no problem is occurred in in this case.


To solve this problem, I removed the call for tokenizer.beginParsing()
method in convertStream() since we cannot access options.charset variable
here.
Then, added it to two places: tokenizeStream() and parseStream() methods.
Especially, in parseStream() method, I added charset as the second
parameter for beginParsing() method.

I attached git diff content as an attachment file.
I appreciate any comments on this.


Best wishes,
Han-Cheol




On Wed, Aug 16, 2017 at 3:09 PM, Takeshi Yamamuro <linguin....@gmail.com>
wrote:

> Hi,
>
> Since the csv source currently supports ascii-compatible charset, so I
> guess shift-jis also works well.
> You could check Hyukjin's comment in https://issues.apache.org/
> jira/browse/SPARK-21289 for more info.
>
>
> On Wed, Aug 16, 2017 at 2:54 PM, Han-Cheol Cho <prian...@gmail.com> wrote:
>
>> My apologies,
>>
>> It was a problem of our Hadoop cluster.
>> When we tested the same code on another cluster (HDP-based), it worked
>> without any problem.
>>
>> ```scala
>> ## make sjis text
>> cat a.txt
>> 8月データだけでやってみよう
>> nkf -W -s a.txt >b.txt
>> cat b.txt
>> 87n%G!<%?$@$1$G$d$C$F$_$h$&
>> nkf -s -w b.txt
>> 8月データだけでやってみよう
>> hdfs dfs -put a.txt b.txt
>>
>> ## YARN mode test
>> spark.read.option("encoding", "utf-8").csv("a.txt").show(1)
>> +--------------+
>> |           _c0|
>> +--------------+
>> |8月データだけでやってみよう|
>> +--------------+
>>
>> spark.read.option("encoding", "sjis").csv("b.txt").show(1)
>> +--------------+
>> |           _c0|
>> +--------------+
>> |8月データだけでやってみよう|
>> +--------------+
>>
>> spark.read.option("encoding", "utf-8").option("multiLine",
>> true).csv("a.txt").show(1)
>> +--------------+
>> |           _c0|
>> +--------------+
>> |8月データだけでやってみよう|
>> +--------------+
>>
>> spark.read.option("encoding", "sjis").option("multiLine",
>> true).csv("b.txt").show(1)
>> +--------------+
>> |           _c0|
>> +--------------+
>> |8月データだけでやってみよう|
>> +--------------+
>> ```
>>
>> I am still digging the root cause and will share it later :-)
>>
>> Best wishes,
>> Han-Choel
>>
>>
>> On Wed, Aug 16, 2017 at 1:32 PM, Han-Cheol Cho <prian...@gmail.com>
>> wrote:
>>
>>> Dear Spark ML members,
>>>
>>>
>>> I experienced a trouble in using "multiLine" option to load CSV data
>>> with Shift-JIS encoding.
>>> When option("multiLine", true) is specified, option("encoding",
>>> "encoding-name") just doesn't work anymore.
>>>
>>>
>>> In CSVDataSource.scala file, I found that MultiLineCSVDataSource.readFile()
>>> method doesn't use parser.options.charset at all.
>>>
>>> object MultiLineCSVDataSource extends CSVDataSource {
>>>   override val isSplitable: Boolean = false
>>>
>>>   override def readFile(
>>>       conf: Configuration,
>>>       file: PartitionedFile,
>>>       parser: UnivocityParser,
>>>       schema: StructType): Iterator[InternalRow] = {
>>>     UnivocityParser.parseStream(
>>>       CodecStreams.createInputStreamWithCloseResource(conf,
>>> file.filePath),
>>>       parser.options.headerFlag,
>>>       parser,
>>>       schema)
>>>   }
>>>   ...
>>>
>>> On the other hand, TextInputCSVDataSource.readFile() method uses it:
>>>
>>>   override def readFile(
>>>       conf: Configuration,
>>>       file: PartitionedFile,
>>>       parser: UnivocityParser,
>>>       schema: StructType): Iterator[InternalRow] = {
>>>     val lines = {
>>>       val linesReader = new HadoopFileLinesReader(file, conf)
>>>       Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_
>>> => linesReader.close()))
>>>       linesReader.map { line =>
>>>         new String(line.getBytes, 0, line.getLength,
>>> parser.options.charset)    // <---- charset option is used here.
>>>       }
>>>     }
>>>
>>>     val shouldDropHeader = parser.options.headerFlag && file.start == 0
>>>     UnivocityParser.parseIterator(lines, shouldDropHeader, parser,
>>> schema)
>>>   }
>>>
>>>
>>> It seems like a bug.
>>> Is there anyone who had the same problem before?
>>>
>>>
>>> Best wishes,
>>> Han-Cheol
>>>
>>> --
>>> ==================================
>>> Han-Cheol Cho, Ph.D.
>>> Data scientist, Data Science Team, Data Laboratory
>>> NHN Techorus Corp.
>>>
>>> Homepage: https://sites.google.com/site/priancho/
>>> ==================================
>>>
>>
>>
>>
>> --
>> ==================================
>> Han-Cheol Cho, Ph.D.
>> Data scientist, Data Science Team, Data Laboratory
>> NHN Techorus Corp.
>>
>> Homepage: https://sites.google.com/site/priancho/
>> ==================================
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>



-- 
==================================
Han-Cheol Cho, Ph.D.
Data scientist, Data Science Team, Data Laboratory
NHN Techorus Corp.

Homepage: https://sites.google.com/site/priancho/
==================================
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index c3657acb7d..126763b0f0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -21,6 +21,7 @@ import java.io.InputStream
 import java.math.BigDecimal
 import java.text.NumberFormat
 import java.util.Locale
+import java.nio.charset.Charset

 import scala.util.Try
 import scala.util.control.NonFatal
@@ -237,6 +238,7 @@ private[csv] object UnivocityParser {
       inputStream: InputStream,
       shouldDropHeader: Boolean,
       tokenizer: CsvParser): Iterator[Array[String]] = {
+    tokenizer.beginParsing(inputStream)
     convertStream(inputStream, shouldDropHeader, tokenizer)(tokens => tokens)
   }

@@ -248,12 +250,17 @@ private[csv] object UnivocityParser {
       shouldDropHeader: Boolean,
       parser: UnivocityParser,
       schema: StructType): Iterator[InternalRow] = {
+    
     val tokenizer = parser.tokenizer
+    val inputCharset = Charset.forName(parser.options.charset)
+    tokenizer.beginParsing(inputStream, inputCharset)
+    
     val safeParser = new FailureSafeParser[Array[String]](
       input => Seq(parser.convert(input)),
       parser.options.parseMode,
       schema,
       parser.options.columnNameOfCorruptRecord)
+    
     convertStream(inputStream, shouldDropHeader, tokenizer) { tokens =>
       safeParser.parse(tokens)
     }.flatten
@@ -263,7 +270,7 @@ private[csv] object UnivocityParser {
       inputStream: InputStream,
       shouldDropHeader: Boolean,
       tokenizer: CsvParser)(convert: Array[String] => T) = new Iterator[T] {
-    tokenizer.beginParsing(inputStream)
+    
     private var nextRecord = {
       if (shouldDropHeader) {
         tokenizer.parseNext()
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to