AngersZhuuuu commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r459182038



##########
File path: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
##########
@@ -54,87 +52,110 @@ case class HiveScriptTransformationExec(
     script: String,
     output: Seq[Attribute],
     child: SparkPlan,
-    ioschema: HiveScriptIOSchema)
-  extends BaseScriptTransformationExec {
+    ioschema: ScriptTransformationIOSchema)
+  extends BaseScriptTransformationExec with HiveInspectors {
 
-  override def processIterator(
-      inputIterator: Iterator[InternalRow],
-      hadoopConf: Configuration): Iterator[InternalRow] = {
-    val cmd = List("/bin/bash", "-c", script)
-    val builder = new ProcessBuilder(cmd.asJava)
-
-    val proc = builder.start()
-    val inputStream = proc.getInputStream
-    val outputStream = proc.getOutputStream
-    val errorStream = proc.getErrorStream
-
-    // In order to avoid deadlocks, we need to consume the error output of the 
child process.
-    // To avoid issues caused by large error output, we use a circular buffer 
to limit the amount
-    // of error output that we retain. See SPARK-7862 for more discussion of 
the deadlock / hang
-    // that motivates this.
-    val stderrBuffer = new CircularBuffer(2048)
-    new RedirectThread(
-      errorStream,
-      stderrBuffer,
-      "Thread-ScriptTransformation-STDERR-Consumer").start()
+  private def initInputSerDe(
+      input: Seq[Expression]): Option[(AbstractSerDe, StructObjectInspector)] 
= {
+    ioschema.inputSerdeClass.map { serdeClass =>
+      val (columns, columnTypes) = parseAttrs(input)
+      val serde = initSerDe(serdeClass, columns, columnTypes, 
ioschema.inputSerdeProps)
+      val fieldObjectInspectors = columnTypes.map(toInspector)
+      val objectInspector = ObjectInspectorFactory
+        .getStandardStructObjectInspector(columns.asJava, 
fieldObjectInspectors.asJava)
+      (serde, objectInspector)
+    }
+  }
 
-    val outputProjection = new InterpretedProjection(input, child.output)
+  private def initOutputSerDe(
+      output: Seq[Attribute]): Option[(AbstractSerDe, StructObjectInspector)] 
= {
+    ioschema.outputSerdeClass.map { serdeClass =>
+      val (columns, columnTypes) = parseAttrs(output)
+      val serde = initSerDe(serdeClass, columns, columnTypes, 
ioschema.outputSerdeProps)
+      val structObjectInspector = 
serde.getObjectInspector().asInstanceOf[StructObjectInspector]
+      (serde, structObjectInspector)
+    }
+  }
 
-    // This nullability is a performance optimization in order to avoid an 
Option.foreach() call
-    // inside of a loop
-    @Nullable val (inputSerde, inputSoi) = 
ioschema.initInputSerDe(input).getOrElse((null, null))
+  private def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) 
= {
+    val columns = attrs.zipWithIndex.map(e => s"${e._1.prettyName}_${e._2}")
+    val columnTypes = attrs.map(_.dataType)
+    (columns, columnTypes)
+  }
 
-    // This new thread will consume the ScriptTransformation's input rows and 
write them to the
-    // external process. That process's output will be read by this current 
thread.
-    val writerThread = new HiveScriptTransformationWriterThread(
-      inputIterator.map(outputProjection),
-      input.map(_.dataType),
-      inputSerde,
-      inputSoi,
-      ioschema,
-      outputStream,
-      proc,
-      stderrBuffer,
-      TaskContext.get(),
-      hadoopConf
-    )
+  private def initSerDe(

Review comment:
       > Sorry for the confusion, but on second thought, it's better to pull 
out the hive-serde related functions from `HiveScriptTransformationExec` and 
create a companion object holding them, for readability 
([maropu@972775b](https://github.com/maropu/spark/commit/972775b821406d81d3c1ba1c718de3037a0ca068)).
 WDYT?
   
   Agreed. That way `HiveScriptTransformationExec` only handles data processing. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to