+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+import java.util.{List => JList, Map => JMap}
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import net.razorvine.pickle.{Pickler, Unpickler}
+import org.apache.spark.{Accumulator, Logging => SparkLogging}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.python.{PythonBroadcast, PythonRDD}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+/**
+ * A serialized version of a Python lambda function.  Suitable for use in a [[PythonRDD]].
+ */
+private[spark] case class PythonUDF(
+    name: String,
+    command: Array[Byte],
+    envVars: JMap[String, String],
+    pythonIncludes: JList[String],
+    pythonExec: String,
+    pythonVer: String,
+    broadcastVars: JList[Broadcast[PythonBroadcast]],
+    accumulator: Accumulator[JList[Array[Byte]]],
+    dataType: DataType,
+    children: Seq[Expression]) extends Expression with SparkLogging {
+  override def toString: String = s"PythonUDF#$name(${children.mkString(",")})"
+  override def nullable: Boolean = true
+  override def eval(input: InternalRow): Any = {
+    throw new UnsupportedOperationException("PythonUDFs can not be directly 
+  }
+/**
+ * Extracts PythonUDFs from operators, rewriting the query plan so that the UDF can be evaluated
+ * alone in a batch.
+ *
+ * This has the limitation that the input to the Python UDF is not allowed to include attributes
+ * from multiple child operators.
+ */
+private[spark] object ExtractPythonUDFs extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // Skip EvaluatePython nodes.
+    case plan: EvaluatePython => plan
+    case plan: LogicalPlan if plan.resolved =>
+      // Extract any PythonUDFs from the current operator.
+      val udfs = plan.expressions.flatMap(_.collect { case udf: PythonUDF => 
udf })
+      if (udfs.isEmpty) {
+        // If there aren't any, we are done.
+        plan
+      } else {
+        // Pick the UDF we are going to evaluate (TODO: Support evaluating 
multiple UDFs at a time)
+        // If there is more than one, we will add another evaluation operator 
in a subsequent pass.
+        udfs.find(_.resolved) match {
+          case Some(udf) =>
+            var evaluation: EvaluatePython = null
+            // Rewrite the child that has the input required for the UDF
+            val newChildren = { child =>
+              // Check to make sure that the UDF can be evaluated with only 
the input of this child.
+              // Other cases are disallowed as they are ambiguous or would 
require a cartesian
+              // product.
+              if (udf.references.subsetOf(child.outputSet)) {
+                evaluation = EvaluatePython(udf, child)
+                evaluation
+              } else if (udf.references.intersect(child.outputSet).nonEmpty) {
+                sys.error(s"Invalid PythonUDF $udf, requires attributes from 
more than one child.")
+              } else {
+                child
+              }
+            }
+            assert(evaluation != null, "Unable to evaluate PythonUDF.  Missing 
input attributes.")
+            // Trim away the new UDF value if it was only used for filtering 
or something.
+            logical.Project(
+              plan.output,
+              plan.transformExpressions {
+                case p: PythonUDF if p.fastEquals(udf) => 
+              }.withNewChildren(newChildren))
+          case None =>
+            // If there is no Python UDF that is resolved, skip this round.
+            plan
+        }
+      }
+  }
+object EvaluatePython {
+  def apply(udf: PythonUDF, child: LogicalPlan): EvaluatePython =
+    new EvaluatePython(udf, child, AttributeReference("pythonUDF", 
+  /**
+   * Helper for converting a Scala object to a Java object suitable for PySpark serialization.
+   */
+  def toJava(obj: Any, dataType: DataType): Any = (obj, dataType) match {
+    case (null, _) => null
+    case (row: Row, struct: StructType) =>
+      val fields = => field.dataType)
+ {
+        case (obj, dataType) => toJava(obj, dataType)
+      }.toArray
+    case (seq: Seq[Any], array: ArrayType) =>
+ => toJava(x, array.elementType)).asJava
+    case (list: JList[_], array: ArrayType) =>
+ => toJava(x, array.elementType)).asJava
+    case (arr, array: ArrayType) if arr.getClass.isArray =>
+      arr.asInstanceOf[Array[Any]].map(x => toJava(x, array.elementType))
+    case (obj: Map[_, _], mt: MapType) => {
+      case (k, v) => (toJava(k, mt.keyType), toJava(v, mt.valueType))
+    }.asJava
+    case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), 
+    case (date: Int, DateType) => DateTimeUtils.toJavaDate(date)
+    case (t: Long, TimestampType) => DateTimeUtils.toJavaTimestamp(t)
+    case (s: UTF8String, StringType) => s.toString
+    // Pyrolite can handle Timestamp and Decimal
+    case (other, _) => other
+  }
+  /**
+   * Converts a Row into a Java array (so that it can be pickled for Python).
+   */
+  def rowToArray(row: Row, fields: Seq[DataType]): Array[Any] = {
+    // TODO: this is slow!
+ {case (obj, dt) => toJava(obj, dt)}.toArray
+  }
+  // Converts a value to the type specified by the data type.
+  // Because Python does not have data types for TimestampType, FloatType, ShortType, and
+  // ByteType, we need to explicitly convert values in columns of these data types to the desired
+  // JVM data types.
+  def fromJava(obj: Any, dataType: DataType): Any = (obj, dataType) match {
+    // TODO: We should check nullable
+    case (null, _) => null
+    case (c: java.util.List[_], ArrayType(elementType, _)) =>
+ { e => fromJava(e, elementType)}: Seq[Any]
+    case (c, ArrayType(elementType, _)) if c.getClass.isArray =>
+      c.asInstanceOf[Array[_]].map(e => fromJava(e, elementType)): Seq[Any]
+    case (c: java.util.Map[_, _], MapType(keyType, valueType, _)) => {
+      case (key, value) => (fromJava(key, keyType), fromJava(value, valueType))
+    }.toMap
+    case (c, StructType(fields)) if c.getClass.isArray =>
+      new GenericInternalRow(c.asInstanceOf[Array[_]].zip(fields).map {
+        case (e, f) => fromJava(e, f.dataType)
+      })
+    case (c: java.util.Calendar, DateType) =>
+      DateTimeUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))
+    case (c: java.util.Calendar, TimestampType) =>
+      c.getTimeInMillis * 10000L
+    case (t: java.sql.Timestamp, TimestampType) =>
+      DateTimeUtils.fromJavaTimestamp(t)
+    case (_, udt: UserDefinedType[_]) =>
+      fromJava(obj, udt.sqlType)
+    case (c: Int, ByteType) => c.toByte
+    case (c: Long, ByteType) => c.toByte
+    case (c: Int, ShortType) => c.toShort
+    case (c: Long, ShortType) => c.toShort
+    case (c: Long, IntegerType) => c.toInt
+    case (c: Int, LongType) => c.toLong
+    case (c: Double, FloatType) => c.toFloat
+    case (c: String, StringType) => UTF8String.fromString(c)
+    case (c, StringType) =>
+      // If we get here, c is not a string. Call toString on it.
+      UTF8String.fromString(c.toString)
+    case (c, _) => c
+  }
+/**
+ * :: DeveloperApi ::
+ * Evaluates a [[PythonUDF]], appending the result to the end of the input tuple.
+ */
+case class EvaluatePython(
+    udf: PythonUDF,
+    child: LogicalPlan,
+    resultAttribute: AttributeReference)
+  extends logical.UnaryNode {
+  def output: Seq[Attribute] = child.output :+ resultAttribute
+  // References should not include the produced attribute.
+  override def references: AttributeSet = udf.references
+/**
+ * :: DeveloperApi ::
+ * Uses PythonRDD to evaluate a [[PythonUDF]], one partition of tuples at a time.
+ * The input data is zipped with the result of the UDF evaluation.
+ */
+case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], 
child: SparkPlan)
+  extends SparkPlan {
+  def children: Seq[SparkPlan] = child :: Nil
+  protected override def doExecute(): RDD[InternalRow] = {
+    val childResults = child.execute().map(_.copy())
+    val parent = childResults.mapPartitions { iter =>
+      val pickle = new Pickler
+      val currentRow = newMutableProjection(udf.children, child.output)()
+      val fields =
+      iter.grouped(1000).map { inputRows =>
+        val toBePickled = { row =>
+          EvaluatePython.rowToArray(currentRow(row), fields)
+        }.toArray
+        pickle.dumps(toBePickled)
+      }
+    }
+    val pyRDD = new PythonRDD(
+      parent,
+      udf.command,
+      udf.envVars,
+      udf.pythonIncludes,
+      false,
+      udf.pythonExec,
+      udf.pythonVer,
+      udf.broadcastVars,
+      udf.accumulator
+    ).mapPartitions { iter =>
+      val pickle = new Unpickler
+      iter.flatMap { pickedResult =>
+        val unpickledBatch = pickle.loads(pickedResult)
+        unpickledBatch.asInstanceOf[java.util.ArrayList[Any]]
+      }
+    }.mapPartitions { iter =>
+      val row = new GenericMutableRow(1)
+ { result =>
+        row(0) = EvaluatePython.fromJava(result, udf.dataType)
+        row: InternalRow
+      }
+    }
+ { iter =>
+      val joinedRow = new JoinedRow()
+ {
+        case (row, udfResult) =>
+          joinedRow(row, udfResult)
+      }
+    }
+  }
diff --git 
index ea325cc..7978fda 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -391,7 +391,7 @@ class TestHiveContext(sc: SparkContext) extends 
HiveContext(sc) {
    * Records the UDFs present when the server starts, so we can delete ones 
that are created by
    * tests.
-  protected val originalUdfs: JavaSet[String] = 
+  protected val originalUDFs: JavaSet[String] = 
    * Resets the test instance by deleting any tables that have been created.
@@ -410,7 +410,7 @@ class TestHiveContext(sc: SparkContext) extends 
HiveContext(sc) {
FunctionRegistry.getFunctionNames.filterNot(originalUdfs.contains(_)).foreach { 
udfName =>
FunctionRegistry.getFunctionNames.filterNot(originalUDFs.contains(_)).foreach { 
udfName =>
diff --git a/sql/hive/src/test/resources/data/files/testUDF/part-00000 
new file mode 100755
index 0000000..240a5c1
Binary files /dev/null and 
b/sql/hive/src/test/resources/data/files/testUDF/part-00000 differ
diff --git a/sql/hive/src/test/resources/data/files/testUdf/part-00000 
deleted file mode 100755
index 240a5c1..0000000
Binary files a/sql/hive/src/test/resources/data/files/testUdf/part-00000 and 
/dev/null differ

