[spark] branch branch-2.4 updated: Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop consumin…"

2020-11-05 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new c342bcd  Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use 
ContextAwareIterator to stop consumin…"
c342bcd is described below

commit c342bcd4c4ba68506ca6b459bd3a9c688d2aecfa
Author: HyukjinKwon 
AuthorDate: Thu Nov 5 16:16:07 2020 +0900

Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop 
consumin…"

This reverts commit cabf9571cd5bc620b5cddf5a4d003f29ed5b5459.
---
 python/pyspark/sql/tests.py| 42 --
 .../sql/execution/python/EvalPythonExec.scala  | 18 +-
 2 files changed, 1 insertion(+), 59 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 8a25311..b995227 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3628,26 +3628,6 @@ class SQLTests(ReusedSQLTestCase):
 finally:
 self.spark.catalog.dropTempView("v")
 
-# SPARK-33277
-def test_udf_with_column_vector(self):
-path = tempfile.mkdtemp()
-shutil.rmtree(path)
-
-try:
-self.spark.range(0, 10, 1, 1).write.parquet(path)
-
-def f(x):
-return 0
-
-fUdf = udf(f, LongType())
-
-for offheap in ["true", "false"]:
-with self.sql_conf({"spark.sql.columnVector.offheap.enabled": 
offheap}):
-self.assertEquals(
-
self.spark.read.parquet(path).select(fUdf('id')).head(), Row(0))
-finally:
-shutil.rmtree(path)
-
 
 class HiveSparkSubmitTests(SparkSubmitTests):
 
@@ -5595,28 +5575,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 finally:
 shutil.rmtree(path)
 
-# SPARK-33277
-def test_pandas_udf_with_column_vector(self):
-import pandas as pd
-from pyspark.sql.functions import pandas_udf
-
-path = tempfile.mkdtemp()
-shutil.rmtree(path)
-
-try:
-self.spark.range(0, 20, 1, 1).write.parquet(path)
-
-@pandas_udf(LongType())
-def udf(x):
-return pd.Series([0] * len(x))
-
-for offheap in ["true", "false"]:
-with self.sql_conf({"spark.sql.columnVector.offheap.enabled": 
offheap}):
-self.assertEquals(
-
self.spark.read.parquet(path).select(udf('id')).head(), Row(0))
-finally:
-shutil.rmtree(path)
-
 
 @unittest.skipIf(
 not _have_pandas or not _have_pyarrow,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index 293a7c0..942a6db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -88,7 +88,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
 
 inputRDD.mapPartitions { iter =>
   val context = TaskContext.get()
-  val contextAwareIterator = new ContextAwareIterator(iter, context)
 
   // The queue used to buffer input rows so we can drain it to
   // combine input with output from Python.
@@ -120,7 +119,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
   })
 
   // Add rows to queue to join later with the result.
-  val projectedRowIter = contextAwareIterator.map { inputRow =>
+  val projectedRowIter = iter.map { inputRow =>
 queue.add(inputRow.asInstanceOf[UnsafeRow])
 projection(inputRow)
   }
@@ -137,18 +136,3 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
 }
   }
 }
-
-/**
- * A TaskContext aware iterator.
- *
- * As the Python evaluation consumes the parent iterator in a separate thread,
- * it could consume more data from the parent even after the task ends and the 
parent is closed.
- * Thus, we should use ContextAwareIterator to stop consuming after the task 
ends.
- */
-class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) 
extends Iterator[IN] {
-
-  override def hasNext: Boolean =
-!context.isCompleted() && !context.isInterrupted() && iter.hasNext
-
-  override def next(): IN = iter.next()
-}


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop consumin…"

2020-11-05 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new c342bcd  Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use 
ContextAwareIterator to stop consumin…"
c342bcd is described below

commit c342bcd4c4ba68506ca6b459bd3a9c688d2aecfa
Author: HyukjinKwon 
AuthorDate: Thu Nov 5 16:16:07 2020 +0900

Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop 
consumin…"

This reverts commit cabf9571cd5bc620b5cddf5a4d003f29ed5b5459.
---
 python/pyspark/sql/tests.py| 42 --
 .../sql/execution/python/EvalPythonExec.scala  | 18 +-
 2 files changed, 1 insertion(+), 59 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 8a25311..b995227 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3628,26 +3628,6 @@ class SQLTests(ReusedSQLTestCase):
 finally:
 self.spark.catalog.dropTempView("v")
 
-# SPARK-33277
-def test_udf_with_column_vector(self):
-path = tempfile.mkdtemp()
-shutil.rmtree(path)
-
-try:
-self.spark.range(0, 10, 1, 1).write.parquet(path)
-
-def f(x):
-return 0
-
-fUdf = udf(f, LongType())
-
-for offheap in ["true", "false"]:
-with self.sql_conf({"spark.sql.columnVector.offheap.enabled": 
offheap}):
-self.assertEquals(
-
self.spark.read.parquet(path).select(fUdf('id')).head(), Row(0))
-finally:
-shutil.rmtree(path)
-
 
 class HiveSparkSubmitTests(SparkSubmitTests):
 
@@ -5595,28 +5575,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 finally:
 shutil.rmtree(path)
 
-# SPARK-33277
-def test_pandas_udf_with_column_vector(self):
-import pandas as pd
-from pyspark.sql.functions import pandas_udf
-
-path = tempfile.mkdtemp()
-shutil.rmtree(path)
-
-try:
-self.spark.range(0, 20, 1, 1).write.parquet(path)
-
-@pandas_udf(LongType())
-def udf(x):
-return pd.Series([0] * len(x))
-
-for offheap in ["true", "false"]:
-with self.sql_conf({"spark.sql.columnVector.offheap.enabled": 
offheap}):
-self.assertEquals(
-
self.spark.read.parquet(path).select(udf('id')).head(), Row(0))
-finally:
-shutil.rmtree(path)
-
 
 @unittest.skipIf(
 not _have_pandas or not _have_pyarrow,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index 293a7c0..942a6db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -88,7 +88,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
 
 inputRDD.mapPartitions { iter =>
   val context = TaskContext.get()
-  val contextAwareIterator = new ContextAwareIterator(iter, context)
 
   // The queue used to buffer input rows so we can drain it to
   // combine input with output from Python.
@@ -120,7 +119,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
   })
 
   // Add rows to queue to join later with the result.
-  val projectedRowIter = contextAwareIterator.map { inputRow =>
+  val projectedRowIter = iter.map { inputRow =>
 queue.add(inputRow.asInstanceOf[UnsafeRow])
 projection(inputRow)
   }
@@ -137,18 +136,3 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
 }
   }
 }
-
-/**
- * A TaskContext aware iterator.
- *
- * As the Python evaluation consumes the parent iterator in a separate thread,
- * it could consume more data from the parent even after the task ends and the 
parent is closed.
- * Thus, we should use ContextAwareIterator to stop consuming after the task 
ends.
- */
-class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) 
extends Iterator[IN] {
-
-  override def hasNext: Boolean =
-!context.isCompleted() && !context.isInterrupted() && iter.hasNext
-
-  override def next(): IN = iter.next()
-}


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop consumin…"

2020-11-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new c342bcd  Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use 
ContextAwareIterator to stop consumin…"
c342bcd is described below

commit c342bcd4c4ba68506ca6b459bd3a9c688d2aecfa
Author: HyukjinKwon 
AuthorDate: Thu Nov 5 16:16:07 2020 +0900

Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop 
consumin…"

This reverts commit cabf9571cd5bc620b5cddf5a4d003f29ed5b5459.
---
 python/pyspark/sql/tests.py| 42 --
 .../sql/execution/python/EvalPythonExec.scala  | 18 +-
 2 files changed, 1 insertion(+), 59 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 8a25311..b995227 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3628,26 +3628,6 @@ class SQLTests(ReusedSQLTestCase):
 finally:
 self.spark.catalog.dropTempView("v")
 
-# SPARK-33277
-def test_udf_with_column_vector(self):
-path = tempfile.mkdtemp()
-shutil.rmtree(path)
-
-try:
-self.spark.range(0, 10, 1, 1).write.parquet(path)
-
-def f(x):
-return 0
-
-fUdf = udf(f, LongType())
-
-for offheap in ["true", "false"]:
-with self.sql_conf({"spark.sql.columnVector.offheap.enabled": 
offheap}):
-self.assertEquals(
-
self.spark.read.parquet(path).select(fUdf('id')).head(), Row(0))
-finally:
-shutil.rmtree(path)
-
 
 class HiveSparkSubmitTests(SparkSubmitTests):
 
@@ -5595,28 +5575,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 finally:
 shutil.rmtree(path)
 
-# SPARK-33277
-def test_pandas_udf_with_column_vector(self):
-import pandas as pd
-from pyspark.sql.functions import pandas_udf
-
-path = tempfile.mkdtemp()
-shutil.rmtree(path)
-
-try:
-self.spark.range(0, 20, 1, 1).write.parquet(path)
-
-@pandas_udf(LongType())
-def udf(x):
-return pd.Series([0] * len(x))
-
-for offheap in ["true", "false"]:
-with self.sql_conf({"spark.sql.columnVector.offheap.enabled": 
offheap}):
-self.assertEquals(
-
self.spark.read.parquet(path).select(udf('id')).head(), Row(0))
-finally:
-shutil.rmtree(path)
-
 
 @unittest.skipIf(
 not _have_pandas or not _have_pyarrow,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index 293a7c0..942a6db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -88,7 +88,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
 
 inputRDD.mapPartitions { iter =>
   val context = TaskContext.get()
-  val contextAwareIterator = new ContextAwareIterator(iter, context)
 
   // The queue used to buffer input rows so we can drain it to
   // combine input with output from Python.
@@ -120,7 +119,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
   })
 
   // Add rows to queue to join later with the result.
-  val projectedRowIter = contextAwareIterator.map { inputRow =>
+  val projectedRowIter = iter.map { inputRow =>
 queue.add(inputRow.asInstanceOf[UnsafeRow])
 projection(inputRow)
   }
@@ -137,18 +136,3 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
 }
   }
 }
-
-/**
- * A TaskContext aware iterator.
- *
- * As the Python evaluation consumes the parent iterator in a separate thread,
- * it could consume more data from the parent even after the task ends and the 
parent is closed.
- * Thus, we should use ContextAwareIterator to stop consuming after the task 
ends.
- */
-class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) 
extends Iterator[IN] {
-
-  override def hasNext: Boolean =
-!context.isCompleted() && !context.isInterrupted() && iter.hasNext
-
-  override def next(): IN = iter.next()
-}


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop consumin…"

2020-11-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new c342bcd  Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use 
ContextAwareIterator to stop consumin…"
c342bcd is described below

commit c342bcd4c4ba68506ca6b459bd3a9c688d2aecfa
Author: HyukjinKwon 
AuthorDate: Thu Nov 5 16:16:07 2020 +0900

Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop 
consumin…"

This reverts commit cabf9571cd5bc620b5cddf5a4d003f29ed5b5459.
---
 python/pyspark/sql/tests.py| 42 --
 .../sql/execution/python/EvalPythonExec.scala  | 18 +-
 2 files changed, 1 insertion(+), 59 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 8a25311..b995227 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3628,26 +3628,6 @@ class SQLTests(ReusedSQLTestCase):
 finally:
 self.spark.catalog.dropTempView("v")
 
-# SPARK-33277
-def test_udf_with_column_vector(self):
-path = tempfile.mkdtemp()
-shutil.rmtree(path)
-
-try:
-self.spark.range(0, 10, 1, 1).write.parquet(path)
-
-def f(x):
-return 0
-
-fUdf = udf(f, LongType())
-
-for offheap in ["true", "false"]:
-with self.sql_conf({"spark.sql.columnVector.offheap.enabled": 
offheap}):
-self.assertEquals(
-
self.spark.read.parquet(path).select(fUdf('id')).head(), Row(0))
-finally:
-shutil.rmtree(path)
-
 
 class HiveSparkSubmitTests(SparkSubmitTests):
 
@@ -5595,28 +5575,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 finally:
 shutil.rmtree(path)
 
-# SPARK-33277
-def test_pandas_udf_with_column_vector(self):
-import pandas as pd
-from pyspark.sql.functions import pandas_udf
-
-path = tempfile.mkdtemp()
-shutil.rmtree(path)
-
-try:
-self.spark.range(0, 20, 1, 1).write.parquet(path)
-
-@pandas_udf(LongType())
-def udf(x):
-return pd.Series([0] * len(x))
-
-for offheap in ["true", "false"]:
-with self.sql_conf({"spark.sql.columnVector.offheap.enabled": 
offheap}):
-self.assertEquals(
-
self.spark.read.parquet(path).select(udf('id')).head(), Row(0))
-finally:
-shutil.rmtree(path)
-
 
 @unittest.skipIf(
 not _have_pandas or not _have_pyarrow,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index 293a7c0..942a6db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -88,7 +88,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
 
 inputRDD.mapPartitions { iter =>
   val context = TaskContext.get()
-  val contextAwareIterator = new ContextAwareIterator(iter, context)
 
   // The queue used to buffer input rows so we can drain it to
   // combine input with output from Python.
@@ -120,7 +119,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
   })
 
   // Add rows to queue to join later with the result.
-  val projectedRowIter = contextAwareIterator.map { inputRow =>
+  val projectedRowIter = iter.map { inputRow =>
 queue.add(inputRow.asInstanceOf[UnsafeRow])
 projection(inputRow)
   }
@@ -137,18 +136,3 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
 }
   }
 }
-
-/**
- * A TaskContext aware iterator.
- *
- * As the Python evaluation consumes the parent iterator in a separate thread,
- * it could consume more data from the parent even after the task ends and the 
parent is closed.
- * Thus, we should use ContextAwareIterator to stop consuming after the task 
ends.
- */
-class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) 
extends Iterator[IN] {
-
-  override def hasNext: Boolean =
-!context.isCompleted() && !context.isInterrupted() && iter.hasNext
-
-  override def next(): IN = iter.next()
-}


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-2.4 updated: Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop consumin…"

2020-11-04 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new c342bcd  Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use 
ContextAwareIterator to stop consumin…"
c342bcd is described below

commit c342bcd4c4ba68506ca6b459bd3a9c688d2aecfa
Author: HyukjinKwon 
AuthorDate: Thu Nov 5 16:16:07 2020 +0900

Revert "[SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop 
consumin…"

This reverts commit cabf9571cd5bc620b5cddf5a4d003f29ed5b5459.
---
 python/pyspark/sql/tests.py| 42 --
 .../sql/execution/python/EvalPythonExec.scala  | 18 +-
 2 files changed, 1 insertion(+), 59 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 8a25311..b995227 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3628,26 +3628,6 @@ class SQLTests(ReusedSQLTestCase):
 finally:
 self.spark.catalog.dropTempView("v")
 
-# SPARK-33277
-def test_udf_with_column_vector(self):
-path = tempfile.mkdtemp()
-shutil.rmtree(path)
-
-try:
-self.spark.range(0, 10, 1, 1).write.parquet(path)
-
-def f(x):
-return 0
-
-fUdf = udf(f, LongType())
-
-for offheap in ["true", "false"]:
-with self.sql_conf({"spark.sql.columnVector.offheap.enabled": 
offheap}):
-self.assertEquals(
-
self.spark.read.parquet(path).select(fUdf('id')).head(), Row(0))
-finally:
-shutil.rmtree(path)
-
 
 class HiveSparkSubmitTests(SparkSubmitTests):
 
@@ -5595,28 +5575,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
 finally:
 shutil.rmtree(path)
 
-# SPARK-33277
-def test_pandas_udf_with_column_vector(self):
-import pandas as pd
-from pyspark.sql.functions import pandas_udf
-
-path = tempfile.mkdtemp()
-shutil.rmtree(path)
-
-try:
-self.spark.range(0, 20, 1, 1).write.parquet(path)
-
-@pandas_udf(LongType())
-def udf(x):
-return pd.Series([0] * len(x))
-
-for offheap in ["true", "false"]:
-with self.sql_conf({"spark.sql.columnVector.offheap.enabled": 
offheap}):
-self.assertEquals(
-
self.spark.read.parquet(path).select(udf('id')).head(), Row(0))
-finally:
-shutil.rmtree(path)
-
 
 @unittest.skipIf(
 not _have_pandas or not _have_pyarrow,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index 293a7c0..942a6db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -88,7 +88,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
 
 inputRDD.mapPartitions { iter =>
   val context = TaskContext.get()
-  val contextAwareIterator = new ContextAwareIterator(iter, context)
 
   // The queue used to buffer input rows so we can drain it to
   // combine input with output from Python.
@@ -120,7 +119,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
   })
 
   // Add rows to queue to join later with the result.
-  val projectedRowIter = contextAwareIterator.map { inputRow =>
+  val projectedRowIter = iter.map { inputRow =>
 queue.add(inputRow.asInstanceOf[UnsafeRow])
 projection(inputRow)
   }
@@ -137,18 +136,3 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
 }
   }
 }
-
-/**
- * A TaskContext aware iterator.
- *
- * As the Python evaluation consumes the parent iterator in a separate thread,
- * it could consume more data from the parent even after the task ends and the 
parent is closed.
- * Thus, we should use ContextAwareIterator to stop consuming after the task 
ends.
- */
-class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) 
extends Iterator[IN] {
-
-  override def hasNext: Boolean =
-!context.isCompleted() && !context.isInterrupted() && iter.hasNext
-
-  override def next(): IN = iter.next()
-}


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org