[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22104 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r212794910

    --- Diff: python/pyspark/sql/utils.py ---
    @@ -152,6 +152,22 @@ def require_minimum_pyarrow_version():
                               "your version was %s." % (minimum_pyarrow_version, pyarrow.__version__))
     
     
    +def require_test_compiled():
    +    """ Raise Exception if test classes are not compiled
    +    """
    +    import os
    +    try:
    +        spark_home = os.environ['SPARK_HOME']
    +    except KeyError:
    +        raise RuntimeError('SPARK_HOME is not defined in environment')
    +
    +    test_class_path = os.path.join(
    +        spark_home, 'sql', 'core', 'target', 'scala-2.11', 'test-classes')
    --- End diff --

Eh, @icexelloss, can we avoid the specific version `scala-2.11` here?
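One possible way to avoid the hard-coded `scala-2.11` segment is to match the Scala directory with a wildcard. The following is only a sketch under that assumption, not the code that was merged: the helper name `find_test_class_dirs` and the optional `spark_home` parameter are hypothetical and were added here so the function can be exercised without a real Spark checkout.

```python
import glob
import os


def find_test_class_dirs(spark_home):
    """Return all sql/core test-classes directories under spark_home.

    The Scala version directory is matched with a wildcard
    (scala-2.11, scala-2.12, ...) instead of being hard-coded.
    """
    pattern = os.path.join(
        spark_home, 'sql', 'core', 'target', 'scala-*', 'test-classes')
    return sorted(p for p in glob.glob(pattern) if os.path.isdir(p))


def require_test_compiled(spark_home=None):
    """Raise RuntimeError if no compiled test classes can be found."""
    if spark_home is None:
        try:
            spark_home = os.environ['SPARK_HOME']
        except KeyError:
            raise RuntimeError('SPARK_HOME is not defined in environment')

    if not find_test_class_dirs(spark_home):
        raise RuntimeError(
            'No compiled test classes found under %s' % spark_home)
```

With this shape the check keeps working when the build's Scala version changes, at the cost of accepting any `scala-*` directory that happens to exist under `target`.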
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r212460124

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3367,6 +3367,35 @@ def test_ignore_column_of_all_nulls(self):
             finally:
                 shutil.rmtree(path)
     
    +    # SPARK-24721
    +    def test_datasource_with_udf_filter_lit_input(self):
    +        import pandas as pd
    +        import numpy as np
    +        from pyspark.sql.functions import udf, pandas_udf, lit, col
    +
    +        path = tempfile.mkdtemp()
    +        shutil.rmtree(path)
    +        try:
    +            self.spark.range(1).write.mode("overwrite").format('csv').save(path)
    +            filesource_df = self.spark.read.csv(path)
    --- End diff --

Created separate tests for pandas_udf under ScalarPandasUDFTests
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r212396812

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3367,6 +3367,33 @@ def test_ignore_column_of_all_nulls(self):
             finally:
                 shutil.rmtree(path)
     
    +    # SPARK-24721
    +    def test_datasource_with_udf_filter_lit_input(self):
    +        from pyspark.sql.functions import udf, lit, col
    +
    +        path = tempfile.mkdtemp()
    +        shutil.rmtree(path)
    +        try:
    +            self.spark.range(1).write.mode("overwrite").format('csv').save(path)
    +            filesource_df = self.spark.read.csv(path)
    +            datasource_df = self.spark.read \
    +                .format("org.apache.spark.sql.sources.SimpleScanSource") \
    +                .option('from', 0).option('to', 1).load()
    +            datasource_v2_df = self.spark.read \
    +                .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
    --- End diff --

Added checks to skip the tests if the Scala tests are not compiled
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r212347966

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
    @@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
                 }
               }.toArray
             }.toArray
    -      val projection = newMutableProjection(allInputs, child.output)
    +
    +      // Project input rows to unsafe row so we can put it in the row queue
    +      val unsafeProjection = UnsafeProjection.create(child.output, child.output)
    --- End diff --

Created https://jira.apache.org/jira/browse/SPARK-25213
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r212340459

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
    @@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
                 }
               }.toArray
             }.toArray
    -      val projection = newMutableProjection(allInputs, child.output)
    +
    +      // Project input rows to unsafe row so we can put it in the row queue
    +      val unsafeProjection = UnsafeProjection.create(child.output, child.output)
    --- End diff --

@cloud-fan Sorry, I don't think I was being very clear...

> If the data source does not produce UnsafeRow, Spark will make sure there will be a project
> above it to produce UnsafeRow

I don't think this is happening for DataSourceV2 right now:

(Code running in a pyspark test)
```
datasource_v2_df = self.spark.read \
    .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
    .load()
result = datasource_v2_df.withColumn('x', udf(lambda x: x, 'int')(datasource_v2_df['i']))
result.show()
```
The code above fails with:
```
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
    at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1$$anonfun$5.apply(EvalPythonExec.scala:127)
    at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1$$anonfun$5.apply(EvalPythonExec.scala:126)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1074)
```

I think this is an issue with DataSourceV2 that should probably be addressed in another PR (DataSourceV1 works fine).

@cloud-fan WDYT?
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r212309541

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
    @@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
                 }
               }.toArray
             }.toArray
    -      val projection = newMutableProjection(allInputs, child.output)
    +
    +      // Project input rows to unsafe row so we can put it in the row queue
    +      val unsafeProjection = UnsafeProjection.create(child.output, child.output)
    --- End diff --

Thanks! I will remove this then.
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r212197529

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
    @@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
                 }
               }.toArray
             }.toArray
    -      val projection = newMutableProjection(allInputs, child.output)
    +
    +      // Project input rows to unsafe row so we can put it in the row queue
    +      val unsafeProjection = UnsafeProjection.create(child.output, child.output)
    --- End diff --

Ideally all the operators will produce UnsafeRow. If the data source does not produce UnsafeRow, Spark will make sure there will be a project above it to produce UnsafeRow, so we don't need to worry about it here and can safely assume the input is always UnsafeRow.
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r211733007

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
    @@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
                 }
               }.toArray
             }.toArray
    -      val projection = newMutableProjection(allInputs, child.output)
    +
    +      // Project input rows to unsafe row so we can put it in the row queue
    +      val unsafeProjection = UnsafeProjection.create(child.output, child.output)
    --- End diff --

Friendly ping @cloud-fan. Do you think forcing an unsafe projection here to deal with non-unsafe rows from data sources is correct? Is there a way to know whether the child nodes output unsafe rows, so that we can avoid an unnecessary unsafe projection here?
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r210996331

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3367,6 +3367,33 @@ def test_ignore_column_of_all_nulls(self):
             finally:
                 shutil.rmtree(path)
     
    +    # SPARK-24721
    +    def test_datasource_with_udf_filter_lit_input(self):
    +        from pyspark.sql.functions import udf, lit, col
    +
    +        path = tempfile.mkdtemp()
    +        shutil.rmtree(path)
    +        try:
    +            self.spark.range(1).write.mode("overwrite").format('csv').save(path)
    +            filesource_df = self.spark.read.csv(path)
    +            datasource_df = self.spark.read \
    +                .format("org.apache.spark.sql.sources.SimpleScanSource") \
    +                .option('from', 0).option('to', 1).load()
    +            datasource_v2_df = self.spark.read \
    +                .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
    --- End diff --

Hmm... I think this is a bit fragile because of things like "scala-2.11" (the Scala version can change). It seems a bit overcomplicated to do this properly. When do we expect the pyspark tests to run without the Scala test classes being compiled?
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r210955687

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3367,6 +3367,33 @@ def test_ignore_column_of_all_nulls(self):
             finally:
                 shutil.rmtree(path)
     
    +    # SPARK-24721
    +    def test_datasource_with_udf_filter_lit_input(self):
    +        from pyspark.sql.functions import udf, lit, col
    +
    +        path = tempfile.mkdtemp()
    +        shutil.rmtree(path)
    +        try:
    +            self.spark.range(1).write.mode("overwrite").format('csv').save(path)
    +            filesource_df = self.spark.read.csv(path)
    +            datasource_df = self.spark.read \
    +                .format("org.apache.spark.sql.sources.SimpleScanSource") \
    +                .option('from', 0).option('to', 1).load()
    +            datasource_v2_df = self.spark.read \
    +                .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
    --- End diff --

I can probably try to check for the existence of sql/core/target/scala-2.11/test-classes
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r210954447

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3367,6 +3367,33 @@ def test_ignore_column_of_all_nulls(self):
             finally:
                 shutil.rmtree(path)
     
    +    # SPARK-24721
    +    def test_datasource_with_udf_filter_lit_input(self):
    +        from pyspark.sql.functions import udf, lit, col
    +
    +        path = tempfile.mkdtemp()
    +        shutil.rmtree(path)
    +        try:
    +            self.spark.range(1).write.mode("overwrite").format('csv').save(path)
    +            filesource_df = self.spark.read.csv(path)
    +            datasource_df = self.spark.read \
    +                .format("org.apache.spark.sql.sources.SimpleScanSource") \
    +                .option('from', 0).option('to', 1).load()
    +            datasource_v2_df = self.spark.read \
    +                .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
    --- End diff --

@HyukjinKwon I am actually not sure how pyspark finds these classes or how to check that they exist. Do you have an example?
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r210786895

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3367,6 +3367,33 @@ def test_ignore_column_of_all_nulls(self):
             finally:
                 shutil.rmtree(path)
     
    +    # SPARK-24721
    +    def test_datasource_with_udf_filter_lit_input(self):
    +        from pyspark.sql.functions import udf, lit, col
    +
    +        path = tempfile.mkdtemp()
    +        shutil.rmtree(path)
    +        try:
    +            self.spark.range(1).write.mode("overwrite").format('csv').save(path)
    +            filesource_df = self.spark.read.csv(path)
    +            datasource_df = self.spark.read \
    +                .format("org.apache.spark.sql.sources.SimpleScanSource") \
    +                .option('from', 0).option('to', 1).load()
    +            datasource_v2_df = self.spark.read \
    +                .format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2") \
    --- End diff --

This wouldn't work if the test classes are not compiled. I think we had better make another test suite that skips the test if the test classes do not exist.
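The suggestion above, a separate suite that is skipped when the Scala test classes are missing, could look roughly like the following. This is a minimal sketch using only the standard `unittest` module; the names `missing_test_classes_message` and `DataSourceUDFFilterTests` are hypothetical, the wildcard directory match is an assumption, and the test body is a placeholder rather than the real data source test.

```python
import glob
import os
import unittest


def missing_test_classes_message(spark_home):
    """Return a skip reason if compiled Scala test classes are absent, else None."""
    if not spark_home:
        return 'SPARK_HOME is not defined in environment'
    pattern = os.path.join(
        spark_home, 'sql', 'core', 'target', 'scala-*', 'test-classes')
    if not glob.glob(pattern):
        return 'Scala test classes are not compiled under %s' % spark_home
    return None


# Evaluated once at import time, like other module-level requirement checks.
_skip_reason = missing_test_classes_message(os.environ.get('SPARK_HOME'))


@unittest.skipIf(_skip_reason is not None, _skip_reason)
class DataSourceUDFFilterTests(unittest.TestCase):
    # Hypothetical suite: the real tests would load SimpleScanSource and
    # SimpleDataSourceV2, which only exist once the test classes are compiled.
    def test_placeholder(self):
        self.assertIsNone(_skip_reason)
```

Decorating the whole class keeps the skip decision in one place, so tests that depend on the compiled classes are reported as skipped instead of failing with a class-not-found error.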
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r210414941

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
    @@ -117,15 +117,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
                 }
               }.toArray
             }.toArray
    -      val projection = newMutableProjection(allInputs, child.output)
    +
    +      // Project input rows to unsafe row so we can put it in the row queue
    +      val unsafeProjection = UnsafeProjection.create(child.output, child.output)
    --- End diff --

This requires some discussion. This is probably another bug I found while testing this: if the input node to EvalPythonExec doesn't produce UnsafeRow, the cast here will fail. I don't know whether we require data sources to produce unsafe rows; if not, then this is a problem. I also don't know whether this will introduce an additional copy if the input is already UnsafeRow. It seems like UnsafeProjection should be smart enough to skip the copy, but I am not sure that's actually the case.
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r210410738

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
    @@ -117,15 +117,16 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
                 }
               }.toArray
             }.toArray
    -      val projection = newMutableProjection(allInputs, child.output)
    +      val projection = UnsafeProjection.create(allInputs, child.output)
           val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
             StructField(s"_$i", dt)
           })
     
           // Add rows to queue to join later with the result.
           val projectedRowIter = iter.map { inputRow =>
    -        queue.add(inputRow.asInstanceOf[UnsafeRow])
    -        projection(inputRow)
    +        val unsafeRow = projection(inputRow)
    +        queue.add(unsafeRow.asInstanceOf[UnsafeRow])
    --- End diff --

Ok.. This seems to break existing tests. Need to look into it.
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r210391237

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3367,6 +3367,35 @@ def test_ignore_column_of_all_nulls(self):
             finally:
                 shutil.rmtree(path)
     
    +    # SPARK-24721
    +    def test_datasource_with_udf_filter_lit_input(self):
    +        import pandas as pd
    +        import numpy as np
    +        from pyspark.sql.functions import udf, pandas_udf, lit, col
    +
    +        path = tempfile.mkdtemp()
    +        shutil.rmtree(path)
    +        try:
    +            self.spark.range(1).write.mode("overwrite").format('csv').save(path)
    +            filesource_df = self.spark.read.csv(path)
    --- End diff --

@gatorsmile Added tests for file source, data source and data source v2. I might need to move the pandas_udf tests into a separate test because of arrow_requirement :(
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r210390770

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -133,6 +134,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
       }
     
       def apply(plan: SparkPlan): SparkPlan = plan transformUp {
    +    // SPARK-24721: Ignore Python UDFs in DataSourceScan and DataSourceV2Scan
    +    case plan: DataSourceScanExec => plan
    --- End diff --

I got rid of the logic previously in `FileSourceStrategy` that excluded PythonUDF from the filters, in favor of this fix. I think this fix is cleaner.
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r210390399

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala ---
    @@ -117,15 +117,16 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
                 }
               }.toArray
             }.toArray
    -      val projection = newMutableProjection(allInputs, child.output)
    +      val projection = UnsafeProjection.create(allInputs, child.output)
           val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
             StructField(s"_$i", dt)
           })
     
           // Add rows to queue to join later with the result.
           val projectedRowIter = iter.map { inputRow =>
    -        queue.add(inputRow.asInstanceOf[UnsafeRow])
    -        projection(inputRow)
    +        val unsafeRow = projection(inputRow)
    +        queue.add(unsafeRow.asInstanceOf[UnsafeRow])
    --- End diff --

This is probably another bug I found while testing this: if the input node to EvalPythonExec doesn't produce UnsafeRow, the cast here will fail. I hit this when I passed in a test data source scan node, which produces GenericInternalRow; an exception is thrown here.

I am happy to submit this as a separate patch if people think it's necessary.
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r210052093

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3367,6 +3367,24 @@ def test_ignore_column_of_all_nulls(self):
             finally:
                 shutil.rmtree(path)
     
    +    def test_datasource_with_udf_filter_lit_input(self):
    --- End diff --

Makes sense. Will add.
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22104#discussion_r210044089

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3367,6 +3367,24 @@ def test_ignore_column_of_all_nulls(self):
             finally:
                 shutil.rmtree(path)
     
    +    def test_datasource_with_udf_filter_lit_input(self):
    --- End diff --

Add another test case for arrow-based pandas udf?
[GitHub] spark pull request #22104: [SPARK-24721][SQL] Exclude Python UDFs filters in...
GitHub user icexelloss opened a pull request:

    https://github.com/apache/spark/pull/22104

    [SPARK-24721][SQL] Exclude Python UDFs filters in FileSourceStrategy

    ## What changes were proposed in this pull request?

    The PR excludes Python UDF filters in FileSourceStrategy so that they don't cause the ExtractPythonUDFs rule to throw an exception. It doesn't make sense to pass Python UDF filters in FileSourceStrategy anyway because they cannot be used as push down filters.

    ## How was this patch tested?

    Added a new regression test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/icexelloss/spark SPARK-24721-udf-filter

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22104.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22104

commit 512f4b64cb7662baa23995c6f6c109a735ec8f5e
Author: Li Jin
Date:   2018-08-14T14:22:50Z

    Fix file strategy to exclude python UDF filters
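The idea in the description, keeping filters that contain a Python UDF away from the scan because they cannot be pushed down, can be illustrated with a toy model. The sketch below is plain Python and not Spark code: `Expr`, `contains_python_udf` and `split_filters` are hypothetical names invented for this illustration, and the real change lives in the Scala planner.

```python
from collections import namedtuple

# Hypothetical stand-in for a Catalyst expression node: a label, child
# expressions, and a flag marking whether this node is a Python UDF call.
Expr = namedtuple('Expr', ['name', 'children', 'is_python_udf'])


def contains_python_udf(expr):
    """Return True if expr or any of its descendants is a Python UDF."""
    return expr.is_python_udf or any(
        contains_python_udf(child) for child in expr.children)


def split_filters(filters):
    """Split filters into (pushable, remaining).

    Filters free of Python UDFs are candidates for push down; the rest
    must be evaluated after the scan.
    """
    pushable = [f for f in filters if not contains_python_udf(f)]
    remaining = [f for f in filters if contains_python_udf(f)]
    return pushable, remaining
```

For example, a filter such as `i > 0` would land in the pushable list, while a comparison whose operand is a Python UDF call would stay in the remaining list and be evaluated above the scan.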