[jira] [Created] (SPARK-27691) Issue when running queries using filter predicates on pandas GROUPED_AGG udfs
Michael Tong created SPARK-27691:

Summary: Issue when running queries using filter predicates on pandas GROUPED_AGG udfs
Key: SPARK-27691
URL: https://issues.apache.org/jira/browse/SPARK-27691
Project: Spark
Issue Type: Bug
Components: Input/Output
Affects Versions: 2.4.2
Reporter: Michael Tong

I am currently running PySpark 2.4.2 and am unable to run the following code.

{code:python}
from pyspark.sql import functions, types
import pandas as pd
import random

# assumes an active SparkSession bound to the name `spark` (e.g. the pyspark shell)

# initialize test data: a single row with bool_value=False and int_value in {0, 1}
test_data = [[False, int(random.random() * 2)] for i in range(1)]
test_data = pd.DataFrame(test_data, columns=['bool_value', 'int_value'])

# pandas udf
pandas_any_udf = functions.pandas_udf(lambda x: x.any(), types.BooleanType(),
                                      functions.PandasUDFType.GROUPED_AGG)

# create spark DataFrame and build the query
test_df = spark.createDataFrame(test_data)
test_df = test_df.groupby('int_value').agg(pandas_any_udf('bool_value').alias('bool_any_result'))
test_df = test_df.filter(functions.col('bool_any_result') == True)

# write to output
test_df.write.parquet('/tmp/mtong/write_test', mode='overwrite')
{code}

Below is a truncated error message.

{code:java}
Py4JJavaError: An error occurred while calling o1125.parquet.
: org.apache.spark.SparkException: Job aborted.
...
Exchange hashpartitioning(int_value#123L, 2000)
+- *(1) Filter ((bool_value#122) = true)
   +- Scan ExistingRDD arrow[bool_value#122,int_value#123L]
...
Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: (input[0, boolean, true])
{code}

What appears to be happening is that the query optimizer incorrectly pushes the filter predicate on bool_any_result down below the group-by operation, so the filter is applied before the aggregation. This causes the query to error out. I have also tried running a variant of this query with functions.count() as the aggregation function, and it ran fine, so I believe this error only affects pandas UDFs.
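For reference, here is a minimal pure-Python sketch (no Spark or pandas involved) of the semantics the query above is expected to have: the any() aggregation runs per int_value group first, and only then is the filter on the aggregate result applied. The function name and row layout are illustrative assumptions, not part of the report.

{code:python}
from collections import defaultdict


def any_per_group_then_filter(rows):
    """Group (bool_value, int_value) rows by int_value, compute any(bool_value)
    per group, then keep only the groups whose aggregate is True.

    Mirrors: groupby('int_value').agg(any('bool_value')).filter(result == True)
    """
    groups = defaultdict(list)
    for bool_value, int_value in rows:
        groups[int_value].append(bool_value)

    # the aggregation happens first ...
    aggregated = {key: any(values) for key, values in groups.items()}

    # ... and the filter on the aggregate result is applied afterwards
    return {key: result for key, result in aggregated.items() if result}


# The single-row test data from the report always has bool_value=False,
# so the correct result is an empty output.
print(any_per_group_then_filter([[False, 0]]))  # {}
print(any_per_group_then_filter([[False, 0], [True, 0], [False, 1]]))  # {0: True}
{code}

Evaluating the filter before the grouping, as the plan in the error suggests happens, would require computing the aggregate on individual input rows, which has no meaning for a GROUPED_AGG function.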
Variant of the query with a standard aggregation function:

{code:python}
test_df = spark.createDataFrame(test_data)
test_df = test_df.groupby('int_value').agg(functions.count('bool_value').alias('bool_counts'))
test_df = test_df.filter(functions.col('bool_counts') > 0)
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-27691) Issue when running queries using filter predicates on pandas GROUPED_AGG udafs
[ https://issues.apache.org/jira/browse/SPARK-27691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Tong updated SPARK-27691:
Summary: Issue when running queries using filter predicates on pandas GROUPED_AGG udafs (was: Issue when running queries using filter predicates on pandas GROUPED_AGG udfs)

> Issue when running queries using filter predicates on pandas GROUPED_AGG udafs
> Key: SPARK-27691
> URL: https://issues.apache.org/jira/browse/SPARK-27691
> Project: Spark
> Issue Type: Bug
> Components: Input/Output
> Affects Versions: 2.4.2
> Reporter: Michael Tong
> Priority: Major
[jira] [Updated] (SPARK-27691) Issue when running queries using filter predicates on pandas GROUPED_AGG udafs
[ https://issues.apache.org/jira/browse/SPARK-27691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Tong updated SPARK-27691:
Description:

I am currently running PySpark 2.4.2 and am unable to run the following code.

{code:python}
from pyspark.sql import functions, types
import pandas as pd
import random

# assumes an active SparkSession bound to the name `spark` (e.g. the pyspark shell)

# initialize test data: a single row with bool_value=False and int_value in {0, 1}
test_data = [[False, int(random.random() * 2)] for i in range(1)]
test_data = pd.DataFrame(test_data, columns=['bool_value', 'int_value'])

# pandas udaf
pandas_any_udf = functions.pandas_udf(lambda x: x.any(), types.BooleanType(),
                                      functions.PandasUDFType.GROUPED_AGG)

# create spark DataFrame and build the query
test_df = spark.createDataFrame(test_data)
test_df = test_df.groupby('int_value').agg(pandas_any_udf('bool_value').alias('bool_any_result'))
test_df = test_df.filter(functions.col('bool_any_result') == True)

# write to output
test_df.write.parquet('/tmp/mtong/write_test', mode='overwrite')
{code}

Below is a truncated error message.

{code:java}
Py4JJavaError: An error occurred while calling o1125.parquet.
: org.apache.spark.SparkException: Job aborted.
...
Exchange hashpartitioning(int_value#123L, 2000)
+- *(1) Filter ((bool_value#122) = true)
   +- Scan ExistingRDD arrow[bool_value#122,int_value#123L]
...
Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: (input[0, boolean, true])
{code}

What appears to be happening is that the query optimizer incorrectly pushes the filter predicate on bool_any_result down below the group-by operation, so the filter is applied before the aggregation. This causes the query to error out. I have also tried running a variant of this query with functions.count() as the aggregation function, and it ran fine, so I believe this error only affects pandas UDAFs.

Variant of the query with a standard aggregation function:

{code:python}
test_df = spark.createDataFrame(test_data)
test_df = test_df.groupby('int_value').agg(functions.count('bool_value').alias('bool_counts'))
test_df = test_df.filter(functions.col('bool_counts') > 0)
{code}
[jira] [Created] (SPARK-40824) Certain aggregations cause extra exchange steps on unioned and bucketed tables
Michael Tong created SPARK-40824:

Summary: Certain aggregations cause extra exchange steps on unioned and bucketed tables
Key: SPARK-40824
URL: https://issues.apache.org/jira/browse/SPARK-40824
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.3.0
Reporter: Michael Tong

An extension to https://issues.apache.org/jira/browse/SPARK-22898.

I am currently working on a POC where we store aggregations of features across different datasets. I have noticed that when you perform certain aggregation operations across multiple tables, Spark introduces an extra exchange step:

{code:python}
# assumes `sql` is bound to `spark.sql` of an active SparkSession (e.g. sql = spark.sql)

# initializing the tables
sql("""
CREATE TABLE t1 (`id` BIGINT, `value` INT)
USING PARQUET
CLUSTERED BY (id)
INTO 1 BUCKETS
""")
sql("""
CREATE TABLE t2 (`id` BIGINT, `value` INT)
USING PARQUET
CLUSTERED BY (id)
INTO 1 BUCKETS
""")
sql("INSERT INTO TABLE t1 VALUES(1, 2)")
sql("INSERT INTO TABLE t2 VALUES(1, 3)")

# aggregation, note the exchange after the union operation
sql("""
SELECT id, COUNT(*)
FROM (SELECT id FROM t1 UNION SELECT id FROM t2)
GROUP BY id
""").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[id#92L], functions=[count(1)])
   +- HashAggregate(keys=[id#92L], functions=[partial_count(1)])
      +- HashAggregate(keys=[id#92L], functions=[])
         +- Exchange hashpartitioning(id#92L, 100), ENSURE_REQUIREMENTS, [id=#202]
            +- HashAggregate(keys=[id#92L], functions=[])
               +- Union
                  :- FileScan parquet default.t1[id#92L] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct, SelectedBucketsCount: 1 out of 1
                  +- FileScan parquet default.t2[id#94L] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct, SelectedBucketsCount: 1 out of 1
{code}

This seems like an issue with the query optimizer, because if you use a different set and order of operations (in this case: group by/count on the individual tables, join the tables, then infer the union count from the joined values), you get a query plan that doesn't have this exchange step:

{code:python}
sql("""
SELECT t1_agg.id, t1_agg.count + t2_agg.count as count
FROM (SELECT id, COUNT(*) as count from t1 GROUP BY id) as t1_agg
JOIN (SELECT id, COUNT(*) as count from t2 GROUP BY id) as t2_agg
ON t1_agg.id=t2_agg.id
""").explain()

# note the lack of an exchange step
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#92L, (count#121L + count#122L) AS count#123L]
   +- SortMergeJoin [id#92L], [id#94L], Inner
      :- Sort [id#92L ASC NULLS FIRST], false, 0
      :  +- HashAggregate(keys=[id#92L], functions=[count(1)])
      :     +- HashAggregate(keys=[id#92L], functions=[partial_count(1)])
      :        +- Filter isnotnull(id#92L)
      :           +- FileScan parquet default.t1[id#92L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#92L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct, SelectedBucketsCount: 1 out of 1
      +- Sort [id#94L ASC NULLS FIRST], false, 0
         +- HashAggregate(keys=[id#94L], functions=[count(1)])
            +- HashAggregate(keys=[id#94L], functions=[partial_count(1)])
               +- Filter isnotnull(id#94L)
                  +- FileScan parquet default.t2[id#94L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#94L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct, SelectedBucketsCount: 1 out of 1
{code}

It feels like the first union->aggregate query should not need an exchange step either, just like the second one.

-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
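To illustrate why the extra exchange in the SPARK-40824 report looks unnecessary, here is a minimal pure-Python sketch of the co-partitioning argument: if two tables are bucketed on id with the same hash function and the same number of buckets, every row for a given id sits at the same bucket index in both tables, so a UNION followed by GROUP BY id can be computed bucket by bucket without moving rows between buckets. The `bucket_of` hash is a stand-in assumption (Spark itself uses a Murmur3-based hash for bucketing), and all function names here are hypothetical.

{code:python}
from collections import Counter


def bucket_of(key, num_buckets):
    # Stand-in hash (assumption): Spark uses a Murmur3-based hash, but any
    # deterministic function shared by both tables supports the same argument.
    return hash(key) % num_buckets


def bucketize(ids, num_buckets):
    buckets = [[] for _ in range(num_buckets)]
    for key in ids:
        buckets[bucket_of(key, num_buckets)].append(key)
    return buckets


def union_count_global(t1_ids, t2_ids):
    # SELECT id, COUNT(*) FROM (t1 UNION t2) GROUP BY id, computed on all data at once
    return Counter(set(t1_ids) | set(t2_ids))


def union_count_per_bucket(t1_ids, t2_ids, num_buckets):
    # Same query, but each bucket pair (t1 bucket i, t2 bucket i) is processed
    # independently, i.e. no rows are redistributed between buckets.
    b1 = bucketize(t1_ids, num_buckets)
    b2 = bucketize(t2_ids, num_buckets)
    result = Counter()
    for i in range(num_buckets):
        result.update(set(b1[i]) | set(b2[i]))
    return result


t1 = [1, 2, 2, 5, 8]
t2 = [1, 3, 5, 5, 13]
assert union_count_per_bucket(t1, t2, 4) == union_count_global(t1, t2)
{code}

Because bucket i of t1 and bucket i of t2 together hold every occurrence of the ids that hash to i, the per-bucket result matches the global one; this is the property that would, in principle, let the planner skip the shuffle.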
[jira] [Created] (SPARK-42645) Introduce feature to allow for function caching across input rows.
Michael Tong created SPARK-42645:

Summary: Introduce feature to allow for function caching across input rows.
Key: SPARK-42645
URL: https://issues.apache.org/jira/browse/SPARK-42645
Project: Spark
Issue Type: Wish
Components: Optimizer
Affects Versions: 3.3.2
Reporter: Michael Tong

Introduce the ability to make functions cacheable across input rows. I imagine this working similarly to Python's [functools.cache|https://docs.python.org/3/library/functools.html#functools.cache], where you could add a decorator to certain expensive functions that you know will regularly encounter repeated values as you read the input data. This feature could significantly speed up many real-world jobs that apply expensive functions to data that naturally has repeated column values.

An example of this would be parsing user agent fields from internet traffic logs partitioned by user id. Even though the data is not sorted by user agent, a sample of 10k contiguous rows would contain far fewer than 10k unique values, because popular user agents account for a large fraction of traffic and the user agent of a user's first event is likely to be shared by all of that user's subsequent events.

Currently, you can hack together an approximation of this in Python via pandas_udfs. This works because pandas_udfs by default read input in batches of 10k rows (the default of spark.sql.execution.arrow.maxRecordsPerBatch), so you can use a caching UDF whose cache is emptied every 10k rows. At my current job, I have noticed that some applications of this trick can significantly speed up queries where custom UDFs are the bottleneck.
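The per-batch caching trick described above can be sketched in plain Python, without Spark: the input is split into batches (10,000 rows here, mirroring the default Arrow batch size), and a fresh cached wrapper is created for each batch, so the expensive function runs once per distinct value per batch. The sketch uses functools.lru_cache(maxsize=None), which is what functools.cache is equivalent to, so it also runs on Python versions before 3.9. `expensive_parse` and the sample data are hypothetical stand-ins for a real user-agent parser and traffic log.

{code:python}
import functools
from itertools import islice

CALLS = {"count": 0}  # counts how often the expensive function really runs


def expensive_parse(user_agent):
    # Hypothetical stand-in for an expensive user-agent parser.
    CALLS["count"] += 1
    return user_agent.split("/")[0]


def batches(rows, batch_size):
    # Yield consecutive lists of at most batch_size rows.
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def parse_column(rows, batch_size=10_000):
    out = []
    for batch in batches(rows, batch_size):
        # A new cache per batch: equivalent to defining the cached helper
        # inside the pandas UDF body, so the cache is emptied between batches.
        cached_parse = functools.lru_cache(maxsize=None)(expensive_parse)
        out.extend(cached_parse(ua) for ua in batch)
    return out


# 25k rows drawn from only 3 distinct user agents
rows = ["Mozilla/5.0", "curl/8.0", "Safari/605.1"] * 8_000 + ["Mozilla/5.0"] * 1_000
result = parse_column(rows)
print(len(result), CALLS["count"])  # 25000 9
{code}

With three batches that each contain the same three user agents, the parser runs 9 times instead of 25,000; the saving depends entirely on how many distinct values each batch contains.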
An example of this is:

{code:python}
import functools

from pyspark.sql import functions as F
from pyspark.sql import types as T


@F.pandas_udf(T.StringType())
def parse_user_agent_field(user_agent_series):
    # a fresh cache is created on every call, i.e. once per Arrow batch
    # (10k rows by default), so it is effectively emptied between batches
    @functools.cache
    def parse_user_agent_field_helper(user_agent):
        # parse the user agent and return the relevant field
        return None

    return user_agent_series.apply(parse_user_agent_field_helper)
{code}

It would be nice if there were official support for this behavior for both built-in functions and UDFs. If there were, I'd imagine it looking something like:

{code:python}
# using the pyspark DataFrame API (F.cache is the proposed API; it does not exist today)
df = df.withColumn(output_col, F.cache(F.function)(input_col))
{code}