[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20727
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r175982059

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -30,9 +30,19 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 /**
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
  * in that file.
+ *
+ * @param file A part (i.e. "block") of a single file that should be read line by line.
+ * @param lineSeparator A line separator that should be used for each line. If the value is `None`,
+ *                      it covers `\r`, `\r\n` and `\n`.
--- End diff --

Sure.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r175977344

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -30,9 +30,19 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 /**
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
  * in that file.
+ *
+ * @param file A part (i.e. "block") of a single file that should be read line by line.
+ * @param lineSeparator A line separator that should be used for each line. If the value is `None`,
+ *                      it covers `\r`, `\r\n` and `\n`.
--- End diff --

We should mention that this default rule is not defined by us, but by Hadoop.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r174999412

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
--- End diff --

We already use the term "line" everywhere in the docs. We could just say that lines are separated by a character, and minimise the doc changes.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r174998682

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
--- End diff --

My reason is to follow other places so that, in practice, users feel comfortable - something I usually give more weight. I really don't want to spend time researching why those other references used the term "line". If we think about plain text, CSV or JSON, the term "line" can be correct in a way. We documented http://jsonlines.org/ (even this reference uses the term "line"). I think, for example, a line can be defined by its separator. https://github.com/apache/spark/blob/c36fecc3b416c38002779c3cf40b6a665ac4bf13/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala#L1645
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r174828198

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
--- End diff --

The definition of records was introduced in the classical paper: https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf. I don't see any reason to replace it with **line** right now.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r174821385

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
--- End diff --

I don't really care what we are going to call this. I think it is important that we are consistent across datasources. IMO, since we can define anything to be a separator, not only newlines, `records` seems to fit a bit better.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r173642056

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = if (lineSeparator != "\n") {
+      new LineRecordReader(lineSeparator.getBytes("UTF-8"))
--- End diff --

OK. Let me try to address this one.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r173639932

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
--- End diff --

One example might sound counterintuitive to you, but it looks less consistent with the other places I usually refer to.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r173639748

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -30,9 +31,19 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 /**
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
  * in that file.
+ *
+ * @param file A part (i.e. "block") of a single file that should be read line by line.
+ * @param lineSeparator A line separator that should be used for each line. If the value is `None`,
+ *                      it covers `\r`, `\r\n` and `\n`.
+ * @param conf Hadoop configuration
+ */
 class HadoopFileLinesReader(
-    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
+    file: PartitionedFile,
+    lineSeparator: Option[String],
+    conf: Configuration) extends Iterator[Text] with Closeable {
--- End diff --

Yup, I am sorry if I wasn't clear. I mean, [the doc describes](https://hadoop.apache.org/docs/r2.7.1/api/index.html?org/apache/hadoop/io/Text.html):

> This class stores text using standard UTF8 encoding.

I was wondering if that's an official way to use `Text`, because it sounds rather like an informal workaround.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r173633651

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -30,9 +31,19 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 /**
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
  * in that file.
+ *
+ * @param file A part (i.e. "block") of a single file that should be read line by line.
+ * @param lineSeparator A line separator that should be used for each line. If the value is `None`,
+ *                      it covers `\r`, `\r\n` and `\n`.
+ * @param conf Hadoop configuration
+ */
 class HadoopFileLinesReader(
-    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
+    file: PartitionedFile,
+    lineSeparator: Option[String],
+    conf: Configuration) extends Iterator[Text] with Closeable {
--- End diff --

Some methods of Hadoop's Text have such an assumption about UTF-8 encoding. In general, a datasource could eliminate the restriction by using the Text class as a container of raw bytes and calling methods like **getBytes()** and **getLength()**.
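[Editor's note] For illustration, a minimal sketch of that raw-bytes approach (the `decode` helper is hypothetical; the charset argument is whatever the datasource chooses):

```
import java.nio.charset.Charset
import org.apache.hadoop.io.Text

// Text.getBytes() returns the backing array, which is only valid up to
// getLength(), so both must be used together when decoding the bytes in a
// datasource-specific charset.
def decode(text: Text, charset: Charset): String =
  new String(text.getBytes, 0, text.getLength, charset)
```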
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r173633462

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = if (lineSeparator != "\n") {
+      new LineRecordReader(lineSeparator.getBytes("UTF-8"))
--- End diff --

Why do you think this class is responsible for converting the string separator to an array of bytes? In particular, the restriction to one charset is not clear. The purpose of the class is to provide an Iterator interface over records/lines to datasources, and this class doesn't have to know about a datasource's charset. I would not stick to a particular charset here; I would expose the separator parameter as `Option[Array[Byte]]`, like `LineReader` provides a constructor with `byte[] recordDelimiter`.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r173632775

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
--- End diff --

In the example above, "line" is counterintuitive for me. I imagine a line in a text file as a sequence of one or more characters displayed in a single horizontal row. I would prefer the short name *recSep*, or *recordSeparator* as the long name. I guess that when this option is used, it will separate text by something other than newline chars like `'\n'` or `'\r\n'`.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172721381

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -30,9 +31,19 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 /**
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
  * in that file.
+ *
+ * @param file A part (i.e. "block") of a single file that should be read line by line.
+ * @param lineSeparator A line separator that should be used for each line. If the value is `None`,
+ *                      it covers `\r`, `\r\n` and `\n`.
+ * @param conf Hadoop configuration
+ */
 class HadoopFileLinesReader(
-    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
+    file: PartitionedFile,
+    lineSeparator: Option[String],
+    conf: Configuration) extends Iterator[Text] with Closeable {
--- End diff --

Note that it's an internal API for datasources, and Hadoop's Text already has an assumption of UTF-8. I don't think we should call getBytes with UTF-8 on each caller side.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172694770

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
--- End diff --

Actually, I am not sure why we care so much about this naming. recordSeparator, recordSep, lineSeparator and lineSep are all fine, actually. lineSep was chosen to be as consistent as I could make it.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172694097

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
--- End diff --

To be clear, the name "lineSep" is taken after Python, R's `sep` and our supported option `sep`. Here was another discussion - https://github.com/apache/spark/pull/18581#discussion_r134813986

"line" is taken after the Univocity CSV parser and seems to make sense in Hadoop too. I think we documented that JSON is in lines, and I think it makes sense for CSV and text too.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172682591

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = if (lineSeparator != "\n") {
+      new LineRecordReader(lineSeparator.getBytes("UTF-8"))
--- End diff --

I mean, it's initially a Unicode string via the datasource interface, and we need to convert it to bytes at some point since the reader takes bytes. Do you mean adding another option for specifying the charset, or did I maybe miss something?
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172657412

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
--- End diff --

The name does not matter as much as having the same name for the option across all supported datasources - text, CSV and JSON - so as not to confuse users.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172656702

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = if (lineSeparator != "\n") {
+      new LineRecordReader(lineSeparator.getBytes("UTF-8"))
--- End diff --

My suggestion is to pass Array[Byte] into the class. If charsets other than UTF-8 are supported in the future, this place will have to be changed for sure; you can make this class more tolerant of input charsets right now. For example, the JSON reader (the Jackson JSON parser) is able to read JSON in any standard charset. To fix its per-line mode, we need to support lineSep in any charset and convert lineSep to an array of bytes before using this class. If you restrict the charset of lineSep to UTF-8, you just put up a wall for other datasources.
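[Editor's note] A sketch of what that caller-side conversion might look like in a datasource's options class (the `encoding` option name and the surrounding `parameters` map are assumptions for illustration):

```
import java.nio.charset.{Charset, StandardCharsets}

// Hypothetical options handling: the datasource resolves its own charset and
// converts the separator to bytes, so the lines reader stays charset-agnostic.
val parameters = Map("lineSep" -> "||", "encoding" -> "UTF-16LE")
val charset: Charset = parameters.get("encoding")
  .map(Charset.forName)
  .getOrElse(StandardCharsets.UTF_8)
val lineSeparatorInRead: Option[Array[Byte]] =
  parameters.get("lineSep").map(_.getBytes(charset))
```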
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172362385

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala ---
@@ -172,6 +174,43 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  def testLineSeparator(lineSep: String): Unit = {
+    test(s"SPARK-23577: Support line separator - lineSep: '$lineSep'") {
+      // Read
+      val values = Seq("a", "b", "\nc")
+      val data = values.mkString(lineSep)
+      val dataWithTrailingLineSep = s"$data$lineSep"
+      Seq(data, dataWithTrailingLineSep).foreach { lines =>
+        withTempPath { path =>
+          Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
+          val df = spark.read.option("lineSep", lineSep).text(path.getAbsolutePath)
+          checkAnswer(df, Seq("a", "b", "\nc").toDF())
+        }
+      }
+
+      // Write
+      withTempPath { path =>
+        values.toDF().coalesce(1)
+          .write.option("lineSep", lineSep).text(path.getAbsolutePath)
+        val partFile = Utils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head
+        val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)
+        assert(readBack === s"a${lineSep}b${lineSep}\nc${lineSep}")
+      }
+
+      // Roundtrip
+      withTempPath { path =>
+        val df = values.toDF()
+        df.write.option("lineSep", lineSep).text(path.getAbsolutePath)
+        val readBack = spark.read.option("lineSep", lineSep).text(path.getAbsolutePath)
+        checkAnswer(df, readBack)
+      }
+    }
+  }
+
+  Seq("|", "^", "::", "!!!@3").foreach { lineSep =>
--- End diff --

Sure, sounds like a good idea.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172362185

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -30,9 +30,19 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 /**
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
  * in that file.
+ *
+ * @param file A part (i.e. "block") of a single file that should be read line by line.
+ * @param lineSeparator A line separator that should be used for each line. If the value is `\n`,
+ *                      it covers `\r`, `\r\n` and `\n`.
+ * @param conf Hadoop configuration
+ */
 class HadoopFileLinesReader(
-    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
+    file: PartitionedFile,
+    lineSeparator: String,
+    conf: Configuration) extends Iterator[Text] with Closeable {
--- End diff --

That's preserved by `def this(file: PartitionedFile, conf: Configuration) = this(file, "\n", conf)` I believe. The `execution` package is meant to be an internal package anyway.
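[Editor's note] A simplified, self-contained sketch of that auxiliary-constructor pattern, separate from the real class (the names are illustrative only, not the Spark API):

```
// Hypothetical, stripped-down illustration of a source-compatible auxiliary
// constructor: old two-argument callers keep compiling and keep the "\n" default.
class LinesReader(path: String, lineSeparator: String) {
  def this(path: String) = this(path, "\n")
}
```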
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172361378

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = if (lineSeparator != "\n") {
--- End diff --

Ditto for the option thing. Will bring it back.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172361186

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = if (lineSeparator != "\n") {
+      new LineRecordReader(lineSeparator.getBytes("UTF-8"))
--- End diff --

Would you have a suggestion?
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172361020

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -30,9 +30,19 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 /**
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
  * in that file.
+ *
+ * @param file A part (i.e. "block") of a single file that should be read line by line.
+ * @param lineSeparator A line separator that should be used for each line. If the value is `\n`,
+ *                      it covers `\r`, `\r\n` and `\n`.
+ * @param conf Hadoop configuration
+ */
 class HadoopFileLinesReader(
-    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
+    file: PartitionedFile,
+    lineSeparator: String,
--- End diff --

I would suggest using `String` here and controlling the encoding stuff in one place. For the option thing, see https://github.com/apache/spark/pull/20727#discussion_r172360489. Will bring it back.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172360489

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = if (lineSeparator != "\n") {
+      new LineRecordReader(lineSeparator.getBytes("UTF-8"))
+    } else {
+      // This behavior follows Hive. `\n` covers `\r`, `\r\n` and `\n`.
--- End diff --

I initially did this but reverted it - https://github.com/apache/spark/pull/18581#discussion_r134814393. Let me bring the option back.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172359828

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
--- End diff --

It was taken after the Univocity parser, `setLineSeparator`. I think Hadoop calls it `textinputformat.record.delimiter`, but the class name is `LineRecordReader` or `LineReader`.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172358851

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
--- End diff --

Nope, it's `nonEmpty` on `String`.
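[Editor's note] A short illustration of why the check is not redundant: `getOrElse` only supplies the default when the key is absent, so an explicitly set empty string still gets through (a plain `Map` stands in for `CaseInsensitiveMap` here):

```
val parameters = Map("lineSep" -> "")  // user explicitly passes an empty separator
val lineSeparator = parameters.getOrElse("lineSep", "\n")  // yields "", not "\n"
require(lineSeparator.nonEmpty, "'lineSep' cannot be an empty string.")  // fails, as intended
```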
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172347520

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
--- End diff --

+1
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172347231

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = if (lineSeparator != "\n") {
+      new LineRecordReader(lineSeparator.getBytes("UTF-8"))
+    } else {
+      // This behavior follows Hive. `\n` covers `\r`, `\r\n` and `\n`.
--- End diff --

+1. It seems cleaner to use None as the default, which indicates `\r`, `\r\n` and `\n`.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172330061

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = if (lineSeparator != "\n") {
+      new LineRecordReader(lineSeparator.getBytes("UTF-8"))
--- End diff --

It would be better not to depend on a particular charset.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172341859

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
--- End diff --

Why is it not "lineSeparator"? I would propose another name for the option: recordSeparator. Can you imagine you have the text file:

```
id: 123
cmd: ls -l
---
id: 456
cmd: rm -rf
```

where the separator is `---`? If the separator is not a newline delimiter, records don't look like lines, and recordSeparator would be closer to Hadoop's terminology. Besides that, we will probably introduce a similar option for other datasources like JSON. A recordSeparator for JSON records (not lines) sounds better from my point of view.
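[Editor's note] For illustration, such a file could be read with the option from this PR roughly like so (a sketch; the path is hypothetical, and the separator includes the surrounding newlines so the dashes themselves are consumed):

```
// Each "id/cmd" block between "---" lines becomes one row in the `value` column.
val records = spark.read
  .option("lineSep", "\n---\n")
  .text("/path/to/records.txt")
```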
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172328966

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -30,9 +30,19 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 /**
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
  * in that file.
+ *
+ * @param file A part (i.e. "block") of a single file that should be read line by line.
+ * @param lineSeparator A line separator that should be used for each line. If the value is `\n`,
+ *                      it covers `\r`, `\r\n` and `\n`.
+ * @param conf Hadoop configuration
+ */
 class HadoopFileLinesReader(
-    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
+    file: PartitionedFile,
+    lineSeparator: String,
--- End diff --

The constructor of LineRecordReader has the same param, but its type is an array of bytes. I would propose exposing the same type here - Array[Byte]. Also, you handle the special value '\n'. If you define the param as `lineSeparator: Option[Array[Byte]]`, `None` would indicate the default behavior - '\n', '\r' or '\r\n'.
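[Editor's note] A minimal sketch of the reader selection with that signature, equivalent to the `match` version quoted elsewhere in this thread (the `makeReader` helper name is hypothetical):

```
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader

// None falls back to LineRecordReader's default delimiters (`\n`, `\r`, `\r\n`);
// Some(bytes) passes a custom delimiter straight through.
def makeReader(lineSeparator: Option[Array[Byte]]): LineRecordReader =
  lineSeparator.map(new LineRecordReader(_)).getOrElse(new LineRecordReader())
```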
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172337845

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala ---
@@ -172,6 +174,43 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  def testLineSeparator(lineSep: String): Unit = {
+    test(s"SPARK-23577: Support line separator - lineSep: '$lineSep'") {
+      // Read
+      val values = Seq("a", "b", "\nc")
+      val data = values.mkString(lineSep)
+      val dataWithTrailingLineSep = s"$data$lineSep"
+      Seq(data, dataWithTrailingLineSep).foreach { lines =>
+        withTempPath { path =>
+          Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
+          val df = spark.read.option("lineSep", lineSep).text(path.getAbsolutePath)
+          checkAnswer(df, Seq("a", "b", "\nc").toDF())
+        }
+      }
+
+      // Write
+      withTempPath { path =>
+        values.toDF().coalesce(1)
+          .write.option("lineSep", lineSep).text(path.getAbsolutePath)
+        val partFile = Utils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head
+        val readBack = new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)
+        assert(readBack === s"a${lineSep}b${lineSep}\nc${lineSep}")
+      }
+
+      // Roundtrip
+      withTempPath { path =>
+        val df = values.toDF()
+        df.write.option("lineSep", lineSep).text(path.getAbsolutePath)
+        val readBack = spark.read.option("lineSep", lineSep).text(path.getAbsolutePath)
+        checkAnswer(df, readBack)
+      }
+    }
+  }
+
+  Seq("|", "^", "::", "!!!@3").foreach { lineSep =>
--- End diff --

Please check invisible and control chars too.
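[Editor's note] For instance, the separator list in the quoted test could be extended along these lines (a sketch; assumes the `testLineSeparator` helper defined above):

```
// "\u0001" (SOH), "\u000C" (form feed) and "\t" exercise non-printable separators.
Seq("|", "^", "::", "!!!@3", "\u0001", "\u000C", "\t").foreach { lineSep =>
  testLineSeparator(lineSep)
}
```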
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172336258

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -30,9 +30,19 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 /**
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
  * in that file.
+ *
+ * @param file A part (i.e. "block") of a single file that should be read line by line.
+ * @param lineSeparator A line separator that should be used for each line. If the value is `\n`,
+ *                      it covers `\r`, `\r\n` and `\n`.
+ * @param conf Hadoop configuration
+ */
 class HadoopFileLinesReader(
-    file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
+    file: PartitionedFile,
+    lineSeparator: String,
+    conf: Configuration) extends Iterator[Text] with Closeable {
--- End diff --

I cannot predict where the class could be used, but I believe we should keep backward compatibility for sources. I mean, it would be better to add the new parameter after `conf: Configuration` with a default value that preserves the old behavior.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172330645

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = if (lineSeparator != "\n") {
+      new LineRecordReader(lineSeparator.getBytes("UTF-8"))
+    } else {
+      // This behavior follows Hive. `\n` covers `\r`, `\r\n` and `\n`.
--- End diff --

The case where lineSeparator = '\n' also covers '\r' and '\r\n'; that does not look so good.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172329789

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala ---
@@ -42,7 +52,12 @@ class HadoopFileLinesReader(
       Array.empty)
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new LineRecordReader()
+    val reader = if (lineSeparator != "\n") {
+      new LineRecordReader(lineSeparator.getBytes("UTF-8"))
--- End diff --

If it has type Option[Array[Byte]]:

```
val reader = lineSeparator match {
  case Some(sep) => new LineRecordReader(sep)
  case _ => new LineRecordReader()
}
```
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172282815

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
    */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
--- End diff --

This `require` looks redundant after a `getOrElse` call.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20727#discussion_r172015316

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala ---
@@ -400,6 +400,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    *
    * `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
    * considered in every trigger.
+   * `wholetext` (default `false`): If true, read a file as a single row and not split by "\n".
--- End diff --

I also added some changes that were missed for `wholetext` while I am here.
[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/20727

[SPARK-23577][SQL] Supports custom line separator for text datasource

## What changes were proposed in this pull request?

This PR proposes to add a `lineSep` option for a configurable line separator in the text datasource. Note that, for now, this PR follows Hive's default behaviour for `\n` - it covers the other newline variants. See this discussion - https://github.com/apache/spark/pull/18581#discussion_r134814393.

## How was this patch tested?

Manual tests and unit tests were added.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark linesep-text

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20727.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20727

commit 93652b349ca8107035465096370464053b558ef0
Author: hyukjinkwon
Date: 2018-03-03T11:58:03Z

    Supports custom line separator for text datasource
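[Editor's note] For reference, a minimal usage sketch of the option this PR adds (the paths and the "||" separator are illustrative), matching the API exercised in the test suite quoted above:

```
// Read a file whose records are separated by "||" instead of newlines.
val df = spark.read
  .option("lineSep", "||")
  .text("/tmp/custom-sep-in")

// Write it back out with the same separator.
df.write
  .option("lineSep", "||")
  .text("/tmp/custom-sep-out")
```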