[ 
https://issues.apache.org/jira/browse/SPARK-32607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

angerszhu updated SPARK-32607:
------------------------------
    Description: 
reader.readLine() should respect ``
{code:java}
protected def createOutputIteratorWithoutSerde(
    writerThread: BaseScriptTransformationWriterThread,
    inputStream: InputStream,
    proc: Process,
    stderrBuffer: CircularBuffer): Iterator[InternalRow] = {
  new Iterator[InternalRow] {
    var curLine: String = null
    val reader = new BufferedReader(new InputStreamReader(inputStream, 
StandardCharsets.UTF_8))

    val outputRowFormat = ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD")
    val processRowWithoutSerde = if (!ioschema.schemaLess) {
      prevLine: String =>
        new GenericInternalRow(
          prevLine.split(outputRowFormat)
            .zip(outputFieldWriters)
            .map { case (data, writer) => writer(data) })
    } else {
      // In schema less mode, hive default serde will choose first two output 
column as output
      // if output column size less then 2, it will throw 
ArrayIndexOutOfBoundsException.
      // Here we change spark's behavior same as hive's default serde.
      // But in hive, TRANSFORM with schema less behavior like origin spark, we 
will fix this
      // to keep spark and hive behavior same in SPARK-32388
      val kvWriter = 
CatalystTypeConverters.createToCatalystConverter(StringType)
      prevLine: String =>
        new GenericInternalRow(
          prevLine.split(outputRowFormat).slice(0, 2)
            .map(kvWriter))
    }

    override def hasNext: Boolean = {
      try {
        if (curLine == null) {
          curLine = reader.readLine()
          if (curLine == null) {
            checkFailureAndPropagate(writerThread, null, proc, stderrBuffer)
            return false
          }
        }
        true
      } catch {
        case NonFatal(e) =>
          // If this exception is due to abrupt / unclean termination of `proc`,
          // then detect it and propagate a better exception message for end 
users
          checkFailureAndPropagate(writerThread, e, proc, stderrBuffer)

          throw e
      }
    }

    override def next(): InternalRow = {
      if (!hasNext) {
        throw new NoSuchElementException
      }
      val prevLine = curLine
      curLine = reader.readLine()
      processRowWithoutSerde(prevLine)
    }
  }
}
{code}

> Script Transformation no-serde read line should respect 
> `TOK_TABLEROWFORMATLINES`
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-32607
>                 URL: https://issues.apache.org/jira/browse/SPARK-32607
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: angerszhu
>            Priority: Major
>
> reader.readLine() should respect ``
> {code:java}
> protected def createOutputIteratorWithoutSerde(
>     writerThread: BaseScriptTransformationWriterThread,
>     inputStream: InputStream,
>     proc: Process,
>     stderrBuffer: CircularBuffer): Iterator[InternalRow] = {
>   new Iterator[InternalRow] {
>     var curLine: String = null
>     val reader = new BufferedReader(new InputStreamReader(inputStream, 
> StandardCharsets.UTF_8))
>     val outputRowFormat = 
> ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD")
>     val processRowWithoutSerde = if (!ioschema.schemaLess) {
>       prevLine: String =>
>         new GenericInternalRow(
>           prevLine.split(outputRowFormat)
>             .zip(outputFieldWriters)
>             .map { case (data, writer) => writer(data) })
>     } else {
>       // In schema less mode, hive default serde will choose first two output 
> column as output
>       // if output column size less then 2, it will throw 
> ArrayIndexOutOfBoundsException.
>       // Here we change spark's behavior same as hive's default serde.
>       // But in hive, TRANSFORM with schema less behavior like origin spark, 
> we will fix this
>       // to keep spark and hive behavior same in SPARK-32388
>       val kvWriter = 
> CatalystTypeConverters.createToCatalystConverter(StringType)
>       prevLine: String =>
>         new GenericInternalRow(
>           prevLine.split(outputRowFormat).slice(0, 2)
>             .map(kvWriter))
>     }
>     override def hasNext: Boolean = {
>       try {
>         if (curLine == null) {
>           curLine = reader.readLine()
>           if (curLine == null) {
>             checkFailureAndPropagate(writerThread, null, proc, stderrBuffer)
>             return false
>           }
>         }
>         true
>       } catch {
>         case NonFatal(e) =>
>           // If this exception is due to abrupt / unclean termination of 
> `proc`,
>           // then detect it and propagate a better exception message for end 
> users
>           checkFailureAndPropagate(writerThread, e, proc, stderrBuffer)
>           throw e
>       }
>     }
>     override def next(): InternalRow = {
>       if (!hasNext) {
>         throw new NoSuchElementException
>       }
>       val prevLine = curLine
>       curLine = reader.readLine()
>       processRowWithoutSerde(prevLine)
>     }
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to