[jira] [Created] (SPARK-27691) Issue when running queries using filter predicates on pandas GROUPED_AGG udfs

2019-05-13 Thread Michael Tong (JIRA)
Michael Tong created SPARK-27691:


 Summary: Issue when running queries using filter predicates on 
pandas GROUPED_AGG udfs
 Key: SPARK-27691
 URL: https://issues.apache.org/jira/browse/SPARK-27691
 Project: Spark
  Issue Type: Bug
  Components: Input/Output
Affects Versions: 2.4.2
Reporter: Michael Tong


I am currently running pyspark 2.4.2 and am unable to run the following code.

 
{code:java}
from pyspark.sql import functions, types
import pandas as pd
import random

# initialize test data
test_data = [[False, int(random.random() * 2)] for i in range(1)]
test_data = pd.DataFrame(test_data, columns=['bool_value', 'int_value'])

# pandas udf
pandas_any_udf = functions.pandas_udf(lambda x: x.any(), types.BooleanType(), 
functions.PandasUDFType.GROUPED_AGG)

# create spark DataFrame and build the query
test_df = spark.createDataFrame(test_data)
test_df = test_df.groupby('int_value').agg(pandas_any_udf('bool_value').alias('bool_any_result'))
test_df = test_df.filter(functions.col('bool_any_result') == True)

# write to output
test_df.write.parquet('/tmp/mtong/write_test', mode='overwrite')
{code}
 

Below is a truncated error message.

 
{code:java}
Py4JJavaError: An error occurred while calling o1125.parquet. : 
org.apache.spark.SparkException: Job aborted.

...

Exchange hashpartitioning(int_value#123L, 2000)
+- *(1) Filter ((bool_value#122) = true)
   +- Scan ExistingRDD arrow[bool_value#122,int_value#123L]

...

Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: 
(input[0, boolean, true]){code}
 

 

What appears to be happening is that the query optimizer incorrectly pushes the filter predicate on bool_any_result down past the group-by operation, so the predicate ends up applied to the UDAF's input column before the aggregation runs. This causes the query to error out before Spark attempts to execute it. I have also run a variant of this query with functions.count() as the aggregation function and it ran fine, so I believe this error only affects pandas UDFs.
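The misplaced filter can also be seen without triggering the write by inspecting the plan directly. A minimal sketch (assuming the test_df built above; the exact plan text will vary):
{code:java}
# Print the parsed, analyzed, optimized, and physical plans; the Filter on the
# UDAF's input column shows up below the aggregate, matching the plan fragment
# in the error message above.
test_df.explain(True)
{code}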

 

Variant of the query with a standard aggregation function:
{code:java}
test_df = spark.createDataFrame(test_data)
test_df = test_df.groupby('int_value').agg(functions.count('bool_value').alias('bool_counts'))
test_df = test_df.filter(functions.col('bool_counts') > 0)
{code}
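A possible workaround (my assumption, not something verified in this report) is to express the "any" aggregation with the built-in max aggregate: the maximum of a boolean column is True exactly when at least one value is True, so the filter runs against a regular aggregate expression instead of a pandas UDAF.
{code:java}
# Sketch of a workaround: replace the pandas GROUPED_AGG "any" UDAF with the
# built-in max aggregate over the boolean column.
test_df = spark.createDataFrame(test_data)
test_df = test_df.groupby('int_value').agg(functions.max('bool_value').alias('bool_any_result'))
test_df = test_df.filter(functions.col('bool_any_result') == True)
{code}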
 

 

 






[jira] [Updated] (SPARK-27691) Issue when running queries using filter predicates on pandas GROUPED_AGG udafs

2019-05-13 Thread Michael Tong (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-27691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Tong updated SPARK-27691:
-
Summary: Issue when running queries using filter predicates on pandas 
GROUPED_AGG udafs  (was: Issue when running queries using filter predicates on 
pandas GROUPED_AGG udfs)




[jira] [Updated] (SPARK-27691) Issue when running queries using filter predicates on pandas GROUPED_AGG udafs

2019-05-13 Thread Michael Tong (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-27691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Tong updated SPARK-27691:
-

[jira] [Created] (SPARK-40824) Certain aggregations cause extra exchange steps on unioned and bucketed tables

2022-10-17 Thread Michael Tong (Jira)
Michael Tong created SPARK-40824:


 Summary: Certain aggregations cause extra exchange steps on 
unioned and bucketed tables
 Key: SPARK-40824
 URL: https://issues.apache.org/jira/browse/SPARK-40824
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.3.0
Reporter: Michael Tong


An extension to https://issues.apache.org/jira/browse/SPARK-22898

 

I am currently working on a POC where we store aggregations of features across different datasets. I have noticed that when you do certain aggregation operations across multiple bucketed tables, Spark introduces an extra exchange step:
 
{code:java}
# initializing the tables
sql("""
    CREATE TABLE t1 (`id` BIGINT, `value` INT)
    USING PARQUET
    CLUSTERED BY (id)
    INTO 1 BUCKETS
    """)
sql("""
    CREATE TABLE t2 (`id` BIGINT, `value` INT)
    USING PARQUET
    CLUSTERED BY (id)
    INTO 1 BUCKETS
    """)
sql("INSERT INTO TABLE t1 VALUES(1, 2)")
sql("INSERT INTO TABLE t2 VALUES(1, 3)")

# aggregation, note the exchange after the union operation
sql("""
    SELECT id, COUNT(*)
    FROM (SELECT id FROM t1 UNION SELECT id FROM t2)
    GROUP BY id
    """).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[id#92L], functions=[count(1)])
   +- HashAggregate(keys=[id#92L], functions=[partial_count(1)])
      +- HashAggregate(keys=[id#92L], functions=[])
         +- Exchange hashpartitioning(id#92L, 100), ENSURE_REQUIREMENTS, [id=#202]
            +- HashAggregate(keys=[id#92L], functions=[])
               +- Union
                  :- FileScan parquet default.t1[id#92L] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
                  +- FileScan parquet default.t2[id#94L] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
{code}
This seems like an issue with the query optimizer, because if you use a different set and order of operations (in this case, group-by/count on the individual tables, join the tables, then derive the union count from the joined values), you get a query plan that does not have this exchange step:
{code:java}
sql("""
    SELECT t1_agg.id, t1_agg.count + t2_agg.count as count
    FROM (SELECT id, COUNT(*) as count from t1 GROUP BY id) as t1_agg
    JOIN (SELECT id, COUNT(*) as count from t2 GROUP BY id) as t2_agg ON 
t1_agg.id=t2_agg.id
""").explain()

# note the lack of an exchange step
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#92L, (count#121L + count#122L) AS count#123L]
   +- SortMergeJoin [id#92L], [id#94L], Inner
      :- Sort [id#92L ASC NULLS FIRST], false, 0
      :  +- HashAggregate(keys=[id#92L], functions=[count(1)])
      :     +- HashAggregate(keys=[id#92L], functions=[partial_count(1)])
      :        +- Filter isnotnull(id#92L)
      :           +- FileScan parquet default.t1[id#92L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#92L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
      +- Sort [id#94L ASC NULLS FIRST], false, 0
         +- HashAggregate(keys=[id#94L], functions=[count(1)])
            +- HashAggregate(keys=[id#94L], functions=[partial_count(1)])
               +- Filter isnotnull(id#94L)
                  +- FileScan parquet default.t2[id#94L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#94L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1 {code}
It feels like the first union->aggregate query should not need this exchange step, just as the second query does not.
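A small sanity check (assuming the same session and the bucketed tables t1/t2 created above): aggregating either bucketed table on its own should not need an Exchange, which is what makes the extra shuffle after the Union stand out.
{code:java}
# Each table is bucketed on its group-by column, so the plan should show
# HashAggregate directly over the bucketed FileScan with no Exchange, as in
# the per-table aggregates of the second plan above.
sql("SELECT id, COUNT(*) FROM t1 GROUP BY id").explain()
{code}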

 






[jira] [Created] (SPARK-42645) Introduce feature to allow for function caching across input rows.

2023-03-01 Thread Michael Tong (Jira)
Michael Tong created SPARK-42645:


 Summary: Introduce feature to allow for function caching across 
input rows.
 Key: SPARK-42645
 URL: https://issues.apache.org/jira/browse/SPARK-42645
 Project: Spark
  Issue Type: Wish
  Components: Optimizer
Affects Versions: 3.3.2
Reporter: Michael Tong


Introduce the ability to make functions cacheable across input rows. I imagine this feature working similarly to Python's 
[functools.cache|https://docs.python.org/3/library/functools.html#functools.cache], where you could add a decorator to certain expensive functions that you know will regularly encounter repeated values as you read the input data.

 

With this new feature you would be able to significantly speed up many real-world jobs that use expensive functions on data that naturally has repeated column values. An example would be parsing user agent fields from internet traffic logs partitioned by user id. Even though the data is not sorted by user agent, a sample of 10k consecutive rows contains far fewer than 10k unique values, because popular user agents appear on a large fraction of traffic and the user agent of a user's first event is likely to be shared by all of that user's subsequent events. Currently there is a way to hack an approximation of this in Python via pandas_udfs: because pandas_udfs read their input in batches of 10k rows by default, you can use a caching UDF whose cache is re-created (and thus emptied) every 10k rows. At my current job I have noticed that some applications of this trick can significantly speed up queries where custom UDFs are the bottleneck. An example of this is

 
{code:java}
import functools
from pyspark.sql import functions as F, types as T

@F.pandas_udf(T.StringType())
def parse_user_agent_field(user_agent_series):
    # the cached helper is re-created per ~10k-row batch, emptying the cache
    @functools.cache
    def parse_user_agent_field_helper(user_agent):
        # parse the user agent and return the relevant field
        return None
    return user_agent_series.apply(parse_user_agent_field_helper){code}
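A hypothetical usage of the UDF above (the DataFrame and column names are illustrative, not from the report):
{code:java}
# Apply the batch-cached UDF to a user_agent column; repeated user agents within
# each 10k-row batch are served from the in-memory cache instead of being re-parsed.
df = df.withColumn('parsed_field', parse_user_agent_field(F.col('user_agent')))
{code}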
 

 

It would be nice if there were official support for this behavior for both built-in functions and UDFs. If there were official support, I'd imagine it looking something like:

 
{code:java}
# using pyspark dataframe API
df = df.withColumn(output_col, F.cache(F.function)(input_col)){code}
 

 


