Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5ea58898c -> d76e066d3


[SPARK-14400][SQL] ScriptTransformation does not fail the job for bad user 
command

## What changes were proposed in this pull request?

- Refer to the JIRA issue for a description of the problem: 
https://issues.apache.org/jira/browse/SPARK-14400
- The fix is to check in `hasNext()` whether the process has exited with a 
non-zero exit code. This check, together with the check for a writer thread 
exception, has been moved into a separate method.

## How was this patch tested?

- Ran a job with an incorrect transform script command and confirmed that the 
job fails
- Ran the existing unit tests in `ScriptTransformationSuite` and added a new unit test

Author: Tejas Patil <tej...@fb.com>

Closes #12194 from tejasapatil/script_transform.

(cherry picked from commit a96e4151a9d429cfaf457c07b4ce174890a3b39b)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d76e066d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d76e066d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d76e066d

Branch: refs/heads/branch-2.0
Commit: d76e066d3355a9942af3ae4f23d81a948c236e5e
Parents: 5ea5889
Author: Tejas Patil <tej...@fb.com>
Authored: Fri May 27 12:05:11 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri May 27 12:05:17 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/SparkPlanTest.scala     |  7 +-
 .../hive/execution/ScriptTransformation.scala   | 90 +++++++++++++-------
 .../execution/ScriptTransformationSuite.scala   | 18 +++-
 3 files changed, 81 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d76e066d/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
index 9fe0e96..b29e822 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
@@ -231,7 +231,12 @@ object SparkPlanTest {
     }
   }
 
-  private def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] 
= {
+  /**
+   * Runs the plan
+   * @param outputPlan SparkPlan to be executed
+   * @param spark SqlContext used for execution of the plan
+   */
+  def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = {
     val execution = new QueryExecution(spark.sparkSession, null) {
       override lazy val sparkPlan: SparkPlan = outputPlan transform {
         case plan: SparkPlan =>

http://git-wip-us.apache.org/repos/asf/spark/blob/d76e066d/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index f6e6a75..9e25e1d 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.io.Writable
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -127,45 +127,71 @@ case class ScriptTransformation(
         }
         val mutableRow = new SpecificMutableRow(output.map(_.dataType))
 
+        private def checkFailureAndPropagate(cause: Throwable = null): Unit = {
+          if (writerThread.exception.isDefined) {
+            throw writerThread.exception.get
+          }
+
+          // Checks if the proc is still alive (incase the command ran was bad)
+          // The ideal way to do this is to use Java 8's Process#isAlive()
+          // but it cannot be used because Spark still supports Java 7.
+          // Following is a workaround used to check if a process is alive in 
Java 7
+          // TODO: Once builds are switched to Java 8, this can be changed
+          try {
+            val exitCode = proc.exitValue()
+            if (exitCode != 0) {
+              logError(stderrBuffer.toString) // log the stderr circular buffer
+              throw new SparkException(s"Subprocess exited with status 
$exitCode. " +
+                s"Error: ${stderrBuffer.toString}", cause)
+            }
+          } catch {
+            case _: IllegalThreadStateException =>
+            // This means that the process is still alive. Move ahead
+          }
+        }
+
         override def hasNext: Boolean = {
-          if (outputSerde == null) {
-            if (curLine == null) {
-              curLine = reader.readLine()
+          try {
+            if (outputSerde == null) {
               if (curLine == null) {
-                if (writerThread.exception.isDefined) {
-                  throw writerThread.exception.get
+                curLine = reader.readLine()
+                if (curLine == null) {
+                  checkFailureAndPropagate()
+                  return false
                 }
-                false
-              } else {
-                true
               }
-            } else {
-              true
-            }
-          } else if (scriptOutputWritable == null) {
-            scriptOutputWritable = reusedWritableObject
+            } else if (scriptOutputWritable == null) {
+              scriptOutputWritable = reusedWritableObject
 
-            if (scriptOutputReader != null) {
-              if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
-                writerThread.exception.foreach(throw _)
-                false
+              if (scriptOutputReader != null) {
+                if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
+                  checkFailureAndPropagate()
+                  return false
+                }
               } else {
-                true
-              }
-            } else {
-              try {
-                scriptOutputWritable.readFields(scriptOutputStream)
-                true
-              } catch {
-                case _: EOFException =>
-                  if (writerThread.exception.isDefined) {
-                    throw writerThread.exception.get
-                  }
-                  false
+                try {
+                  scriptOutputWritable.readFields(scriptOutputStream)
+                } catch {
+                  case _: EOFException =>
+                    // This means that the stdout of `proc` (ie. TRANSFORM 
process) has exhausted.
+                    // Ideally the proc should *not* be alive at this point but
+                    // there can be a lag between EOF being written out and 
the process
+                    // being terminated. So explicitly waiting for the process 
to be done.
+                    proc.waitFor()
+                    checkFailureAndPropagate()
+                    return false
+                }
               }
             }
-          } else {
+
             true
+          } catch {
+            case NonFatal(e) =>
+              // If this exception is due to abrupt / unclean termination of 
`proc`,
+              // then detect it and propagate a better exception message for 
end users
+              checkFailureAndPropagate(e)
+
+              throw e
           }
         }
 
@@ -284,7 +310,6 @@ private class ScriptTransformationWriterThread(
           }
         }
       }
-      outputStream.close()
       threwException = false
     } catch {
       case NonFatal(e) =>
@@ -295,6 +320,7 @@ private class ScriptTransformationWriterThread(
         throw e
     } finally {
       try {
+        outputStream.close()
         if (proc.waitFor() != 0) {
           logError(stderrBuffer.toString) // log the stderr circular buffer
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/d76e066d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 6f80622..a8e81d7 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.scalatest.exceptions.TestFailedException
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
@@ -109,6 +109,22 @@ class ScriptTransformationSuite extends SparkPlanTest with 
TestHiveSingleton {
     }
     assert(e.getMessage().contains("intentional exception"))
   }
+
+  test("SPARK-14400 script transformation should fail for bad script command") 
{
+    val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
+
+    val e = intercept[SparkException] {
+      val plan =
+        new ScriptTransformation(
+          input = Seq(rowsDf.col("a").expr),
+          script = "some_non_existent_command",
+          output = Seq(AttributeReference("a", StringType)()),
+          child = rowsDf.queryExecution.sparkPlan,
+          ioschema = serdeIOSchema)
+      SparkPlanTest.executePlan(plan, hiveContext)
+    }
+    assert(e.getMessage.contains("Subprocess exited with status"))
+  }
 }
 
 private case class ExceptionInjectingOperator(child: SparkPlan) extends 
UnaryExecNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to