This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d441dc  [SPARK-26412][PYSPARK][SQL] Allow Pandas UDF to take an 
iterator of pd.Series or an iterator of tuple of pd.Series
6d441dc is described below

commit 6d441dcdc68dae886e375794a55658f70cd18d9d
Author: WeichenXu <weichen...@databricks.com>
AuthorDate: Sat Jun 15 08:29:20 2019 -0700

    [SPARK-26412][PYSPARK][SQL] Allow Pandas UDF to take an iterator of 
pd.Series or an iterator of tuple of pd.Series
    
    ## What changes were proposed in this pull request?
    
    Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuple 
of pd.Series.
    Note that the UDF always takes a single iterator as its input argument:
    * if the udf takes only one column as input, each element of the iterator 
will be a pd.Series (corresponding to a batch of that column's values)
    * if the udf takes multiple columns as inputs, each element of the iterator 
will be a tuple of `pd.Series`, one per input column (in the same order as the 
columns are passed to the udf). For example:
    ```
    @pandas_udf("int", PandasUDFType.SCALAR_ITER)
    def the_udf(iterator):
        for col1_batch, col2_batch in iterator:
            yield col1_batch + col2_batch
    
    df.select(the_udf("col1", "col2"))
    ```
    The udf above will add col1 and col2.
    
    I haven't added unit tests yet, but manual tests show it works fine, so it is 
ready for a first-pass review.
    We can test several typical cases:
    
    ```
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.functions import udf
    from pyspark.taskcontext import TaskContext
    
    df = spark.createDataFrame([(1, 20), (3, 40)], ["a", "b"])
    
    @pandas_udf("int", PandasUDFType.SCALAR_ITER)
    def fi1(it):
        pid = TaskContext.get().partitionId()
        print("DBG: fi1: do init stuff, partitionId=" + str(pid))
        for batch in it:
            yield batch + 100
        print("DBG: fi1: do close stuff, partitionId=" + str(pid))
    
    @pandas_udf("int", PandasUDFType.SCALAR_ITER)
    def fi2(it):
        pid = TaskContext.get().partitionId()
        print("DBG: fi2: do init stuff, partitionId=" + str(pid))
        for batch in it:
            yield batch + 10000
        print("DBG: fi2: do close stuff, partitionId=" + str(pid))
    
    @pandas_udf("int", PandasUDFType.SCALAR_ITER)
    def fi3(it):
        pid = TaskContext.get().partitionId()
        print("DBG: fi3: do init stuff, partitionId=" + str(pid))
        for x, y in it:
            yield x + y * 10 + 100000
        print("DBG: fi3: do close stuff, partitionId=" + str(pid))
    
    @pandas_udf("int", PandasUDFType.SCALAR)
    def fp1(x):
        return x + 1000
    
    @udf("int")
    def fu1(x):
        return x + 10
    
    # test select "pandas iter udf/pandas udf/sql udf" expressions at the same 
time.
    # Note that in this case `fi1("a"), fi2("b"), fi3("a", "b")` will generate only 
one plan,
    # and `fu1("a")`, `fp1("a")` will generate another two separate plans.
    df.select(fi1("a"), fi2("b"), fi3("a", "b"), fu1("a"), fp1("a")).show()
    
    # test chain two pandas iter udf together
    # Note that in this case `fi2(fi1("a"))` will generate only one plan
    # Also note that the order of the init/close calls will be as follows:
    # (debug output following)
    #     DBG: fi2: do init stuff, partitionId=0
    #     DBG: fi1: do init stuff, partitionId=0
    #     DBG: fi1: do close stuff, partitionId=0
    #     DBG: fi2: do close stuff, partitionId=0
    df.select(fi2(fi1("a"))).show()
    
    # test more complex chain
    # Note that in this case `fi1("a"), fi2("a")` will generate one plan,
    # and `fi3(fi1_output, fi2_output)` will generate another plan
    df.select(fi3(fi1("a"), fi2("a"))).show()
    ```
    
    ## How was this patch tested?
    
    To be added.
    
    Please review http://spark.apache.org/contributing.html before opening a 
pull request.
    
    Closes #24643 from WeichenXu123/pandas_udf_iter.
    
    Lead-authored-by: WeichenXu <weichen...@databricks.com>
    Co-authored-by: Xiangrui Meng <m...@databricks.com>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 .../org/apache/spark/api/python/PythonRunner.scala |   2 +
 python/pyspark/rdd.py                              |   1 +
 python/pyspark/sql/functions.py                    |   3 +
 python/pyspark/sql/tests/test_pandas_udf_scalar.py | 882 ++++++++++++++-------
 python/pyspark/sql/udf.py                          |  13 +-
 python/pyspark/worker.py                           |  93 ++-
 .../spark/sql/catalyst/expressions/PythonUDF.scala |   3 +-
 .../plans/logical/pythonLogicalOperators.scala     |   3 +-
 .../spark/sql/execution/SparkStrategies.scala      |   4 +-
 .../sql/execution/python/ArrowEvalPythonExec.scala |   5 +-
 .../sql/execution/python/ExtractPythonUDFs.scala   |  44 +-
 11 files changed, 703 insertions(+), 350 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 7b02545..e0e35df 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -46,6 +46,7 @@ private[spark] object PythonEvalType {
   val SQL_GROUPED_MAP_PANDAS_UDF = 201
   val SQL_GROUPED_AGG_PANDAS_UDF = 202
   val SQL_WINDOW_AGG_PANDAS_UDF = 203
+  val SQL_SCALAR_PANDAS_ITER_UDF = 204
 
   def toString(pythonEvalType: Int): String = pythonEvalType match {
     case NON_UDF => "NON_UDF"
@@ -54,6 +55,7 @@ private[spark] object PythonEvalType {
     case SQL_GROUPED_MAP_PANDAS_UDF => "SQL_GROUPED_MAP_PANDAS_UDF"
     case SQL_GROUPED_AGG_PANDAS_UDF => "SQL_GROUPED_AGG_PANDAS_UDF"
     case SQL_WINDOW_AGG_PANDAS_UDF => "SQL_WINDOW_AGG_PANDAS_UDF"
+    case SQL_SCALAR_PANDAS_ITER_UDF => "SQL_SCALAR_PANDAS_ITER_UDF"
   }
 }
 
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index f0682e7..395abc8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -73,6 +73,7 @@ class PythonEvalType(object):
     SQL_GROUPED_MAP_PANDAS_UDF = 201
     SQL_GROUPED_AGG_PANDAS_UDF = 202
     SQL_WINDOW_AGG_PANDAS_UDF = 203
+    SQL_SCALAR_PANDAS_ITER_UDF = 204
 
 
 def portable_hash(x):
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 613822b..6973063 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2796,6 +2796,8 @@ class PandasUDFType(object):
     """
     SCALAR = PythonEvalType.SQL_SCALAR_PANDAS_UDF
 
+    SCALAR_ITER = PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
+
     GROUPED_MAP = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
 
     GROUPED_AGG = PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
@@ -3178,6 +3180,7 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
         raise ValueError("Invalid returnType: returnType can not be None")
 
     if eval_type not in [PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                         PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
                          PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
                          PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
         raise ValueError("Invalid functionType: "
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py 
b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index b219624..d2145b8 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -29,9 +29,11 @@ if sys.version >= '3':
 from datetime import date, datetime
 from decimal import Decimal
 
+from pyspark import TaskContext
 from pyspark.rdd import PythonEvalType
 from pyspark.sql import Column
-from pyspark.sql.functions import array, col, expr, lit, sum, struct, udf, 
pandas_udf
+from pyspark.sql.functions import array, col, expr, lit, sum, struct, udf, 
pandas_udf, \
+    PandasUDFType
 from pyspark.sql.types import Row
 from pyspark.sql.types import *
 from pyspark.sql.utils import AnalysisException
@@ -83,6 +85,18 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         random_udf = random_udf.asNondeterministic()
         return random_udf
 
+    @property
+    def nondeterministic_vectorized_iter_udf(self):
+        import numpy as np
+
+        @pandas_udf('double', PandasUDFType.SCALAR_ITER)
+        def random_udf(it):
+            for v in it:
+                yield pd.Series(np.random.random(len(v)))
+
+        random_udf = random_udf.asNondeterministic()
+        return random_udf
+
     def test_pandas_udf_tokenize(self):
         tokenize = pandas_udf(lambda s: s.apply(lambda str: str.split(' ')),
                               ArrayType(StringType()))
@@ -110,19 +124,20 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             col('id').cast('boolean').alias('bool'),
             array(col('id')).alias('array_long'))
         f = lambda x: x
-        str_f = pandas_udf(f, StringType())
-        int_f = pandas_udf(f, IntegerType())
-        long_f = pandas_udf(f, LongType())
-        float_f = pandas_udf(f, FloatType())
-        double_f = pandas_udf(f, DoubleType())
-        decimal_f = pandas_udf(f, DecimalType())
-        bool_f = pandas_udf(f, BooleanType())
-        array_long_f = pandas_udf(f, ArrayType(LongType()))
-        res = df.select(str_f(col('str')), int_f(col('int')),
-                        long_f(col('long')), float_f(col('float')),
-                        double_f(col('double')), decimal_f('decimal'),
-                        bool_f(col('bool')), array_long_f('array_long'))
-        self.assertEquals(df.collect(), res.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            str_f = pandas_udf(f, StringType(), udf_type)
+            int_f = pandas_udf(f, IntegerType(), udf_type)
+            long_f = pandas_udf(f, LongType(), udf_type)
+            float_f = pandas_udf(f, FloatType(), udf_type)
+            double_f = pandas_udf(f, DoubleType(), udf_type)
+            decimal_f = pandas_udf(f, DecimalType(), udf_type)
+            bool_f = pandas_udf(f, BooleanType(), udf_type)
+            array_long_f = pandas_udf(f, ArrayType(LongType()), udf_type)
+            res = df.select(str_f(col('str')), int_f(col('int')),
+                            long_f(col('long')), float_f(col('float')),
+                            double_f(col('double')), decimal_f('decimal'),
+                            bool_f(col('bool')), array_long_f('array_long'))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_register_nondeterministic_vectorized_udf_basic(self):
         random_pandas_udf = pandas_udf(
@@ -136,84 +151,115 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         [row] = self.spark.sql("SELECT randomPandasUDF(1)").collect()
         self.assertEqual(row[0], 7)
 
+        def random_iter_udf(it):
+            for i in it:
+                yield random.randint(6, 6) + i
+        random_pandas_iter_udf = pandas_udf(
+            random_iter_udf, IntegerType(), 
PandasUDFType.SCALAR_ITER).asNondeterministic()
+        self.assertEqual(random_pandas_iter_udf.deterministic, False)
+        self.assertEqual(random_pandas_iter_udf.evalType, 
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF)
+        nondeterministic_pandas_iter_udf = self.spark.catalog.registerFunction(
+            "randomPandasIterUDF", random_pandas_iter_udf)
+        self.assertEqual(nondeterministic_pandas_iter_udf.deterministic, False)
+        self.assertEqual(nondeterministic_pandas_iter_udf.evalType,
+                         PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF)
+        [row] = self.spark.sql("SELECT randomPandasIterUDF(1)").collect()
+        self.assertEqual(row[0], 7)
+
     def test_vectorized_udf_null_boolean(self):
         data = [(True,), (True,), (None,), (False,)]
         schema = StructType().add("bool", BooleanType())
         df = self.spark.createDataFrame(data, schema)
-        bool_f = pandas_udf(lambda x: x, BooleanType())
-        res = df.select(bool_f(col('bool')))
-        self.assertEquals(df.collect(), res.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            bool_f = pandas_udf(lambda x: x, BooleanType(), udf_type)
+            res = df.select(bool_f(col('bool')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_byte(self):
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("byte", ByteType())
         df = self.spark.createDataFrame(data, schema)
-        byte_f = pandas_udf(lambda x: x, ByteType())
-        res = df.select(byte_f(col('byte')))
-        self.assertEquals(df.collect(), res.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            byte_f = pandas_udf(lambda x: x, ByteType(), udf_type)
+            res = df.select(byte_f(col('byte')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_short(self):
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("short", ShortType())
         df = self.spark.createDataFrame(data, schema)
-        short_f = pandas_udf(lambda x: x, ShortType())
-        res = df.select(short_f(col('short')))
-        self.assertEquals(df.collect(), res.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            short_f = pandas_udf(lambda x: x, ShortType(), udf_type)
+            res = df.select(short_f(col('short')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_int(self):
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("int", IntegerType())
         df = self.spark.createDataFrame(data, schema)
-        int_f = pandas_udf(lambda x: x, IntegerType())
-        res = df.select(int_f(col('int')))
-        self.assertEquals(df.collect(), res.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            int_f = pandas_udf(lambda x: x, IntegerType(), udf_type)
+            res = df.select(int_f(col('int')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_long(self):
         data = [(None,), (2,), (3,), (4,)]
         schema = StructType().add("long", LongType())
         df = self.spark.createDataFrame(data, schema)
-        long_f = pandas_udf(lambda x: x, LongType())
-        res = df.select(long_f(col('long')))
-        self.assertEquals(df.collect(), res.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            long_f = pandas_udf(lambda x: x, LongType(), udf_type)
+            res = df.select(long_f(col('long')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_float(self):
         data = [(3.0,), (5.0,), (-1.0,), (None,)]
         schema = StructType().add("float", FloatType())
         df = self.spark.createDataFrame(data, schema)
-        float_f = pandas_udf(lambda x: x, FloatType())
-        res = df.select(float_f(col('float')))
-        self.assertEquals(df.collect(), res.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            float_f = pandas_udf(lambda x: x, FloatType(), udf_type)
+            res = df.select(float_f(col('float')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_double(self):
         data = [(3.0,), (5.0,), (-1.0,), (None,)]
         schema = StructType().add("double", DoubleType())
         df = self.spark.createDataFrame(data, schema)
-        double_f = pandas_udf(lambda x: x, DoubleType())
-        res = df.select(double_f(col('double')))
-        self.assertEquals(df.collect(), res.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            double_f = pandas_udf(lambda x: x, DoubleType(), udf_type)
+            res = df.select(double_f(col('double')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_decimal(self):
         data = [(Decimal(3.0),), (Decimal(5.0),), (Decimal(-1.0),), (None,)]
         schema = StructType().add("decimal", DecimalType(38, 18))
         df = self.spark.createDataFrame(data, schema)
-        decimal_f = pandas_udf(lambda x: x, DecimalType(38, 18))
-        res = df.select(decimal_f(col('decimal')))
-        self.assertEquals(df.collect(), res.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            decimal_f = pandas_udf(lambda x: x, DecimalType(38, 18), udf_type)
+            res = df.select(decimal_f(col('decimal')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_string(self):
         data = [("foo",), (None,), ("bar",), ("bar",)]
         schema = StructType().add("str", StringType())
         df = self.spark.createDataFrame(data, schema)
-        str_f = pandas_udf(lambda x: x, StringType())
-        res = df.select(str_f(col('str')))
-        self.assertEquals(df.collect(), res.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            str_f = pandas_udf(lambda x: x, StringType(), udf_type)
+            res = df.select(str_f(col('str')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_string_in_udf(self):
         df = self.spark.range(10)
-        str_f = pandas_udf(lambda x: pd.Series(map(str, x)), StringType())
-        actual = df.select(str_f(col('id')))
-        expected = df.select(col('id').cast('string'))
-        self.assertEquals(expected.collect(), actual.collect())
+        scalar_f = lambda x: pd.Series(map(str, x))
+
+        def iter_f(it):
+            for i in it:
+                yield scalar_f(i)
+
+        for f, udf_type in [(scalar_f, PandasUDFType.SCALAR), (iter_f, 
PandasUDFType.SCALAR_ITER)]:
+            str_f = pandas_udf(f, StringType(), udf_type)
+            actual = df.select(str_f(col('id')))
+            expected = df.select(col('id').cast('string'))
+            self.assertEquals(expected.collect(), actual.collect())
 
     def test_vectorized_udf_datatype_string(self):
         df = self.spark.range(10).select(
@@ -225,42 +271,46 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             col('id').cast('decimal').alias('decimal'),
             col('id').cast('boolean').alias('bool'))
         f = lambda x: x
-        str_f = pandas_udf(f, 'string')
-        int_f = pandas_udf(f, 'integer')
-        long_f = pandas_udf(f, 'long')
-        float_f = pandas_udf(f, 'float')
-        double_f = pandas_udf(f, 'double')
-        decimal_f = pandas_udf(f, 'decimal(38, 18)')
-        bool_f = pandas_udf(f, 'boolean')
-        res = df.select(str_f(col('str')), int_f(col('int')),
-                        long_f(col('long')), float_f(col('float')),
-                        double_f(col('double')), decimal_f('decimal'),
-                        bool_f(col('bool')))
-        self.assertEquals(df.collect(), res.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            str_f = pandas_udf(f, 'string', udf_type)
+            int_f = pandas_udf(f, 'integer', udf_type)
+            long_f = pandas_udf(f, 'long', udf_type)
+            float_f = pandas_udf(f, 'float', udf_type)
+            double_f = pandas_udf(f, 'double', udf_type)
+            decimal_f = pandas_udf(f, 'decimal(38, 18)', udf_type)
+            bool_f = pandas_udf(f, 'boolean', udf_type)
+            res = df.select(str_f(col('str')), int_f(col('int')),
+                            long_f(col('long')), float_f(col('float')),
+                            double_f(col('double')), decimal_f('decimal'),
+                            bool_f(col('bool')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_null_binary(self):
         data = [(bytearray(b"a"),), (None,), (bytearray(b"bb"),), 
(bytearray(b"ccc"),)]
         schema = StructType().add("binary", BinaryType())
         df = self.spark.createDataFrame(data, schema)
-        str_f = pandas_udf(lambda x: x, BinaryType())
-        res = df.select(str_f(col('binary')))
-        self.assertEquals(df.collect(), res.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            str_f = pandas_udf(lambda x: x, BinaryType(), udf_type)
+            res = df.select(str_f(col('binary')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_array_type(self):
         data = [([1, 2],), ([3, 4],)]
         array_schema = StructType([StructField("array", 
ArrayType(IntegerType()))])
         df = self.spark.createDataFrame(data, schema=array_schema)
-        array_f = pandas_udf(lambda x: x, ArrayType(IntegerType()))
-        result = df.select(array_f(col('array')))
-        self.assertEquals(df.collect(), result.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            array_f = pandas_udf(lambda x: x, ArrayType(IntegerType()), 
udf_type)
+            result = df.select(array_f(col('array')))
+            self.assertEquals(df.collect(), result.collect())
 
     def test_vectorized_udf_null_array(self):
         data = [([1, 2],), (None,), (None,), ([3, 4],), (None,)]
         array_schema = StructType([StructField("array", 
ArrayType(IntegerType()))])
         df = self.spark.createDataFrame(data, schema=array_schema)
-        array_f = pandas_udf(lambda x: x, ArrayType(IntegerType()))
-        result = df.select(array_f(col('array')))
-        self.assertEquals(df.collect(), result.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            array_f = pandas_udf(lambda x: x, ArrayType(IntegerType()), 
udf_type)
+            result = df.select(array_f(col('array')))
+            self.assertEquals(df.collect(), result.collect())
 
     def test_vectorized_udf_struct_type(self):
         df = self.spark.range(10)
@@ -268,24 +318,30 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             StructField('id', LongType()),
             StructField('str', StringType())])
 
-        def func(id):
+        def scalar_func(id):
             return pd.DataFrame({'id': id, 'str': id.apply(unicode)})
 
-        f = pandas_udf(func, returnType=return_type)
+        def iter_func(it):
+            for id in it:
+                yield scalar_func(id)
 
-        expected = df.select(struct(col('id'), 
col('id').cast('string').alias('str'))
-                             .alias('struct')).collect()
+        for func, udf_type in [(scalar_func, PandasUDFType.SCALAR),
+                               (iter_func, PandasUDFType.SCALAR_ITER)]:
+            f = pandas_udf(func, returnType=return_type, functionType=udf_type)
 
-        actual = df.select(f(col('id')).alias('struct')).collect()
-        self.assertEqual(expected, actual)
+            expected = df.select(struct(col('id'), 
col('id').cast('string').alias('str'))
+                                 .alias('struct')).collect()
 
-        g = pandas_udf(func, 'id: long, str: string')
-        actual = df.select(g(col('id')).alias('struct')).collect()
-        self.assertEqual(expected, actual)
+            actual = df.select(f(col('id')).alias('struct')).collect()
+            self.assertEqual(expected, actual)
 
-        struct_f = pandas_udf(lambda x: x, return_type)
-        actual = df.select(struct_f(struct(col('id'), 
col('id').cast('string').alias('str'))))
-        self.assertEqual(expected, actual.collect())
+            g = pandas_udf(func, 'id: long, str: string', 
functionType=udf_type)
+            actual = df.select(g(col('id')).alias('struct')).collect()
+            self.assertEqual(expected, actual)
+
+            struct_f = pandas_udf(lambda x: x, return_type, 
functionType=udf_type)
+            actual = df.select(struct_f(struct(col('id'), 
col('id').cast('string').alias('str'))))
+            self.assertEqual(expected, actual.collect())
 
     def test_vectorized_udf_struct_complex(self):
         df = self.spark.range(10)
@@ -293,17 +349,24 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             StructField('ts', TimestampType()),
             StructField('arr', ArrayType(LongType()))])
 
-        @pandas_udf(returnType=return_type)
-        def f(id):
+        def _scalar_f(id):
             return pd.DataFrame({'ts': id.apply(lambda i: pd.Timestamp(i)),
                                  'arr': id.apply(lambda i: [i, i + 1])})
 
-        actual = df.withColumn('f', f(col('id'))).collect()
-        for i, row in enumerate(actual):
-            id, f = row
-            self.assertEqual(i, id)
-            self.assertEqual(pd.Timestamp(i).to_pydatetime(), f[0])
-            self.assertListEqual([i, i + 1], f[1])
+        scalar_f = pandas_udf(_scalar_f, returnType=return_type)
+
+        @pandas_udf(returnType=return_type, 
functionType=PandasUDFType.SCALAR_ITER)
+        def iter_f(it):
+            for id in it:
+                yield _scalar_f(id)
+
+        for f, udf_type in [(scalar_f, PandasUDFType.SCALAR), (iter_f, 
PandasUDFType.SCALAR_ITER)]:
+            actual = df.withColumn('f', f(col('id'))).collect()
+            for i, row in enumerate(actual):
+                id, f = row
+                self.assertEqual(i, id)
+                self.assertEqual(pd.Timestamp(i).to_pydatetime(), f[0])
+                self.assertListEqual([i, i + 1], f[1])
 
     def test_vectorized_udf_nested_struct(self):
         nested_type = StructType([
@@ -314,30 +377,56 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             ]))
         ])
 
-        with QuietTest(self.sc):
-            with self.assertRaisesRegexp(
-                    Exception,
-                    'Invalid returnType with scalar Pandas UDFs'):
-                pandas_udf(lambda x: x, returnType=nested_type)
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            with QuietTest(self.sc):
+                with self.assertRaisesRegexp(
+                        Exception,
+                        'Invalid returnType with scalar Pandas UDFs'):
+                    pandas_udf(lambda x: x, returnType=nested_type, 
functionType=udf_type)
 
     def test_vectorized_udf_complex(self):
         df = self.spark.range(10).select(
             col('id').cast('int').alias('a'),
             col('id').cast('int').alias('b'),
             col('id').cast('double').alias('c'))
-        add = pandas_udf(lambda x, y: x + y, IntegerType())
-        power2 = pandas_udf(lambda x: 2 ** x, IntegerType())
-        mul = pandas_udf(lambda x, y: x * y, DoubleType())
-        res = df.select(add(col('a'), col('b')), power2(col('a')), 
mul(col('b'), col('c')))
-        expected = df.select(expr('a + b'), expr('power(2, a)'), expr('b * c'))
-        self.assertEquals(expected.collect(), res.collect())
+        scalar_add = pandas_udf(lambda x, y: x + y, IntegerType())
+        scalar_power2 = pandas_udf(lambda x: 2 ** x, IntegerType())
+        scalar_mul = pandas_udf(lambda x, y: x * y, DoubleType())
+
+        @pandas_udf(IntegerType(), PandasUDFType.SCALAR_ITER)
+        def iter_add(it):
+            for x, y in it:
+                yield x + y
+
+        @pandas_udf(IntegerType(), PandasUDFType.SCALAR_ITER)
+        def iter_power2(it):
+            for x in it:
+                yield 2 ** x
+
+        @pandas_udf(DoubleType(), PandasUDFType.SCALAR_ITER)
+        def iter_mul(it):
+            for x, y in it:
+                yield x * y
+
+        for add, power2, mul in [(scalar_add, scalar_power2, scalar_mul),
+                                 (iter_add, iter_power2, iter_mul)]:
+            res = df.select(add(col('a'), col('b')), power2(col('a')), 
mul(col('b'), col('c')))
+            expected = df.select(expr('a + b'), expr('power(2, a)'), expr('b * 
c'))
+            self.assertEquals(expected.collect(), res.collect())
 
     def test_vectorized_udf_exception(self):
         df = self.spark.range(10)
-        raise_exception = pandas_udf(lambda x: x * (1 / 0), LongType())
-        with QuietTest(self.sc):
-            with self.assertRaisesRegexp(Exception, 'division( or modulo)? by 
zero'):
-                df.select(raise_exception(col('id'))).collect()
+        scalar_raise_exception = pandas_udf(lambda x: x * (1 / 0), LongType())
+
+        @pandas_udf(LongType(), PandasUDFType.SCALAR_ITER)
+        def iter_raise_exception(it):
+            for x in it:
+                yield x * (1 / 0)
+
+        for raise_exception in [scalar_raise_exception, iter_raise_exception]:
+            with QuietTest(self.sc):
+                with self.assertRaisesRegexp(Exception, 'division( or modulo)? 
by zero'):
+                    df.select(raise_exception(col('id'))).collect()
 
     def test_vectorized_udf_invalid_length(self):
         df = self.spark.range(10)
@@ -348,12 +437,46 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
                     'Result vector from pandas_udf was not the required 
length'):
                 df.select(raise_exception(col('id'))).collect()
 
+        @pandas_udf(LongType(), PandasUDFType.SCALAR_ITER)
+        def iter_udf_wong_output_size(it):
+            for _ in it:
+                yield pd.Series(1)
+
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(
+                    Exception,
+                    "The number of output rows of pandas iterator UDF should 
be "
+                    "the same with input rows"):
+                df.select(iter_udf_wong_output_size(col('id'))).collect()
+
+        @pandas_udf(LongType(), PandasUDFType.SCALAR_ITER)
+        def iter_udf_not_reading_all_input(it):
+            for batch in it:
+                batch_len = len(batch)
+                yield pd.Series([1] * batch_len)
+                break
+
+        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 
3}):
+            df1 = self.spark.range(10).repartition(1)
+            with QuietTest(self.sc):
+                with self.assertRaisesRegexp(
+                        Exception,
+                        "SQL_SCALAR_PANDAS_ITER_UDF should exhaust the input 
iterator"):
+                    
df1.select(iter_udf_not_reading_all_input(col('id'))).collect()
+
     def test_vectorized_udf_chained(self):
         df = self.spark.range(10)
-        f = pandas_udf(lambda x: x + 1, LongType())
-        g = pandas_udf(lambda x: x - 1, LongType())
-        res = df.select(g(f(col('id'))))
-        self.assertEquals(df.collect(), res.collect())
+        scalar_f = pandas_udf(lambda x: x + 1, LongType())
+        scalar_g = pandas_udf(lambda x: x - 1, LongType())
+
+        iter_f = pandas_udf(lambda it: map(lambda x: x + 1, it), LongType(),
+                            PandasUDFType.SCALAR_ITER)
+        iter_g = pandas_udf(lambda it: map(lambda x: x - 1, it), LongType(),
+                            PandasUDFType.SCALAR_ITER)
+
+        for f, g in [(scalar_f, scalar_g), (iter_f, iter_g)]:
+            res = df.select(g(f(col('id'))))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_chained_struct_type(self):
         df = self.spark.range(10)
@@ -362,76 +485,110 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             StructField('str', StringType())])
 
         @pandas_udf(return_type)
-        def f(id):
+        def scalar_f(id):
             return pd.DataFrame({'id': id, 'str': id.apply(unicode)})
 
-        g = pandas_udf(lambda x: x, return_type)
+        scalar_g = pandas_udf(lambda x: x, return_type)
+
+        @pandas_udf(return_type, PandasUDFType.SCALAR_ITER)
+        def iter_f(it):
+            for id in it:
+                yield pd.DataFrame({'id': id, 'str': id.apply(unicode)})
+
+        iter_g = pandas_udf(lambda x: x, return_type, PandasUDFType.SCALAR_ITER)
 
         expected = df.select(struct(col('id'), col('id').cast('string').alias('str'))
                              .alias('struct')).collect()
 
-        actual = df.select(g(f(col('id'))).alias('struct')).collect()
-        self.assertEqual(expected, actual)
+        for f, g in [(scalar_f, scalar_g), (iter_f, iter_g)]:
+            actual = df.select(g(f(col('id'))).alias('struct')).collect()
+            self.assertEqual(expected, actual)
 
     def test_vectorized_udf_wrong_return_type(self):
         with QuietTest(self.sc):
-            with self.assertRaisesRegexp(
-                    NotImplementedError,
-                    'Invalid returnType.*scalar Pandas UDF.*MapType'):
-                pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType()))
+            for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+                with self.assertRaisesRegexp(
+                        NotImplementedError,
+                        'Invalid returnType.*scalar Pandas UDF.*MapType'):
+                    pandas_udf(lambda x: x, MapType(LongType(), LongType()), udf_type)
 
     def test_vectorized_udf_return_scalar(self):
         df = self.spark.range(10)
-        f = pandas_udf(lambda x: 1.0, DoubleType())
-        with QuietTest(self.sc):
-            with self.assertRaisesRegexp(Exception, 'Return.*type.*Series'):
-                df.select(f(col('id'))).collect()
+        scalar_f = pandas_udf(lambda x: 1.0, DoubleType())
+        iter_f = pandas_udf(lambda it: map(lambda x: 1.0, it), DoubleType(),
+                            PandasUDFType.SCALAR_ITER)
+        for f in [scalar_f, iter_f]:
+            with QuietTest(self.sc):
+                with self.assertRaisesRegexp(Exception, 'Return.*type.*Series'):
+                    df.select(f(col('id'))).collect()
 
     def test_vectorized_udf_decorator(self):
         df = self.spark.range(10)
 
         @pandas_udf(returnType=LongType())
-        def identity(x):
+        def scalar_identity(x):
             return x
-        res = df.select(identity(col('id')))
-        self.assertEquals(df.collect(), res.collect())
+
+        @pandas_udf(returnType=LongType(), functionType=PandasUDFType.SCALAR_ITER)
+        def iter_identity(x):
+            return x
+
+        for identity in [scalar_identity, iter_identity]:
+            res = df.select(identity(col('id')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_empty_partition(self):
         df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
-        f = pandas_udf(lambda x: x, LongType())
-        res = df.select(f(col('id')))
-        self.assertEquals(df.collect(), res.collect())
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            f = pandas_udf(lambda x: x, LongType(), udf_type)
+            res = df.select(f(col('id')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_struct_with_empty_partition(self):
         df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))\
             .withColumn('name', lit('John Doe'))
 
         @pandas_udf("first string, last string")
-        def split_expand(n):
+        def scalar_split_expand(n):
             return n.str.split(expand=True)
 
-        result = df.select(split_expand('name')).collect()
-        self.assertEqual(1, len(result))
-        row = result[0]
-        self.assertEqual('John', row[0]['first'])
-        self.assertEqual('Doe', row[0]['last'])
+        @pandas_udf("first string, last string", PandasUDFType.SCALAR_ITER)
+        def iter_split_expand(it):
+            for n in it:
+                yield n.str.split(expand=True)
+
+        for split_expand in [scalar_split_expand, iter_split_expand]:
+            result = df.select(split_expand('name')).collect()
+            self.assertEqual(1, len(result))
+            row = result[0]
+            self.assertEqual('John', row[0]['first'])
+            self.assertEqual('Doe', row[0]['last'])
 
     def test_vectorized_udf_varargs(self):
         df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
-        f = pandas_udf(lambda *v: v[0], LongType())
-        res = df.select(f(col('id')))
-        self.assertEquals(df.collect(), res.collect())
+        scalar_f = pandas_udf(lambda *v: v[0], LongType())
+
+        @pandas_udf(LongType(), PandasUDFType.SCALAR_ITER)
+        def iter_f(it):
+            for v in it:
+                yield v[0]
+
+        for f in [scalar_f, iter_f]:
+            res = df.select(f(col('id'), col('id')))
+            self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_unsupported_types(self):
         with QuietTest(self.sc):
-            with self.assertRaisesRegexp(
-                    NotImplementedError,
-                    'Invalid returnType.*scalar Pandas UDF.*MapType'):
-                pandas_udf(lambda x: x, MapType(StringType(), IntegerType()))
-            with self.assertRaisesRegexp(
-                    NotImplementedError,
-                    'Invalid returnType.*scalar Pandas UDF.*ArrayType.StructType'):
-                pandas_udf(lambda x: x, ArrayType(StructType([StructField('a', IntegerType())])))
+            for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+                with self.assertRaisesRegexp(
+                        NotImplementedError,
+                        'Invalid returnType.*scalar Pandas UDF.*MapType'):
+                    pandas_udf(lambda x: x, MapType(StringType(), IntegerType()), udf_type)
+                with self.assertRaisesRegexp(
+                        NotImplementedError,
+                        'Invalid returnType.*scalar Pandas UDF.*ArrayType.StructType'):
+                    pandas_udf(lambda x: x,
+                               ArrayType(StructType([StructField('a', IntegerType())])), udf_type)
 
     def test_vectorized_udf_dates(self):
         schema = StructType().add("idx", LongType()).add("date", DateType())
@@ -442,11 +599,7 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
                 (4, date(2262, 4, 12),)]
         df = self.spark.createDataFrame(data, schema=schema)
 
-        date_copy = pandas_udf(lambda t: t, returnType=DateType())
-        df = df.withColumn("date_copy", date_copy(col("date")))
-
-        @pandas_udf(returnType=StringType())
-        def check_data(idx, date, date_copy):
+        def scalar_check_data(idx, date, date_copy):
             msgs = []
             is_equal = date.isnull()
             for i in range(len(idx)):
@@ -459,14 +612,26 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
                         % (date[i], idx[i], data[idx[i]][1]))
             return pd.Series(msgs)
 
-        result = df.withColumn("check_data",
-                               check_data(col("idx"), col("date"), col("date_copy"))).collect()
+        def iter_check_data(it):
+            for idx, date, date_copy in it:
+                yield scalar_check_data(idx, date, date_copy)
 
-        self.assertEquals(len(data), len(result))
-        for i in range(len(result)):
-            self.assertEquals(data[i][1], result[i][1])  # "date" col
-            self.assertEquals(data[i][1], result[i][2])  # "date_copy" col
-            self.assertIsNone(result[i][3])  # "check_data" col
+        pandas_scalar_check_data = pandas_udf(scalar_check_data, StringType())
+        pandas_iter_check_data = pandas_udf(iter_check_data, StringType(),
+                                            PandasUDFType.SCALAR_ITER)
+
+        for check_data, udf_type in [(pandas_scalar_check_data, PandasUDFType.SCALAR),
+                                     (pandas_iter_check_data, PandasUDFType.SCALAR_ITER)]:
+            date_copy = pandas_udf(lambda t: t, returnType=DateType(), functionType=udf_type)
+            df = df.withColumn("date_copy", date_copy(col("date")))
+            result = df.withColumn("check_data",
+                                   check_data(col("idx"), col("date"), col("date_copy"))).collect()
+
+            self.assertEquals(len(data), len(result))
+            for i in range(len(result)):
+                self.assertEquals(data[i][1], result[i][1])  # "date" col
+                self.assertEquals(data[i][1], result[i][2])  # "date_copy" col
+                self.assertIsNone(result[i][3])  # "check_data" col
 
     def test_vectorized_udf_timestamps(self):
         schema = StructType([
@@ -479,12 +644,7 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 
         df = self.spark.createDataFrame(data, schema=schema)
 
-        # Check that a timestamp passed through a pandas_udf will not be altered by timezone calc
-        f_timestamp_copy = pandas_udf(lambda t: t, returnType=TimestampType())
-        df = df.withColumn("timestamp_copy", f_timestamp_copy(col("timestamp")))
-
-        @pandas_udf(returnType=StringType())
-        def check_data(idx, timestamp, timestamp_copy):
+        def scalar_check_data(idx, timestamp, timestamp_copy):
             msgs = []
             is_equal = timestamp.isnull()  # use this array to check values are equal
             for i in range(len(idx)):
@@ -498,42 +658,71 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
                         % (timestamp[i], idx[i], data[idx[i]][1]))
             return pd.Series(msgs)
 
-        result = df.withColumn("check_data", check_data(col("idx"), col("timestamp"),
-                                                        col("timestamp_copy"))).collect()
-        # Check that collection values are correct
-        self.assertEquals(len(data), len(result))
-        for i in range(len(result)):
-            self.assertEquals(data[i][1], result[i][1])  # "timestamp" col
-            self.assertEquals(data[i][1], result[i][2])  # "timestamp_copy" col
-            self.assertIsNone(result[i][3])  # "check_data" col
+        def iter_check_data(it):
+            for idx, timestamp, timestamp_copy in it:
+                yield scalar_check_data(idx, timestamp, timestamp_copy)
+
+        pandas_scalar_check_data = pandas_udf(scalar_check_data, StringType())
+        pandas_iter_check_data = pandas_udf(iter_check_data, StringType(),
+                                            PandasUDFType.SCALAR_ITER)
+
+        for check_data, udf_type in [(pandas_scalar_check_data, PandasUDFType.SCALAR),
+                                     (pandas_iter_check_data, PandasUDFType.SCALAR_ITER)]:
+            # Check that a timestamp passed through a pandas_udf will not be altered by timezone
+            # calc
+            f_timestamp_copy = pandas_udf(lambda t: t,
+                                          returnType=TimestampType(), functionType=udf_type)
+            df = df.withColumn("timestamp_copy", f_timestamp_copy(col("timestamp")))
+            result = df.withColumn("check_data", check_data(col("idx"), col("timestamp"),
+                                                            col("timestamp_copy"))).collect()
+            # Check that collection values are correct
+            self.assertEquals(len(data), len(result))
+            for i in range(len(result)):
+                self.assertEquals(data[i][1], result[i][1])  # "timestamp" col
+                self.assertEquals(data[i][1], result[i][2])  # "timestamp_copy" col
+                self.assertIsNone(result[i][3])  # "check_data" col
 
     def test_vectorized_udf_return_timestamp_tz(self):
         df = self.spark.range(10)
 
         @pandas_udf(returnType=TimestampType())
-        def gen_timestamps(id):
+        def scalar_gen_timestamps(id):
             ts = [pd.Timestamp(i, unit='D', tz='America/Los_Angeles') for i in id]
             return pd.Series(ts)
 
-        result = df.withColumn("ts", gen_timestamps(col("id"))).collect()
-        spark_ts_t = TimestampType()
-        for r in result:
-            i, ts = r
-            ts_tz = pd.Timestamp(i, unit='D', tz='America/Los_Angeles').to_pydatetime()
-            expected = spark_ts_t.fromInternal(spark_ts_t.toInternal(ts_tz))
-            self.assertEquals(expected, ts)
+        @pandas_udf(returnType=TimestampType(), functionType=PandasUDFType.SCALAR_ITER)
+        def iter_gen_timestamps(it):
+            for id in it:
+                ts = [pd.Timestamp(i, unit='D', tz='America/Los_Angeles') for i in id]
+                yield pd.Series(ts)
+
+        for gen_timestamps in [scalar_gen_timestamps, iter_gen_timestamps]:
+            result = df.withColumn("ts", gen_timestamps(col("id"))).collect()
+            spark_ts_t = TimestampType()
+            for r in result:
+                i, ts = r
+                ts_tz = pd.Timestamp(i, unit='D', tz='America/Los_Angeles').to_pydatetime()
+                expected = spark_ts_t.fromInternal(spark_ts_t.toInternal(ts_tz))
+                self.assertEquals(expected, ts)
 
     def test_vectorized_udf_check_config(self):
         with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 3}):
             df = self.spark.range(10, numPartitions=1)
 
             @pandas_udf(returnType=LongType())
-            def check_records_per_batch(x):
+            def scalar_check_records_per_batch(x):
                 return pd.Series(x.size).repeat(x.size)
 
-            result = df.select(check_records_per_batch(col("id"))).collect()
-            for (r,) in result:
-                self.assertTrue(r <= 3)
+            @pandas_udf(returnType=LongType(), functionType=PandasUDFType.SCALAR_ITER)
+            def iter_check_records_per_batch(it):
+                for x in it:
+                    yield pd.Series(x.size).repeat(x.size)
+
+            for check_records_per_batch in [scalar_check_records_per_batch,
+                                            iter_check_records_per_batch]:
+                result = df.select(check_records_per_batch(col("id"))).collect()
+                for (r,) in result:
+                    self.assertTrue(r <= 3)
 
     def test_vectorized_udf_timestamps_respect_session_timezone(self):
         schema = StructType([
@@ -545,69 +734,121 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
                 (4, datetime(2100, 3, 3, 3, 3, 3))]
         df = self.spark.createDataFrame(data, schema=schema)
 
-        f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType())
-        internal_value = pandas_udf(
+        scalar_internal_value = pandas_udf(
             lambda ts: ts.apply(lambda ts: ts.value if ts is not pd.NaT else None), LongType())
 
-        timezone = "America/New_York"
-        with self.sql_conf({
-                "spark.sql.execution.pandas.respectSessionTimeZone": False,
-                "spark.sql.session.timeZone": timezone}):
-            df_la = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \
-                .withColumn("internal_value", internal_value(col("timestamp")))
-            result_la = df_la.select(col("idx"), col("internal_value")).collect()
-            # Correct result_la by adjusting 3 hours difference between Los Angeles and New York
-            diff = 3 * 60 * 60 * 1000 * 1000 * 1000
-            result_la_corrected = \
-                df_la.select(col("idx"), col("tscopy"), col("internal_value") + diff).collect()
-
-        with self.sql_conf({
-                "spark.sql.execution.pandas.respectSessionTimeZone": True,
-                "spark.sql.session.timeZone": timezone}):
-            df_ny = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \
-                .withColumn("internal_value", internal_value(col("timestamp")))
-            result_ny = df_ny.select(col("idx"), col("tscopy"), col("internal_value")).collect()
-
-            self.assertNotEqual(result_ny, result_la)
-            self.assertEqual(result_ny, result_la_corrected)
+        @pandas_udf(LongType(), PandasUDFType.SCALAR_ITER)
+        def iter_internal_value(it):
+            for ts in it:
+                yield ts.apply(lambda ts: ts.value if ts is not pd.NaT else None)
+
+        for internal_value, udf_type in [(scalar_internal_value, PandasUDFType.SCALAR),
+                                         (iter_internal_value, PandasUDFType.SCALAR_ITER)]:
+            f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType(), udf_type)
+            timezone = "America/New_York"
+            with self.sql_conf({
+                    "spark.sql.execution.pandas.respectSessionTimeZone": False,
+                    "spark.sql.session.timeZone": timezone}):
+                df_la = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \
+                    .withColumn("internal_value", internal_value(col("timestamp")))
+                result_la = df_la.select(col("idx"), col("internal_value")).collect()
+                # Correct result_la by adjusting 3 hours difference between Los Angeles and New York
+                diff = 3 * 60 * 60 * 1000 * 1000 * 1000
+                result_la_corrected = \
+                    df_la.select(col("idx"), col("tscopy"), col("internal_value") + diff).collect()
+
+            with self.sql_conf({
+                    "spark.sql.execution.pandas.respectSessionTimeZone": True,
+                    "spark.sql.session.timeZone": timezone}):
+                df_ny = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \
+                    .withColumn("internal_value", internal_value(col("timestamp")))
+                result_ny = df_ny.select(col("idx"), col("tscopy"), col("internal_value")).collect()
+
+                self.assertNotEqual(result_ny, result_la)
+                self.assertEqual(result_ny, result_la_corrected)
 
     def test_nondeterministic_vectorized_udf(self):
         # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
         @pandas_udf('double')
-        def plus_ten(v):
+        def scalar_plus_ten(v):
             return v + 10
-        random_udf = self.nondeterministic_vectorized_udf
 
-        df = self.spark.range(10).withColumn('rand', random_udf(col('id')))
-        result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas()
+        @pandas_udf('double', PandasUDFType.SCALAR_ITER)
+        def iter_plus_ten(it):
+            for v in it:
+                yield v + 10
 
-        self.assertEqual(random_udf.deterministic, False)
-        self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))
+        for plus_ten in [scalar_plus_ten, iter_plus_ten]:
+            random_udf = self.nondeterministic_vectorized_udf
+
+            df = self.spark.range(10).withColumn('rand', random_udf(col('id')))
+            result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas()
+
+            self.assertEqual(random_udf.deterministic, False)
+            self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))
 
     def test_nondeterministic_vectorized_udf_in_aggregate(self):
         df = self.spark.range(10)
-        random_udf = self.nondeterministic_vectorized_udf
-
-        with QuietTest(self.sc):
-            with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
-                df.groupby(df.id).agg(sum(random_udf(df.id))).collect()
-            with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
-                df.agg(sum(random_udf(df.id))).collect()
+        for random_udf in [self.nondeterministic_vectorized_udf,
+                           self.nondeterministic_vectorized_iter_udf]:
+            with QuietTest(self.sc):
+                with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
+                    df.groupby(df.id).agg(sum(random_udf(df.id))).collect()
+                with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
+                    df.agg(sum(random_udf(df.id))).collect()
 
     def test_register_vectorized_udf_basic(self):
         df = self.spark.range(10).select(
             col('id').cast('int').alias('a'),
             col('id').cast('int').alias('b'))
-        original_add = pandas_udf(lambda x, y: x + y, IntegerType())
-        self.assertEqual(original_add.deterministic, True)
-        self.assertEqual(original_add.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
-        new_add = self.spark.catalog.registerFunction("add1", original_add)
-        res1 = df.select(new_add(col('a'), col('b')))
-        res2 = self.spark.sql(
-            "SELECT add1(t.a, t.b) FROM (SELECT id as a, id as b FROM range(10)) t")
-        expected = df.select(expr('a + b'))
-        self.assertEquals(expected.collect(), res1.collect())
-        self.assertEquals(expected.collect(), res2.collect())
+        scalar_original_add = pandas_udf(lambda x, y: x + y, IntegerType())
+        self.assertEqual(scalar_original_add.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
+
+        @pandas_udf(IntegerType(), PandasUDFType.SCALAR_ITER)
+        def iter_original_add(it):
+            for x, y in it:
+                yield x + y
+
+        self.assertEqual(iter_original_add.evalType, PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF)
+
+        for original_add in [scalar_original_add, iter_original_add]:
+            self.assertEqual(original_add.deterministic, True)
+            new_add = self.spark.catalog.registerFunction("add1", original_add)
+            res1 = df.select(new_add(col('a'), col('b')))
+            res2 = self.spark.sql(
+                "SELECT add1(t.a, t.b) FROM (SELECT id as a, id as b FROM range(10)) t")
+            expected = df.select(expr('a + b'))
+            self.assertEquals(expected.collect(), res1.collect())
+            self.assertEquals(expected.collect(), res2.collect())
+
+    def test_scalar_iter_udf_init(self):
+        import numpy as np
+
+        @pandas_udf('int', PandasUDFType.SCALAR_ITER)
+        def rng(batch_iter):
+            context = TaskContext.get()
+            part = context.partitionId()
+            np.random.seed(part)
+            for batch in batch_iter:
+                yield pd.Series(np.random.randint(100, size=len(batch)))
+        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 2}):
+            df = self.spark.range(10, numPartitions=2).select(rng(col("id").alias("v")))
+            result1 = df.collect()
+            result2 = df.collect()
+            self.assertEqual(result1, result2,
+                             "SCALAR ITER UDF can initialize state and produce deterministic RNG")
+
+    def test_scalar_iter_udf_close(self):
+        @pandas_udf('int', PandasUDFType.SCALAR_ITER)
+        def test_close(batch_iter):
+            try:
+                for batch in batch_iter:
+                    yield batch
+            finally:
+                raise RuntimeError("reached finally block")
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(Exception, "reached finally block"):
+                self.spark.range(1).select(test_close(col("id"))).collect()
 
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
@@ -616,9 +857,11 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
               datetime(2015, 11, 1, 1, 30),
               datetime(2015, 11, 1, 2, 30)]
         df = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
-        foo_udf = pandas_udf(lambda x: x, 'timestamp')
-        result = df.withColumn('time', foo_udf(df.time))
-        self.assertEquals(df.collect(), result.collect())
+
+        for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
+            foo_udf = pandas_udf(lambda x: x, 'timestamp', udf_type)
+            result = df.withColumn('time', foo_udf(df.time))
+            self.assertEquals(df.collect(), result.collect())
 
     @unittest.skipIf(sys.version_info[:2] < (3, 5), "Type hints are supported from Python 3.5.")
     def test_type_annotation(self):
@@ -648,76 +891,39 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             return x + 1
 
         @pandas_udf('int')
-        def f2(x):
+        def f2_scalar(x):
             assert type(x) == pd.Series
             return x + 10
 
+        @pandas_udf('int', PandasUDFType.SCALAR_ITER)
+        def f2_iter(it):
+            for x in it:
+                assert type(x) == pd.Series
+                yield x + 10
+
         @udf('int')
         def f3(x):
             assert type(x) == int
             return x + 100
 
         @pandas_udf('int')
-        def f4(x):
+        def f4_scalar(x):
             assert type(x) == pd.Series
             return x + 1000
 
-        # Test single expression with chained UDFs
-        df_chained_1 = df.withColumn('f2_f1', f2(f1(df['v'])))
-        df_chained_2 = df.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
-        df_chained_3 = df.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v'])))))
-        df_chained_4 = df.withColumn('f4_f2_f1', f4(f2(f1(df['v']))))
-        df_chained_5 = df.withColumn('f4_f3_f1', f4(f3(f1(df['v']))))
-
-        expected_chained_1 = df.withColumn('f2_f1', df['v'] + 11)
-        expected_chained_2 = df.withColumn('f3_f2_f1', df['v'] + 111)
-        expected_chained_3 = df.withColumn('f4_f3_f2_f1', df['v'] + 1111)
-        expected_chained_4 = df.withColumn('f4_f2_f1', df['v'] + 1011)
-        expected_chained_5 = df.withColumn('f4_f3_f1', df['v'] + 1101)
-
-        self.assertEquals(expected_chained_1.collect(), df_chained_1.collect())
-        self.assertEquals(expected_chained_2.collect(), df_chained_2.collect())
-        self.assertEquals(expected_chained_3.collect(), df_chained_3.collect())
-        self.assertEquals(expected_chained_4.collect(), df_chained_4.collect())
-        self.assertEquals(expected_chained_5.collect(), df_chained_5.collect())
-
-        # Test multiple mixed UDF expressions in a single projection
-        df_multi_1 = df \
-            .withColumn('f1', f1(col('v'))) \
-            .withColumn('f2', f2(col('v'))) \
-            .withColumn('f3', f3(col('v'))) \
-            .withColumn('f4', f4(col('v'))) \
-            .withColumn('f2_f1', f2(col('f1'))) \
-            .withColumn('f3_f1', f3(col('f1'))) \
-            .withColumn('f4_f1', f4(col('f1'))) \
-            .withColumn('f3_f2', f3(col('f2'))) \
-            .withColumn('f4_f2', f4(col('f2'))) \
-            .withColumn('f4_f3', f4(col('f3'))) \
-            .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
-            .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
-            .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
-            .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
-            .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
-
-        # Test mixed udfs in a single expression
-        df_multi_2 = df \
-            .withColumn('f1', f1(col('v'))) \
-            .withColumn('f2', f2(col('v'))) \
-            .withColumn('f3', f3(col('v'))) \
-            .withColumn('f4', f4(col('v'))) \
-            .withColumn('f2_f1', f2(f1(col('v')))) \
-            .withColumn('f3_f1', f3(f1(col('v')))) \
-            .withColumn('f4_f1', f4(f1(col('v')))) \
-            .withColumn('f3_f2', f3(f2(col('v')))) \
-            .withColumn('f4_f2', f4(f2(col('v')))) \
-            .withColumn('f4_f3', f4(f3(col('v')))) \
-            .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
-            .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
-            .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
-            .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
-            .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
-
-        expected = df \
+        @pandas_udf('int', PandasUDFType.SCALAR_ITER)
+        def f4_iter(it):
+            for x in it:
+                assert type(x) == pd.Series
+                yield x + 1000
+
+        expected_chained_1 = df.withColumn('f2_f1', df['v'] + 11).collect()
+        expected_chained_2 = df.withColumn('f3_f2_f1', df['v'] + 111).collect()
+        expected_chained_3 = df.withColumn('f4_f3_f2_f1', df['v'] + 1111).collect()
+        expected_chained_4 = df.withColumn('f4_f2_f1', df['v'] + 1011).collect()
+        expected_chained_5 = df.withColumn('f4_f3_f1', df['v'] + 1101).collect()
+
+        expected_multi = df \
             .withColumn('f1', df['v'] + 1) \
             .withColumn('f2', df['v'] + 10) \
             .withColumn('f3', df['v'] + 100) \
@@ -732,10 +938,62 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             .withColumn('f4_f2_f1', df['v'] + 1011) \
             .withColumn('f4_f3_f1', df['v'] + 1101) \
             .withColumn('f4_f3_f2', df['v'] + 1110) \
-            .withColumn('f4_f3_f2_f1', df['v'] + 1111)
-
-        self.assertEquals(expected.collect(), df_multi_1.collect())
-        self.assertEquals(expected.collect(), df_multi_2.collect())
+            .withColumn('f4_f3_f2_f1', df['v'] + 1111) \
+            .collect()
+
+        for f2, f4 in [(f2_scalar, f4_scalar), (f2_scalar, f4_iter),
+                       (f2_iter, f4_scalar), (f2_iter, f4_iter)]:
+            # Test single expression with chained UDFs
+            df_chained_1 = df.withColumn('f2_f1', f2(f1(df['v'])))
+            df_chained_2 = df.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
+            df_chained_3 = df.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v'])))))
+            df_chained_4 = df.withColumn('f4_f2_f1', f4(f2(f1(df['v']))))
+            df_chained_5 = df.withColumn('f4_f3_f1', f4(f3(f1(df['v']))))
+
+            self.assertEquals(expected_chained_1, df_chained_1.collect())
+            self.assertEquals(expected_chained_2, df_chained_2.collect())
+            self.assertEquals(expected_chained_3, df_chained_3.collect())
+            self.assertEquals(expected_chained_4, df_chained_4.collect())
+            self.assertEquals(expected_chained_5, df_chained_5.collect())
+
+            # Test multiple mixed UDF expressions in a single projection
+            df_multi_1 = df \
+                .withColumn('f1', f1(col('v'))) \
+                .withColumn('f2', f2(col('v'))) \
+                .withColumn('f3', f3(col('v'))) \
+                .withColumn('f4', f4(col('v'))) \
+                .withColumn('f2_f1', f2(col('f1'))) \
+                .withColumn('f3_f1', f3(col('f1'))) \
+                .withColumn('f4_f1', f4(col('f1'))) \
+                .withColumn('f3_f2', f3(col('f2'))) \
+                .withColumn('f4_f2', f4(col('f2'))) \
+                .withColumn('f4_f3', f4(col('f3'))) \
+                .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
+                .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
+                .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
+                .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
+                .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
+
+            # Test mixed udfs in a single expression
+            df_multi_2 = df \
+                .withColumn('f1', f1(col('v'))) \
+                .withColumn('f2', f2(col('v'))) \
+                .withColumn('f3', f3(col('v'))) \
+                .withColumn('f4', f4(col('v'))) \
+                .withColumn('f2_f1', f2(f1(col('v')))) \
+                .withColumn('f3_f1', f3(f1(col('v')))) \
+                .withColumn('f4_f1', f4(f1(col('v')))) \
+                .withColumn('f3_f2', f3(f2(col('v')))) \
+                .withColumn('f4_f2', f4(f2(col('v')))) \
+                .withColumn('f4_f3', f4(f3(col('v')))) \
+                .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
+                .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
+                .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
+                .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
+                .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
+
+            self.assertEquals(expected_multi, df_multi_1.collect())
+            self.assertEquals(expected_multi, df_multi_2.collect())
 
     def test_mixed_udf_and_sql(self):
         df = self.spark.range(0, 1).toDF('v')
@@ -752,25 +1010,15 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             return x + 10
 
         @pandas_udf('int')
-        def f3(x):
+        def f3s(x):
             assert type(x) == pd.Series
             return x + 100
 
-        df1 = df.withColumn('f1', f1(df['v'])) \
-            .withColumn('f2', f2(df['v'])) \
-            .withColumn('f3', f3(df['v'])) \
-            .withColumn('f1_f2', f1(f2(df['v']))) \
-            .withColumn('f1_f3', f1(f3(df['v']))) \
-            .withColumn('f2_f1', f2(f1(df['v']))) \
-            .withColumn('f2_f3', f2(f3(df['v']))) \
-            .withColumn('f3_f1', f3(f1(df['v']))) \
-            .withColumn('f3_f2', f3(f2(df['v']))) \
-            .withColumn('f1_f2_f3', f1(f2(f3(df['v'])))) \
-            .withColumn('f1_f3_f2', f1(f3(f2(df['v'])))) \
-            .withColumn('f2_f1_f3', f2(f1(f3(df['v'])))) \
-            .withColumn('f2_f3_f1', f2(f3(f1(df['v'])))) \
-            .withColumn('f3_f1_f2', f3(f1(f2(df['v'])))) \
-            .withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
+        @pandas_udf('int', PandasUDFType.SCALAR_ITER)
+        def f3i(it):
+            for x in it:
+                assert type(x) == pd.Series
+                yield x + 100
 
         expected = df.withColumn('f1', df['v'] + 1) \
             .withColumn('f2', df['v'] + 10) \
@@ -786,9 +1034,27 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             .withColumn('f2_f1_f3', df['v'] + 111) \
             .withColumn('f2_f3_f1', df['v'] + 111) \
             .withColumn('f3_f1_f2', df['v'] + 111) \
-            .withColumn('f3_f2_f1', df['v'] + 111)
-
-        self.assertEquals(expected.collect(), df1.collect())
+            .withColumn('f3_f2_f1', df['v'] + 111) \
+            .collect()
+
+        for f3 in [f3s, f3i]:
+            df1 = df.withColumn('f1', f1(df['v'])) \
+                .withColumn('f2', f2(df['v'])) \
+                .withColumn('f3', f3(df['v'])) \
+                .withColumn('f1_f2', f1(f2(df['v']))) \
+                .withColumn('f1_f3', f1(f3(df['v']))) \
+                .withColumn('f2_f1', f2(f1(df['v']))) \
+                .withColumn('f2_f3', f2(f3(df['v']))) \
+                .withColumn('f3_f1', f3(f1(df['v']))) \
+                .withColumn('f3_f2', f3(f2(df['v']))) \
+                .withColumn('f1_f2_f3', f1(f2(f3(df['v'])))) \
+                .withColumn('f1_f3_f2', f1(f3(f2(df['v'])))) \
+                .withColumn('f2_f1_f3', f2(f1(f3(df['v'])))) \
+                .withColumn('f2_f3_f1', f2(f3(f1(df['v'])))) \
+                .withColumn('f3_f1_f2', f3(f1(f2(df['v'])))) \
+                .withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
+
+            self.assertEquals(expected, df1.collect())
 
     # SPARK-24721
     @unittest.skipIf(not test_compiled, test_not_compiled_message)
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 275abe9..84be2d2 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -40,6 +40,7 @@ def _wrap_function(sc, func, returnType):
 def _create_udf(f, returnType, evalType):
 
     if evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                    PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
                     PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
                     PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF):
 
@@ -48,7 +49,9 @@ def _create_udf(f, returnType, evalType):
 
         argspec = _get_argspec(f)
 
-        if evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF and 
len(argspec.args) == 0 and \
+        if (evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF or
+                evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF) and \
+                len(argspec.args) == 0 and \
                 argspec.varargs is None:
             raise ValueError(
                 "Invalid function: 0-arg pandas_udfs are not supported. "
@@ -113,7 +116,8 @@ class UserDefinedFunction(object):
             else:
                 self._returnType_placeholder = 
_parse_datatype_string(self._returnType)
 
-        if self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
+        if self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF or \
+                self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
             try:
                 to_arrow_type(self._returnType_placeholder)
             except TypeError:
@@ -323,10 +327,11 @@ class UDFRegistration(object):
                     "a user-defined function, but got %s." % returnType)
             if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
                                   PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                                  PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
                                   PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
                 raise ValueError(
-                    "Invalid f: f must be SQL_BATCHED_UDF, 
SQL_SCALAR_PANDAS_UDF or "
-                    "SQL_GROUPED_AGG_PANDAS_UDF")
+                    "Invalid f: f must be SQL_BATCHED_UDF, 
SQL_SCALAR_PANDAS_UDF, "
+                    "SQL_SCALAR_PANDAS_ITER_UDF, or 
SQL_GROUPED_AGG_PANDAS_UDF")
             register_udf = UserDefinedFunction(f.func, 
returnType=f.returnType, name=name,
                                                evalType=f.evalType,
                                                deterministic=f.deterministic)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 16257be..ee46bb6 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -86,21 +86,29 @@ def wrap_udf(f, return_type):
         return lambda *a: f(*a)
 
 
-def wrap_scalar_pandas_udf(f, return_type):
+def wrap_scalar_pandas_udf(f, return_type, eval_type):
     arrow_return_type = to_arrow_type(return_type)
 
-    def verify_result_length(*a):
-        result = f(*a)
+    def verify_result_type(result):
         if not hasattr(result, "__len__"):
             pd_type = "Pandas.DataFrame" if type(return_type) == StructType 
else "Pandas.Series"
             raise TypeError("Return type of the user-defined function should 
be "
                             "{}, but is {}".format(pd_type, type(result)))
-        if len(result) != len(a[0]):
+        return result
+
+    def verify_result_length(result, length):
+        if len(result) != length:
             raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-                               "expected %d, got %d" % (len(a[0]), 
len(result)))
+                               "expected %d, got %d" % (length, len(result)))
         return result
 
-    return lambda *a: (verify_result_length(*a), arrow_return_type)
+    if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
+        return lambda *a: (verify_result_length(
+            verify_result_type(f(*a)), len(a[0])), arrow_return_type)
+    else:
+        # The result length verification is done at the end of a partition.
+        return lambda *iterator: map(lambda res: (res, arrow_return_type),
+                                     map(verify_result_type, f(*iterator)))
 
 
 def wrap_grouped_map_pandas_udf(f, return_type, argspec):
@@ -201,23 +209,28 @@ def wrap_bounded_window_agg_pandas_udf(f, return_type):
 def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
     num_arg = read_int(infile)
     arg_offsets = [read_int(infile) for i in range(num_arg)]
-    row_func = None
+    chained_func = None
     for i in range(read_int(infile)):
         f, return_type = read_command(pickleSer, infile)
-        if row_func is None:
-            row_func = f
+        if chained_func is None:
+            chained_func = f
         else:
-            row_func = chain(row_func, f)
+            chained_func = chain(chained_func, f)
 
-    # make sure StopIteration's raised in the user code are not ignored
-    # when they are processed in a for loop, raise them as RuntimeError's 
instead
-    func = fail_on_stopiteration(row_func)
+    if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
+        func = chained_func
+    else:
+        # make sure StopIteration's raised in the user code are not ignored
+        # when they are processed in a for loop, raise them as RuntimeError's 
instead
+        func = fail_on_stopiteration(chained_func)
 
     # the last returnType will be the return type of UDF
     if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
-        return arg_offsets, wrap_scalar_pandas_udf(func, return_type)
+        return arg_offsets, wrap_scalar_pandas_udf(func, return_type, 
eval_type)
+    elif eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
+        return arg_offsets, wrap_scalar_pandas_udf(func, return_type, 
eval_type)
     elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
-        argspec = _get_argspec(row_func)  # signature was lost when wrapping it
+        argspec = _get_argspec(chained_func)  # signature was lost when 
wrapping it
         return arg_offsets, wrap_grouped_map_pandas_udf(func, return_type, 
argspec)
     elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
         return arg_offsets, wrap_grouped_agg_pandas_udf(func, return_type)
@@ -233,6 +246,7 @@ def read_udfs(pickleSer, infile, eval_type):
     runner_conf = {}
 
     if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                     PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
                      PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
                      PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
                      PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF):
@@ -255,13 +269,60 @@ def read_udfs(pickleSer, infile, eval_type):
 
         # Scalar Pandas UDF handles struct type arguments as pandas DataFrames 
instead of
         # pandas Series. See SPARK-27240.
-        df_for_struct = eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF
+        df_for_struct = (eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF or
+                         eval_type == 
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF)
         ser = ArrowStreamPandasUDFSerializer(timezone, safecheck, 
assign_cols_by_name,
                                              df_for_struct)
     else:
         ser = BatchedSerializer(PickleSerializer(), 100)
 
     num_udfs = read_int(infile)
+
+    if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
+        assert num_udfs == 1, "One SQL_SCALAR_PANDAS_ITER_UDF expected here."
+
+        arg_offsets, udf = read_single_udf(
+            pickleSer, infile, eval_type, runner_conf, udf_index=0)
+
+        def func(_, iterator):
+            num_input_rows = [0]
+
+            def map_batch(batch):
+                udf_args = [batch[offset] for offset in arg_offsets]
+                num_input_rows[0] += len(udf_args[0])
+                if len(udf_args) == 1:
+                    return udf_args[0]
+                else:
+                    return tuple(udf_args)
+
+            iterator = map(map_batch, iterator)
+            result_iter = udf(iterator)
+
+            num_output_rows = 0
+            for result_batch, result_type in result_iter:
+                num_output_rows += len(result_batch)
+                assert num_output_rows <= num_input_rows[0], \
+                    "Pandas SCALAR_ITER UDF outputted more rows than input 
rows."
+                yield (result_batch, result_type)
+            try:
+                if sys.version >= '3':
+                    iterator.__next__()
+                else:
+                    iterator.next()
+            except StopIteration:
+                pass
+            else:
+                raise RuntimeError("SQL_SCALAR_PANDAS_ITER_UDF should exhaust 
the input iterator.")
+
+            if num_output_rows != num_input_rows[0]:
+                raise RuntimeError("The number of output rows of pandas 
iterator UDF should be "
+                                   "the same with input rows. The input rows 
number is %d but the "
+                                   "output rows number is %d." %
+                                   (num_input_rows[0], num_output_rows))
+
+        # profiling is not supported for UDF
+        return func, None, ser, ser
+
     udfs = {}
     call_udf = []
     mapper_str = ""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
index 2d82355..690969e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.types.DataType
 object PythonUDF {
   private[this] val SCALAR_TYPES = Set(
     PythonEvalType.SQL_BATCHED_UDF,
-    PythonEvalType.SQL_SCALAR_PANDAS_UDF
+    PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+    PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
   )
 
   def isScalarPythonUDF(e: Expression): Boolean = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
index 2df30a1..d87bbf7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
@@ -64,4 +64,5 @@ case class BatchEvalPython(
 case class ArrowEvalPython(
     udfs: Seq[PythonUDF],
     resultAttrs: Seq[Attribute],
-    child: LogicalPlan) extends BaseEvalPython
+    child: LogicalPlan,
+    evalType: Int) extends BaseEvalPython
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index faf2fdd..0c4775e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -623,8 +623,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    */
   object PythonEvals extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case ArrowEvalPython(udfs, output, child) =>
-        ArrowEvalPythonExec(udfs, output, planLater(child)) :: Nil
+      case ArrowEvalPython(udfs, output, child, evalType) =>
+        ArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: Nil
       case BatchEvalPython(udfs, output, child) =>
         BatchEvalPythonExec(udfs, output, planLater(child)) :: Nil
       case _ =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index 73a43af..e714554 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -59,7 +59,8 @@ private[spark] class BatchIterator[T](iter: Iterator[T], batchSize: Int)
 /**
  * A physical plan that evaluates a [[PythonUDF]].
  */
-case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: 
Seq[Attribute], child: SparkPlan)
+case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: 
Seq[Attribute], child: SparkPlan,
+    evalType: Int)
   extends EvalPythonExec(udfs, resultAttrs, child) {
 
   private val batchSize = conf.arrowMaxRecordsPerBatch
@@ -80,7 +81,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
resultAttrs: Seq[Attribute]
 
     val columnarBatchIter = new ArrowPythonRunner(
       funcs,
-      PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+      evalType,
       argOffsets,
       schema,
       sessionLocalTimeZone,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index 7f59d74..58fe7d5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -111,19 +111,27 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with 
PredicateHelper {
   }
 
   private def collectEvaluableUDFsFromExpressions(expressions: 
Seq[Expression]): Seq[PythonUDF] = {
-    // Eval type checker is set once when we find the first evaluable UDF and 
its value
-    // shouldn't change later.
-    // Used to check if subsequent UDFs are of the same type as the first UDF. 
(since we can only
+    // If the first UDF is SQL_SCALAR_PANDAS_ITER_UDF, then only return this UDF,
+    // otherwise check if subsequent UDFs are of the same type as the first UDF. (since we can only
     // extract UDFs of the same eval type)
-    var evalTypeChecker: Option[EvalTypeChecker] = None
+
+    var firstVisitedScalarUDFEvalType: Option[Int] = None
+
+    def canChainUDF(evalType: Int): Boolean = {
+      if (evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF) {
+        false
+      } else {
+        evalType == firstVisitedScalarUDFEvalType.get
+      }
+    }
 
     def collectEvaluableUDFs(expr: Expression): Seq[PythonUDF] = expr match {
       case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && 
canEvaluateInPython(udf)
-        && evalTypeChecker.isEmpty =>
-        evalTypeChecker = Some((otherEvalType: EvalType) => otherEvalType == 
udf.evalType)
+        && firstVisitedScalarUDFEvalType.isEmpty =>
+        firstVisitedScalarUDFEvalType = Some(udf.evalType)
         Seq(udf)
       case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && 
canEvaluateInPython(udf)
-        && evalTypeChecker.get(udf.evalType) =>
+        && canChainUDF(udf.evalType) =>
         Seq(udf)
       case e => e.children.flatMap(collectEvaluableUDFs)
     }
@@ -175,16 +183,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with 
PredicateHelper {
             AttributeReference(s"pythonUDF$i", u.dataType)()
           }
 
-          val evaluation = validUdfs.partition(
-            _.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF
-          ) match {
-            case (vectorizedUdfs, plainUdfs) if plainUdfs.isEmpty =>
-              ArrowEvalPython(vectorizedUdfs, resultAttrs, child)
-            case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty =>
-              BatchEvalPython(plainUdfs, resultAttrs, child)
+          val evalTypes = validUdfs.map(_.evalType).toSet
+          if (evalTypes.size != 1) {
+            throw new AnalysisException(
+              s"Expected udfs have the same evalType but got different 
evalTypes: " +
+              s"${evalTypes.mkString(",")}")
+          }
+          val evalType = evalTypes.head
+          val evaluation = evalType match {
+            case PythonEvalType.SQL_BATCHED_UDF =>
+              BatchEvalPython(validUdfs, resultAttrs, child)
+            case PythonEvalType.SQL_SCALAR_PANDAS_UDF | 
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF =>
+              ArrowEvalPython(validUdfs, resultAttrs, child, evalType)
             case _ =>
-              throw new AnalysisException(
-                "Expected either Scalar Pandas UDFs or Batched UDFs but got 
both")
+              throw new AnalysisException("Unexcepted UDF evalType")
           }
 
           attributeMap ++= validUdfs.zip(resultAttrs)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to