[ https://issues.apache.org/jira/browse/SPARK-32607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
angerszhu updated SPARK-32607: ------------------------------ Description: reader.readLine() should respect `TOK_TABLEROWFORMATLINES` {code:java} protected def createOutputIteratorWithoutSerde( writerThread: BaseScriptTransformationWriterThread, inputStream: InputStream, proc: Process, stderrBuffer: CircularBuffer): Iterator[InternalRow] = { new Iterator[InternalRow] { var curLine: String = null val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)) val outputRowFormat = ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD") val processRowWithoutSerde = if (!ioschema.schemaLess) { prevLine: String => new GenericInternalRow( prevLine.split(outputRowFormat) .zip(outputFieldWriters) .map { case (data, writer) => writer(data) }) } else { // In schema less mode, hive default serde will choose first two output column as output // if output column size less then 2, it will throw ArrayIndexOutOfBoundsException. // Here we change spark's behavior same as hive's default serde. // But in hive, TRANSFORM with schema less behavior like origin spark, we will fix this // to keep spark and hive behavior same in SPARK-32388 val kvWriter = CatalystTypeConverters.createToCatalystConverter(StringType) prevLine: String => new GenericInternalRow( prevLine.split(outputRowFormat).slice(0, 2) .map(kvWriter)) } override def hasNext: Boolean = { try { if (curLine == null) { curLine = reader.readLine() if (curLine == null) { checkFailureAndPropagate(writerThread, null, proc, stderrBuffer) return false } } true } catch { case NonFatal(e) => // If this exception is due to abrupt / unclean termination of `proc`, // then detect it and propagate a better exception message for end users checkFailureAndPropagate(writerThread, e, proc, stderrBuffer) throw e } } override def next(): InternalRow = { if (!hasNext) { throw new NoSuchElementException } val prevLine = curLine curLine = reader.readLine() processRowWithoutSerde(prevLine) } } } {code} > Script Transformation no-serde read line 
should respect > `TOK_TABLEROWFORMATLINES` > --------------------------------------------------------------------------------- > > Key: SPARK-32607 > URL: https://issues.apache.org/jira/browse/SPARK-32607 > Project: Spark > Issue Type: Sub-task > Components: SQL > Affects Versions: 3.0.0 > Reporter: angerszhu > Priority: Major > > reader.readLine() should respect `TOK_TABLEROWFORMATLINES` > {code:java} > protected def createOutputIteratorWithoutSerde( > writerThread: BaseScriptTransformationWriterThread, > inputStream: InputStream, > proc: Process, > stderrBuffer: CircularBuffer): Iterator[InternalRow] = { > new Iterator[InternalRow] { > var curLine: String = null > val reader = new BufferedReader(new InputStreamReader(inputStream, > StandardCharsets.UTF_8)) > val outputRowFormat = > ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD") > val processRowWithoutSerde = if (!ioschema.schemaLess) { > prevLine: String => > new GenericInternalRow( > prevLine.split(outputRowFormat) > .zip(outputFieldWriters) > .map { case (data, writer) => writer(data) }) > } else { > // In schema less mode, hive default serde will choose first two output > column as output > // if output column size less then 2, it will throw > ArrayIndexOutOfBoundsException. > // Here we change spark's behavior same as hive's default serde. 
> // But in hive, TRANSFORM with schema less behavior like origin spark, > we will fix this > // to keep spark and hive behavior same in SPARK-32388 > val kvWriter = > CatalystTypeConverters.createToCatalystConverter(StringType) > prevLine: String => > new GenericInternalRow( > prevLine.split(outputRowFormat).slice(0, 2) > .map(kvWriter)) > } > override def hasNext: Boolean = { > try { > if (curLine == null) { > curLine = reader.readLine() > if (curLine == null) { > checkFailureAndPropagate(writerThread, null, proc, stderrBuffer) > return false > } > } > true > } catch { > case NonFatal(e) => > // If this exception is due to abrupt / unclean termination of > `proc`, > // then detect it and propagate a better exception message for end > users > checkFailureAndPropagate(writerThread, e, proc, stderrBuffer) > throw e > } > } > override def next(): InternalRow = { > if (!hasNext) { > throw new NoSuchElementException > } > val prevLine = curLine > curLine = reader.readLine() > processRowWithoutSerde(prevLine) > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org