[jira] [Commented] (SPARK-44670) Fix the `test_to_excel` tests for python3.7

2023-08-03 Thread Madhukar (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750959#comment-17750959
 ] 

Madhukar commented on SPARK-44670:
--

Raised a PR for using openpyxl instead of xlrd - 
[https://github.com/apache/spark/pull/42339] 
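
A minimal sketch of the workaround, assuming the PR simply switches the read engine (the exact change is in the PR itself):

{code:python}
import pandas as pd

# xlrd >= 2.0 only reads legacy .xls files, so force the openpyxl engine
# when reading the .xlsx output produced by to_excel.
df = pd.read_excel("output.xlsx", index_col=0, engine="openpyxl")
{code}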

> Fix the `test_to_excel` tests for python3.7
> ---
>
> Key: SPARK-44670
> URL: https://issues.apache.org/jira/browse/SPARK-44670
> Project: Spark
>  Issue Type: Bug
>  Components: Pandas API on Spark
>Affects Versions: 3.4.1
>Reporter: Madhukar
>Priority: Minor
>
> With Python 3.7 and openpyxl installed, the test fails with:
> ==
> ERROR: test_to_excel 
> (pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)
> Traceback (most recent call last):
>   File 
> "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py",
>  line 102, in test_to_excel
>     dataframes = self.get_excel_dfs(pandas_on_spark_location, pandas_location)
>   File 
> "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py",
>  line 89, in get_excel_dfs
>     "got": pd.read_excel(pandas_on_spark_location, index_col=0),
>   File "/opt/conda/lib/python3.7/site-packages/pandas/util/_decorators.py", 
> line 296, in wrapper
>     return func(*args, **kwargs)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", 
> line 304, in read_excel
>     io = ExcelFile(io, engine=engine)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", 
> line 867, in __init__
>     self._reader = self._engines[engine](self._io)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_xlrd.py", 
> line 21, in __init__
>     import_optional_dependency("xlrd", extra=err_msg)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/compat/_optional.py", 
> line 110, in import_optional_dependency
>     raise ImportError(msg) from None
> ImportError: Missing optional dependency 'xlrd'. Install xlrd >= 1.0.0 for 
> Excel support Use pip or conda to install xlrd.
> --
>  
>  
>  
> But with xlrd 2.0.1 installed, it fails with:
> ==
> ERROR: test_to_excel 
> (pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)
> --
> Traceback (most recent call last):
>   File 
> "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py",
>  line 102, in test_to_excel
>     dataframes = self.get_excel_dfs(pandas_on_spark_location, pandas_location)
>   File 
> "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py",
>  line 89, in get_excel_dfs
>     "got": pd.read_excel(pandas_on_spark_location, index_col=0),
>   File "/opt/conda/lib/python3.7/site-packages/pandas/util/_decorators.py", 
> line 296, in wrapper
>     return func(*args, **kwargs)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", 
> line 304, in read_excel
>     io = ExcelFile(io, engine=engine)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", 
> line 867, in __init__
>     self._reader = self._engines[engine](self._io)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_xlrd.py", 
> line 22, in __init__
>     super().__init__(filepath_or_buffer)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", 
> line 353, in __init__
>     self.book = self.load_workbook(filepath_or_buffer)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_xlrd.py", 
> line 37, in load_workbook
>     return open_workbook(filepath_or_buffer)
>   File "/opt/conda/lib/python3.7/site-packages/xlrd/__init__.py", line 170, 
> in open_workbook
>     raise XLRDError(FILE_FORMAT_DESCRIPTIONS[file_format]+'; not supported')
> xlrd.biffh.XLRDError: Excel xlsx file; not supported
> --
>  






[jira] [Updated] (SPARK-44670) Fix the `test_to_excel` tests for python3.7

2023-08-03 Thread Madhukar (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Madhukar updated SPARK-44670:
-
Description: 
With Python 3.7 and openpyxl installed, the test fails with:

==

ERROR: test_to_excel 
(pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)

Traceback (most recent call last):

  File 
"/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py",
 line 102, in test_to_excel

    dataframes = self.get_excel_dfs(pandas_on_spark_location, pandas_location)

  File 
"/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py",
 line 89, in get_excel_dfs

    "got": pd.read_excel(pandas_on_spark_location, index_col=0),

  File "/opt/conda/lib/python3.7/site-packages/pandas/util/_decorators.py", 
line 296, in wrapper

    return func(*args, **kwargs)

  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 
304, in read_excel

    io = ExcelFile(io, engine=engine)

  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 
867, in __init__

    self._reader = self._engines[engine](self._io)

  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_xlrd.py", line 
21, in __init__

    import_optional_dependency("xlrd", extra=err_msg)

  File "/opt/conda/lib/python3.7/site-packages/pandas/compat/_optional.py", 
line 110, in import_optional_dependency

    raise ImportError(msg) from None

ImportError: Missing optional dependency 'xlrd'. Install xlrd >= 1.0.0 for 
Excel support Use pip or conda to install xlrd.

--

 

 

 

But with xlrd 2.0.1 installed, it fails with:

==

ERROR: test_to_excel 
(pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)

--

Traceback (most recent call last):

  File 
"/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py",
 line 102, in test_to_excel

    dataframes = self.get_excel_dfs(pandas_on_spark_location, pandas_location)

  File 
"/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py",
 line 89, in get_excel_dfs

    "got": pd.read_excel(pandas_on_spark_location, index_col=0),

  File "/opt/conda/lib/python3.7/site-packages/pandas/util/_decorators.py", 
line 296, in wrapper

    return func(*args, **kwargs)

  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 
304, in read_excel

    io = ExcelFile(io, engine=engine)

  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 
867, in __init__

    self._reader = self._engines[engine](self._io)

  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_xlrd.py", line 
22, in __init__

    super().__init__(filepath_or_buffer)

  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 
353, in __init__

    self.book = self.load_workbook(filepath_or_buffer)

  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_xlrd.py", line 
37, in load_workbook

    return open_workbook(filepath_or_buffer)

  File "/opt/conda/lib/python3.7/site-packages/xlrd/__init__.py", line 170, in 
open_workbook

    raise XLRDError(FILE_FORMAT_DESCRIPTIONS[file_format]+'; not supported')

xlrd.biffh.XLRDError: Excel xlsx file; not supported

--

 

  was:
With Python 3.7 and openpyxl installed, the test fails with:

==

ERROR: test_to_excel 
(pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)

ImportError: Missing optional dependency 'xlrd'. Install xlrd >= 1.0.0 for 
Excel support Use pip or conda to install xlrd.

--

But with xlrd installed, it fails with:

xlrd.biffh.XLRDError: Excel xlsx file; not supported

 


> Fix the `test_to_excel` tests for python3.7
> ---
>
> Key: SPARK-44670
> URL: https://issues.apache.org/jira/browse/SPARK-44670
> Project: Spark
>  Issue Type: Bug
>  Components: Pandas API on Spark
>Affects Versions: 3.4.1
>Reporter: Madhukar
>Priority: Minor
>
> With Python 3.7 and openpyxl installed, the test fails with:
> ==
> ERROR: test_to_excel 
> (pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)
> Traceback (most recent call last):
>   File 
> "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py",
>  line 102, in test_to_excel
>     dataframes = 

[jira] [Resolved] (SPARK-44582) JVM crash caused by SMJ and WindowExec

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-44582.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 42206
[https://github.com/apache/spark/pull/42206]

> JVM crash caused by SMJ and WindowExec
> --
>
> Key: SPARK-44582
> URL: https://issues.apache.org/jira/browse/SPARK-44582
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Wan Kun
>Assignee: Wan Kun
>Priority: Major
> Fix For: 4.0.0
>
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> After an inner SMJ cleans up its off-heap memory early, a call to the left 
> Window child's next method may crash the JVM by accessing unallocated memory.
> !screenshot-1.png! 
> !screenshot-2.png! 
>  {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7fb5e0052954, pid=21756, tid=0x7fa9e8f65640
> #
> # JRE version: OpenJDK Runtime Environment (Zulu 8.68.0.20-SA-linux64) 
> (8.0_362-b08) (build 1.8.0_362-b08)
> # Java VM: OpenJDK 64-Bit Server VM (25.362-b08 mixed mode linux-amd64 )
> # Problematic frame:
> # v  ~StubRoutines::jlong_disjoint_arraycopy
> #
> # Core dump written. Default location: 
> /hadoop/7/yarn/local/usercache/b_carmel/appcache/application_1684894519955_24406/container_e2311_1684894519955_24406_01_005660/core
>  or core.21756
> #
> # If you would like to submit a bug report, please visit:
> #   http://www.azul.com/support/
> #
> ---  T H R E A D  ---
> Current thread (0x7fb5d8f0c800):  JavaThread "Executor 2802 task launch 
> worker for task 128116463, task 101.3 in stage 452404.0 of app 
> application_1684894519955_24406" daemon [_thread_in_Java, id=22042, 
> stack(0x7fa9e8766000,0x7fa9e8f66000)]
> siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr: 
> 0x7fa9750deac0
> Registers:
> RAX=0x7fa9750deae8, RBX=0x0018, RCX=0x7fb581f0f0a8, 
> RDX=0x
> RSP=0x7fa9e8f63a80, RBP=0x7fa9e8f63a80, RSI=0x7fb581f0f088, 
> RDI=0x7fa9750deae0
> R8 =0x7fb581f0f070, R9 =0x97446ed2, R10=0x7fb5e0053500, 
> R11=0x7fb581f0f0b0
> R12=0x7fb585ff17b0, R13=0x7fa9750deac0, R14=0x, 
> R15=0x7fb5d8f0c800
> RIP=0x7fb5e0052954, EFLAGS=0x00010297, CSGSFS=0x002b0033, 
> ERR=0x0004
>   TRAPNO=0x000e
> Top of Stack: (sp=0x7fa9e8f63a80)
> 0x7fa9e8f63a80:   0028 7fb5e127ee97
> 0x7fa9e8f63a90:    7fa9e8f63b20
> 0x7fa9e8f63aa0:   7fa9e8f63b00 7fb60882cf70
> 0x7fa9e8f63ab0:   7faab2dda9e0 7fb581f0f070
> 0x7fa9e8f63ac0:   7fb551801188 7fb54f8c0ef8
> Stack: [0x7f8a0380,0x7f8a0400],  sp=0x7f8a03ffd620,  free 
> space=8181k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native 
> code)
> v  ~StubRoutines::jint_disjoint_arraycopy
> J 36127 C2 
> org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;)V
>  (188 bytes) @ 0x7f966187ac9f [0x7f966187a820+0x47f]
> J 36146 C2 
> org.apache.spark.sql.execution.window.WindowExec$$anon$1.next()Ljava/lang/Object;
>  (5 bytes) @ 0x7f9661a8eefc [0x7f9661a8dd60+0x119c]
> J 36153 C2 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext()V
>  (381 bytes) @ 0x7f966180185c [0x7f9661801760+0xfc]
> J 36246 C2 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.smj_findNextJoinRows_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage7;Lscala/collection/Iterator;Lscala/collection/Iterator;)Z
>  (392 bytes) @ 0x7f96607388f0 [0x7f96607381e0+0x710]
> J 36249 C1 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext()V
>  (109 bytes) @ 0x7f965fa8ee64 [0x7f965fa8e560+0x904]
> J 35645 C2 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext()Z (31 
> bytes) @ 0x7f965fbc58e4 [0x7f965fbc58a0+0x44]
> j  
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(Lscala/collection/Iterator;Lorg/apache/spark/sql/execution/datasources/FileFormatDataWriter;)Lorg/apache/spark/sql/execution/datasources/WriteTaskResult;+1
> j  
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$4398.apply()Ljava/lang/Object;+8
> j  
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Lscala/Function0;Lscala/Function0;Lscala/Function0;)Ljava/lang/Object;+4
> j  
> 

[jira] [Assigned] (SPARK-44582) JVM crash caused by SMJ and WindowExec

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-44582:


Assignee: Wan Kun

> JVM crash caused by SMJ and WindowExec
> --
>
> Key: SPARK-44582
> URL: https://issues.apache.org/jira/browse/SPARK-44582
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Wan Kun
>Assignee: Wan Kun
>Priority: Major
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> After an inner SMJ cleans up its off-heap memory early, a call to the left 
> Window child's next method may crash the JVM by accessing unallocated memory.
> !screenshot-1.png! 
> !screenshot-2.png! 
>  {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7fb5e0052954, pid=21756, tid=0x7fa9e8f65640
> #
> # JRE version: OpenJDK Runtime Environment (Zulu 8.68.0.20-SA-linux64) 
> (8.0_362-b08) (build 1.8.0_362-b08)
> # Java VM: OpenJDK 64-Bit Server VM (25.362-b08 mixed mode linux-amd64 )
> # Problematic frame:
> # v  ~StubRoutines::jlong_disjoint_arraycopy
> #
> # Core dump written. Default location: 
> /hadoop/7/yarn/local/usercache/b_carmel/appcache/application_1684894519955_24406/container_e2311_1684894519955_24406_01_005660/core
>  or core.21756
> #
> # If you would like to submit a bug report, please visit:
> #   http://www.azul.com/support/
> #
> ---  T H R E A D  ---
> Current thread (0x7fb5d8f0c800):  JavaThread "Executor 2802 task launch 
> worker for task 128116463, task 101.3 in stage 452404.0 of app 
> application_1684894519955_24406" daemon [_thread_in_Java, id=22042, 
> stack(0x7fa9e8766000,0x7fa9e8f66000)]
> siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr: 
> 0x7fa9750deac0
> Registers:
> RAX=0x7fa9750deae8, RBX=0x0018, RCX=0x7fb581f0f0a8, 
> RDX=0x
> RSP=0x7fa9e8f63a80, RBP=0x7fa9e8f63a80, RSI=0x7fb581f0f088, 
> RDI=0x7fa9750deae0
> R8 =0x7fb581f0f070, R9 =0x97446ed2, R10=0x7fb5e0053500, 
> R11=0x7fb581f0f0b0
> R12=0x7fb585ff17b0, R13=0x7fa9750deac0, R14=0x, 
> R15=0x7fb5d8f0c800
> RIP=0x7fb5e0052954, EFLAGS=0x00010297, CSGSFS=0x002b0033, 
> ERR=0x0004
>   TRAPNO=0x000e
> Top of Stack: (sp=0x7fa9e8f63a80)
> 0x7fa9e8f63a80:   0028 7fb5e127ee97
> 0x7fa9e8f63a90:    7fa9e8f63b20
> 0x7fa9e8f63aa0:   7fa9e8f63b00 7fb60882cf70
> 0x7fa9e8f63ab0:   7faab2dda9e0 7fb581f0f070
> 0x7fa9e8f63ac0:   7fb551801188 7fb54f8c0ef8
> Stack: [0x7f8a0380,0x7f8a0400],  sp=0x7f8a03ffd620,  free 
> space=8181k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native 
> code)
> v  ~StubRoutines::jint_disjoint_arraycopy
> J 36127 C2 
> org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;)V
>  (188 bytes) @ 0x7f966187ac9f [0x7f966187a820+0x47f]
> J 36146 C2 
> org.apache.spark.sql.execution.window.WindowExec$$anon$1.next()Ljava/lang/Object;
>  (5 bytes) @ 0x7f9661a8eefc [0x7f9661a8dd60+0x119c]
> J 36153 C2 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext()V
>  (381 bytes) @ 0x7f966180185c [0x7f9661801760+0xfc]
> J 36246 C2 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.smj_findNextJoinRows_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage7;Lscala/collection/Iterator;Lscala/collection/Iterator;)Z
>  (392 bytes) @ 0x7f96607388f0 [0x7f96607381e0+0x710]
> J 36249 C1 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext()V
>  (109 bytes) @ 0x7f965fa8ee64 [0x7f965fa8e560+0x904]
> J 35645 C2 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext()Z (31 
> bytes) @ 0x7f965fbc58e4 [0x7f965fbc58a0+0x44]
> j  
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(Lscala/collection/Iterator;Lorg/apache/spark/sql/execution/datasources/FileFormatDataWriter;)Lorg/apache/spark/sql/execution/datasources/WriteTaskResult;+1
> j  
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$4398.apply()Ljava/lang/Object;+8
> j  
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Lscala/Function0;Lscala/Function0;Lscala/Function0;)Ljava/lang/Object;+4
> j  
> 

[jira] [Created] (SPARK-44671) Retry ExecutePlan in case initial request didn't reach server in Python client

2023-08-03 Thread Hyukjin Kwon (Jira)
Hyukjin Kwon created SPARK-44671:


 Summary: Retry ExecutePlan in case initial request didn't reach 
server in Python client
 Key: SPARK-44671
 URL: https://issues.apache.org/jira/browse/SPARK-44671
 Project: Spark
  Issue Type: Task
  Components: Connect, PySpark
Affects Versions: 3.5.0
Reporter: Hyukjin Kwon


SPARK-44624 for Python






[jira] [Updated] (SPARK-44009) Support profiler for Python UDTFs

2023-08-03 Thread Allison Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Allison Wang updated SPARK-44009:
-
Summary: Support profiler for Python UDTFs   (was: Support memory_profiler 
for UDTFs )

> Support profiler for Python UDTFs 
> --
>
> Key: SPARK-44009
> URL: https://issues.apache.org/jira/browse/SPARK-44009
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.5.0
>Reporter: Allison Wang
>Priority: Major
>
> Similar to Python UDFs, we should support the memory profiler for UDTFs.
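> A minimal sketch of how this could be enabled, assuming the UDTF profiler reuses 
> the existing Python UDF memory profiler switch (the config name here is an 
> assumption):
> {code:python}
> from pyspark.sql import SparkSession
>
> # Assumption: the UDF memory profiler switch also covers UDTFs once this
> # lands; requires the `memory-profiler` package on the Python workers.
> spark = (
>     SparkSession.builder
>     .config("spark.python.profile.memory", "true")
>     .getOrCreate()
> )
>
> # ... run a query that invokes the UDTF ...
>
> spark.sparkContext.show_profiles()  # print the collected memory profiles
> {code}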






[jira] [Updated] (SPARK-44663) Disable arrow optimization by default for Python UDTFs

2023-08-03 Thread Allison Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Allison Wang updated SPARK-44663:
-
Summary: Disable arrow optimization by default for Python UDTFs  (was: 
Disable arrow optimization by default)

> Disable arrow optimization by default for Python UDTFs
> --
>
> Key: SPARK-44663
> URL: https://issues.apache.org/jira/browse/SPARK-44663
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.5.0
>Reporter: Allison Wang
>Priority: Major
>
> Disable arrow optimization to make Python UDTFs consistent with Python UDFs.
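> A sketch of the user-facing effect, assuming the config key is 
> spark.sql.execution.pythonUDTF.arrow.enabled (the exact name is an assumption):
> {code:python}
> # Assumed config name: arrow optimization for Python UDTFs would be off by
> # default, matching Python UDFs, and users would opt back in explicitly:
> spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", "true")
> {code}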






[jira] [Updated] (SPARK-44670) Fix the `test_to_excel` tests for python3.7

2023-08-03 Thread Madhukar (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Madhukar updated SPARK-44670:
-
Description: 
With Python 3.7 and openpyxl installed, the test fails with:

==

ERROR: test_to_excel 
(pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)

ImportError: Missing optional dependency 'xlrd'. Install xlrd >= 1.0.0 for 
Excel support Use pip or conda to install xlrd.

--

But with xlrd installed, it fails with:

xlrd.biffh.XLRDError: Excel xlsx file; not supported

 

> Fix the `test_to_excel` tests for python3.7
> ---
>
> Key: SPARK-44670
> URL: https://issues.apache.org/jira/browse/SPARK-44670
> Project: Spark
>  Issue Type: Bug
>  Components: Pandas API on Spark
>Affects Versions: 3.4.1
>Reporter: Madhukar
>Priority: Minor
>
> With Python 3.7 and openpyxl installed, the test fails with:
> ==
> ERROR: test_to_excel 
> (pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)
> ImportError: Missing optional dependency 'xlrd'. Install xlrd >= 1.0.0 for 
> Excel support Use pip or conda to install xlrd.
> --
> But with xlrd installed, it fails with:
> xlrd.biffh.XLRDError: Excel xlsx file; not supported
>  






[jira] [Updated] (SPARK-44670) Fix the `test_to_excel` tests for python3.7

2023-08-03 Thread Madhukar (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Madhukar updated SPARK-44670:
-
Affects Version/s: 3.4.1
   (was: 3.4.0)

> Fix the `test_to_excel` tests for python3.7
> ---
>
> Key: SPARK-44670
> URL: https://issues.apache.org/jira/browse/SPARK-44670
> Project: Spark
>  Issue Type: Bug
>  Components: Pandas API on Spark
>Affects Versions: 3.4.1
>Reporter: Madhukar
>Priority: Minor
>







[jira] [Created] (SPARK-44670) Fix the `test_to_excel` tests for python3.7

2023-08-03 Thread Madhukar (Jira)
Madhukar created SPARK-44670:


 Summary: Fix the `test_to_excel` tests for python3.7
 Key: SPARK-44670
 URL: https://issues.apache.org/jira/browse/SPARK-44670
 Project: Spark
  Issue Type: Bug
  Components: Pandas API on Spark
Affects Versions: 3.4.0
Reporter: Madhukar


So far, we've been skipping the `read_excel` test in pandas API on Spark:
https://github.com/apache/spark/blob/6d2ce128058b439094cd1dd54253372af6977e79/python/pyspark/pandas/tests/test_dataframe_spark_io.py#L251

In https://github.com/apache/spark/pull/37671, we installed `openpyxl==3.0.10` 
to re-enable the `read_excel` tests, but it's still failing for some reason 
(Please see https://github.com/apache/spark/pull/37671#issuecomment-1237515485 
for more detail).

We should re-enable this test to improve pandas-on-Spark test coverage.






[jira] [Updated] (SPARK-44670) Fix the `test_to_excel` tests for python3.7

2023-08-03 Thread Madhukar (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Madhukar updated SPARK-44670:
-
Description: (was: So far, we've been skipping the `read_excel` test in 
pandas API on Spark:
https://github.com/apache/spark/blob/6d2ce128058b439094cd1dd54253372af6977e79/python/pyspark/pandas/tests/test_dataframe_spark_io.py#L251

In https://github.com/apache/spark/pull/37671, we installed `openpyxl==3.0.10` 
to re-enable the `read_excel` tests, but it's still failing for some reason 
(Please see https://github.com/apache/spark/pull/37671#issuecomment-1237515485 
for more detail).

We should re-enable this test to improve pandas-on-Spark test coverage.)

> Fix the `test_to_excel` tests for python3.7
> ---
>
> Key: SPARK-44670
> URL: https://issues.apache.org/jira/browse/SPARK-44670
> Project: Spark
>  Issue Type: Bug
>  Components: Pandas API on Spark
>Affects Versions: 3.4.0
>Reporter: Madhukar
>Priority: Minor
>







[jira] [Closed] (SPARK-44668) ObjectMapper is threadsafe, we can reuse it in Object

2023-08-03 Thread Jia Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jia Fan closed SPARK-44668.
---

> ObjectMapper is threadsafe, we can reuse it in Object
> --
>
> Key: SPARK-44668
> URL: https://issues.apache.org/jira/browse/SPARK-44668
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.4.1
>Reporter: Jia Fan
>Priority: Major
>
> ObjectMapper is threadsafe, so we can reuse it in an Object. But we create it in a 
> trait, which means each object will create its own ObjectMapper.






[jira] [Resolved] (SPARK-44668) ObjectMapper is threadsafe, we can reuse it in Object

2023-08-03 Thread Jia Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jia Fan resolved SPARK-44668.
-
Resolution: Invalid

> ObjectMapper is threadsafe, we can reuse it in Object
> --
>
> Key: SPARK-44668
> URL: https://issues.apache.org/jira/browse/SPARK-44668
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.4.1
>Reporter: Jia Fan
>Priority: Major
>
> ObjectMapper is threadsafe, so we can reuse it in an Object. But we create it in a 
> trait, which means each object will create its own ObjectMapper.






[jira] [Created] (SPARK-44669) Parquet/ORC files written using Hive Serde should have a file extension

2023-08-03 Thread Cheng Pan (Jira)
Cheng Pan created SPARK-44669:
-

 Summary: Parquet/ORC files written using Hive Serde should have a file extension
 Key: SPARK-44669
 URL: https://issues.apache.org/jira/browse/SPARK-44669
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.2
Reporter: Cheng Pan









[jira] [Created] (SPARK-44668) ObjectMapper is threadsafe, we can reuse it in Object

2023-08-03 Thread Jia Fan (Jira)
Jia Fan created SPARK-44668:
---

 Summary: ObjectMapper is threadsafe, we can reuse it in Object
 Key: SPARK-44668
 URL: https://issues.apache.org/jira/browse/SPARK-44668
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.4.1
Reporter: Jia Fan


ObjectMapper is threadsafe, so we can reuse it in an Object. But we create it in a 
trait, which means each object will create its own ObjectMapper.






[jira] [Created] (SPARK-44667) Uninstall large ML libraries for non-ML jobs

2023-08-03 Thread Ruifeng Zheng (Jira)
Ruifeng Zheng created SPARK-44667:
-

 Summary: Uninstall large ML libraries for non-ML jobs
 Key: SPARK-44667
 URL: https://issues.apache.org/jira/browse/SPARK-44667
 Project: Spark
  Issue Type: Sub-task
  Components: Project Infra
Affects Versions: 4.0.0
Reporter: Ruifeng Zheng









[jira] [Created] (SPARK-44666) Uninstall CodeQL/Go/Node in non-container jobs

2023-08-03 Thread Ruifeng Zheng (Jira)
Ruifeng Zheng created SPARK-44666:
-

 Summary: Uninstall CodeQL/Go/Node in non-container jobs
 Key: SPARK-44666
 URL: https://issues.apache.org/jira/browse/SPARK-44666
 Project: Spark
  Issue Type: Sub-task
  Components: Project Infra
Affects Versions: 4.0.0
Reporter: Ruifeng Zheng









[jira] [Resolved] (SPARK-44653) non-trivial DataFrame unions should not break caching

2023-08-03 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-44653.
-
Fix Version/s: 3.3.3
   3.5.0
   3.4.2
   Resolution: Fixed

Issue resolved by pull request 42315
[https://github.com/apache/spark/pull/42315]

> non-trivial DataFrame unions should not break caching
> -
>
> Key: SPARK-44653
> URL: https://issues.apache.org/jira/browse/SPARK-44653
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
> Fix For: 3.3.3, 3.5.0, 3.4.2
>
>







[jira] [Assigned] (SPARK-44653) non-trivial DataFrame unions should not break caching

2023-08-03 Thread Wenchen Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-44653:
---

Assignee: Wenchen Fan

> non-trivial DataFrame unions should not break caching
> -
>
> Key: SPARK-44653
> URL: https://issues.apache.org/jira/browse/SPARK-44653
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>







[jira] [Assigned] (SPARK-44624) Spark Connect reattachable Execute when initial ExecutePlan didn't reach server

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-44624:


Assignee: Juliusz Sompolski

> Spark Connect reattachable Execute when initial ExecutePlan didn't reach 
> server
> ---
>
> Key: SPARK-44624
> URL: https://issues.apache.org/jira/browse/SPARK-44624
> Project: Spark
>  Issue Type: Improvement
>  Components: Connect
>Affects Versions: 3.5.0, 4.0.0
>Reporter: Juliusz Sompolski
>Assignee: Juliusz Sompolski
>Priority: Major
>
> If the ExecutePlan never reached the server, a ReattachExecute will fail with 
> INVALID_HANDLE.OPERATION_NOT_FOUND. In that case, we could try to send 
> ExecutePlan again.
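> A rough sketch of that retry loop, in plain Python with illustrative names (this 
> is not the actual client code):
> {code:python}
> # Illustrative sketch only; names do not match the real client internals.
> class OperationNotFound(Exception):
>     """Stand-in for INVALID_HANDLE.OPERATION_NOT_FOUND."""
>
> def execute_with_reattach(client, plan, operation_id):
>     client.execute_plan(plan)            # initial request (may be lost)
>     got_response = False
>     while True:
>         try:
>             for response in client.reattach_execute(operation_id):
>                 got_response = True
>                 yield response
>             return
>         except OperationNotFound:
>             if got_response:
>                 raise                    # server really lost the operation
>             client.execute_plan(plan)    # request never arrived; resend it
> {code}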






[jira] [Resolved] (SPARK-44624) Spark Connect reattachable Execute when initial ExecutePlan didn't reach server

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-44624.
--
Fix Version/s: 3.5.0
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 42282
[https://github.com/apache/spark/pull/42282]

> Spark Connect reattachable Execute when initial ExecutePlan didn't reach 
> server
> ---
>
> Key: SPARK-44624
> URL: https://issues.apache.org/jira/browse/SPARK-44624
> Project: Spark
>  Issue Type: Improvement
>  Components: Connect
>Affects Versions: 3.5.0, 4.0.0
>Reporter: Juliusz Sompolski
>Assignee: Juliusz Sompolski
>Priority: Major
> Fix For: 3.5.0, 4.0.0
>
>
> If the ExecutePlan never reached the server, a ReattachExecute will fail with 
> INVALID_HANDLE.OPERATION_NOT_FOUND. In that case, we could try to send 
> ExecutePlan again.






[jira] [Resolved] (SPARK-44664) Release the execute when closing the iterator in Python client

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-44664.
--
Fix Version/s: 3.5.0
   4.0.0
 Assignee: Hyukjin Kwon
   Resolution: Fixed

Fixed in https://github.com/apache/spark/pull/42330

> Release the execute when closing the iterator in Python client
> --
>
> Key: SPARK-44664
> URL: https://issues.apache.org/jira/browse/SPARK-44664
> Project: Spark
>  Issue Type: Task
>  Components: Connect, PySpark
>Affects Versions: 3.5.0
>Reporter: Hyukjin Kwon
>Assignee: Hyukjin Kwon
>Priority: Major
> Fix For: 3.5.0, 4.0.0
>
>
> See SPARK-44656 and SPARK-44642. We need the same in the Python client.






[jira] [Resolved] (SPARK-44619) Free up disk space for container jobs

2023-08-03 Thread Ruifeng Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruifeng Zheng resolved SPARK-44619.
---
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 42253
[https://github.com/apache/spark/pull/42253]

> Free up disk space for container jobs
> -
>
> Key: SPARK-44619
> URL: https://issues.apache.org/jira/browse/SPARK-44619
> Project: Spark
>  Issue Type: Sub-task
>  Components: Project Infra
>Affects Versions: 4.0.0
>Reporter: Ruifeng Zheng
>Assignee: Ruifeng Zheng
>Priority: Major
> Fix For: 4.0.0
>
>







[jira] [Assigned] (SPARK-43562) Enable DataFrameTests.test_append for pandas 2.0.0.

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-43562:


Assignee: Haejoon Lee

> Enable DataFrameTests.test_append for pandas 2.0.0.
> ---
>
> Key: SPARK-43562
> URL: https://issues.apache.org/jira/browse/SPARK-43562
> Project: Spark
>  Issue Type: Sub-task
>  Components: Pandas API on Spark
>Affects Versions: 4.0.0
>Reporter: Haejoon Lee
>Assignee: Haejoon Lee
>Priority: Major
>
> Enable DataFrameTests.test_append for pandas 2.0.0.






[jira] [Resolved] (SPARK-43870) Enable SeriesTests for pandas 2.0.0.

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-43870.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 42268
[https://github.com/apache/spark/pull/42268]

> Enable SeriesTests for pandas 2.0.0.
> 
>
> Key: SPARK-43870
> URL: https://issues.apache.org/jira/browse/SPARK-43870
> Project: Spark
>  Issue Type: Sub-task
>  Components: Pandas API on Spark, PySpark
>Affects Versions: 4.0.0
>Reporter: Haejoon Lee
>Assignee: Haejoon Lee
>Priority: Major
> Fix For: 4.0.0
>
>
> test list:
>  * test_value_counts
>  * test_append
>  * test_astype
>  * test_between
>  * test_mad
>  * test_quantile
>  * test_rank
>  * test_between_time
>  * test_iteritems
>  * test_product
>  * test_factorize






[jira] [Assigned] (SPARK-43870) Enable SeriesTests for pandas 2.0.0.

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-43870:


Assignee: Haejoon Lee

> Enable SeriesTests for pandas 2.0.0.
> 
>
> Key: SPARK-43870
> URL: https://issues.apache.org/jira/browse/SPARK-43870
> Project: Spark
>  Issue Type: Sub-task
>  Components: Pandas API on Spark, PySpark
>Affects Versions: 4.0.0
>Reporter: Haejoon Lee
>Assignee: Haejoon Lee
>Priority: Major
>
> test list:
>  * test_value_counts
>  * test_append
>  * test_astype
>  * test_between
>  * test_mad
>  * test_quantile
>  * test_rank
>  * test_between_time
>  * test_iteritems
>  * test_product
>  * test_factorize






[jira] [Resolved] (SPARK-43562) Enable DataFrameTests.test_append for pandas 2.0.0.

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-43562.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 42268
[https://github.com/apache/spark/pull/42268]

> Enable DataFrameTests.test_append for pandas 2.0.0.
> ---
>
> Key: SPARK-43562
> URL: https://issues.apache.org/jira/browse/SPARK-43562
> Project: Spark
>  Issue Type: Sub-task
>  Components: Pandas API on Spark
>Affects Versions: 4.0.0
>Reporter: Haejoon Lee
>Assignee: Haejoon Lee
>Priority: Major
> Fix For: 4.0.0
>
>
> Enable DataFrameTests.test_append for pandas 2.0.0.






[jira] [Resolved] (SPARK-43873) Enable DataFrameSlowTests for pandas 2.0.0.

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-43873.
--
Fix Version/s: 4.0.0
 Assignee: Haejoon Lee
   Resolution: Fixed

Fixed in https://github.com/apache/spark/pull/42319

> Enable DataFrameSlowTests for pandas 2.0.0.
> ---
>
> Key: SPARK-43873
> URL: https://issues.apache.org/jira/browse/SPARK-43873
> Project: Spark
>  Issue Type: Sub-task
>  Components: Pandas API on Spark, PySpark
>Affects Versions: 4.0.0
>Reporter: Haejoon Lee
>Assignee: Haejoon Lee
>Priority: Major
> Fix For: 4.0.0
>
>
> test list:
>  * test_describe
>  * test_between_time
>  * test_product
>  * test_iteritems
>  * test_mad
>  * test_cov
>  * test_quantile






[jira] [Resolved] (SPARK-44640) Improve error messages for Python UDTF returning non iterable

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-44640.
--
Fix Version/s: 4.0.0
 Assignee: Allison Wang
   Resolution: Fixed

Fixed in https://github.com/apache/spark/pull/42302

> Improve error messages for Python UDTF returning non iterable
> -
>
> Key: SPARK-44640
> URL: https://issues.apache.org/jira/browse/SPARK-44640
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.5.0
>Reporter: Allison Wang
>Assignee: Allison Wang
>Priority: Major
> Fix For: 4.0.0
>
>
> When a UDTF's eval method returns a value that is not iterable, the error message 
> can be confusing to users. For example, for this UDTF:
> {code:java}
> @udtf(returnType="x: int")
> class TestUDTF:
> def eval(self, a):
> return a {code}
> Currently it fails with this error for regular UDTFs:
>     return tuple(map(verify_and_convert_result, res))
> TypeError: 'int' object is not iterable
> And this error for arrow-optimized UDTFs:
>     raise ValueError("DataFrame constructor not properly called!")
> ValueError: DataFrame constructor not properly called!
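> For comparison, eval is expected to produce an iterable of rows; the UDTF above 
> works once it yields tuples, e.g.:
> {code:python}
> from pyspark.sql.functions import udtf
>
> @udtf(returnType="x: int")
> class TestUDTF:
>     def eval(self, a):
>         yield (a,)  # yield row tuples instead of returning a bare int
> {code}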






[jira] [Updated] (SPARK-44548) Add support for pandas-on-Spark DataFrame assertDataFrameEqual

2023-08-03 Thread Amanda Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amanda Liu updated SPARK-44548:
---
Summary: Add support for pandas-on-Spark DataFrame assertDataFrameEqual  
(was: Add support for pandas DataFrame assertDataFrameEqual)

> Add support for pandas-on-Spark DataFrame assertDataFrameEqual
> --
>
> Key: SPARK-44548
> URL: https://issues.apache.org/jira/browse/SPARK-44548
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.5.0
>Reporter: Amanda Liu
>Assignee: Amanda Liu
>Priority: Major
> Fix For: 3.5.0, 4.0.0
>
>
> SPIP: 
> https://docs.google.com/document/d/1OkyBn3JbEHkkQgSQ45Lq82esXjr9rm2Vj7Ih_4zycRc/edit#heading=h.f5f0u2riv07v
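> A minimal sketch of the intended usage (assuming pandas-on-Spark DataFrames are 
> accepted directly once this lands):
> {code:python}
> import pyspark.pandas as ps
> from pyspark.testing import assertDataFrameEqual
>
> actual = ps.DataFrame({"id": [1, 2], "amount": [10.0, 20.0]})
> expected = ps.DataFrame({"id": [1, 2], "amount": [10.0, 20.0]})
>
> # Should pass without first converting to Spark or plain pandas DataFrames.
> assertDataFrameEqual(actual, expected)
> {code}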






[jira] [Created] (SPARK-44665) Add support for pandas DataFrame assertDataFrameEqual

2023-08-03 Thread Amanda Liu (Jira)
Amanda Liu created SPARK-44665:
--

 Summary: Add support for pandas DataFrame assertDataFrameEqual
 Key: SPARK-44665
 URL: https://issues.apache.org/jira/browse/SPARK-44665
 Project: Spark
  Issue Type: Sub-task
  Components: PySpark
Affects Versions: 3.5.0
Reporter: Amanda Liu









[jira] [Created] (SPARK-44664) Release the execute when closing the iterator in Python client

2023-08-03 Thread Hyukjin Kwon (Jira)
Hyukjin Kwon created SPARK-44664:


 Summary: Release the execute when closing the iterator in Python 
client
 Key: SPARK-44664
 URL: https://issues.apache.org/jira/browse/SPARK-44664
 Project: Spark
  Issue Type: Task
  Components: Connect, PySpark
Affects Versions: 3.5.0
Reporter: Hyukjin Kwon


See SPARK-44656 and SPARK-44642. We need the same in the Python client.






[jira] [Assigned] (SPARK-44642) ExecutePlanResponseReattachableIterator should release all after error

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-44642:


Assignee: Juliusz Sompolski

> ExecutePlanResponseReattachableIterator should release all after error
> --
>
> Key: SPARK-44642
> URL: https://issues.apache.org/jira/browse/SPARK-44642
> Project: Spark
>  Issue Type: Improvement
>  Components: Connect
>Affects Versions: 3.5.0, 4.0.0
>Reporter: Juliusz Sompolski
>Assignee: Juliusz Sompolski
>Priority: Major
>







[jira] [Resolved] (SPARK-44642) ExecutePlanResponseReattachableIterator should release all after error

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-44642.
--
Fix Version/s: 3.5.0
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 42304
[https://github.com/apache/spark/pull/42304]

> ExecutePlanResponseReattachableIterator should release all after error
> --
>
> Key: SPARK-44642
> URL: https://issues.apache.org/jira/browse/SPARK-44642
> Project: Spark
>  Issue Type: Improvement
>  Components: Connect
>Affects Versions: 3.5.0, 4.0.0
>Reporter: Juliusz Sompolski
>Assignee: Juliusz Sompolski
>Priority: Major
> Fix For: 3.5.0, 4.0.0
>
>







[jira] [Resolved] (SPARK-44652) Raise error when only one df is None

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-44652.
--
Fix Version/s: 3.5.0
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 42314
[https://github.com/apache/spark/pull/42314]

> Raise error when only one df is None
> 
>
> Key: SPARK-44652
> URL: https://issues.apache.org/jira/browse/SPARK-44652
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.5.0
>Reporter: Amanda Liu
>Assignee: Amanda Liu
>Priority: Major
> Fix For: 3.5.0, 4.0.0
>
>







[jira] [Assigned] (SPARK-44652) Raise error when only one df is None

2023-08-03 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-44652:


Assignee: Amanda Liu

> Raise error when only one df is None
> 
>
> Key: SPARK-44652
> URL: https://issues.apache.org/jira/browse/SPARK-44652
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 3.5.0
>Reporter: Amanda Liu
>Assignee: Amanda Liu
>Priority: Major
>







[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns

2023-08-03 Thread Asif (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Asif updated SPARK-44662:
-
Description: 
h2. *Q1. What are you trying to do? Articulate your objectives using absolutely 
no jargon.*

Along the lines of DPP, which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended to the case where the joining key is not a partition column.
The keys of a BroadcastHashJoin are already available before the stream-side iterator is actually evaluated. These keys can be pushed down to the DataSource as a SortedSet.
For non-partition columns, DataSources like Iceberg have max/min column stats available at the manifest level, and formats like Parquet have max/min stats at various storage levels. The pushed SortedSet can be used to prune by range at both the driver level (manifest files) and the executor level (while actually going through chunks, row groups etc. at the Parquet level).

If the data is stored in a columnar batch format, it is not possible to filter out individual rows at the DataSource level, even though we have the keys.
But at the scan level (ColumnToRowExec) it is still possible to filter out as many rows as possible when the query involves nested joins, thus reducing the number of rows to join at the higher join levels.

Will be adding more details..
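
A minimal sketch of the range pruning idea, in plain Python with illustrative names standing in for the actual DataSource hooks:
{code:python}
from bisect import bisect_left

def chunk_may_match(sorted_keys, col_min, col_max):
    """True iff some pushed broadcast key lies within [col_min, col_max]."""
    i = bisect_left(sorted_keys, col_min)
    return i < len(sorted_keys) and sorted_keys[i] <= col_max

# Keys collected from the broadcast side, pushed down as a sorted set:
keys = [3, 17, 42]
print(chunk_may_match(keys, 20, 30))  # False -> this row group can be skipped
print(chunk_may_match(keys, 40, 50))  # True  -> this row group must be read
{code}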
h2. *Q2. What problem is this proposal NOT designed to solve?*

This can only help BroadcastHashJoin performance if the join is Inner or Left Semi.
It will also not work if there are nodes like Expand, Generate, or Aggregate (without a group-by on the joining columns, etc.) below the targeted BroadcastHashJoin node.
h2. *Q3. How is it done today, and what are the limits of current practice?*

Currently this sort of pruning at the DataSource level is done using DPP (Dynamic Partition Pruning), and only if one of the join key columns is a partitioning column (so that the cost of the DPP query is justified and far less than the amount of data it filters out by skipping partitions).
The limitation is that the DPP-type approach is not implemented (intentionally, I believe) when the join column is a non-partition column, because the cost of a "DPP type" query would most likely be far higher than any possible pruning gain (especially if the column is not stored in sorted order).
h2. *Q4. What is new in your approach and why do you think it will be 
successful?*

1) This allows pruning on joins over non-partition columns.
2) Because it piggybacks on the broadcasted keys, there is no extra cost of a "DPP type" query.
3) The data can be used by the DataSource to prune at the driver (possibly) and also at the executor level (as in the case of Parquet, which has max/min stats at various structural levels).

4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed to the scan level. Since it is on the partition key, maybe that is sufficient. But if it is a nested join query, possibly involving different stream-side columns for each join, each such filter push could do significant pruning. This requires some handling in the case of AQE, as the stream-side iterator (and hence stage evaluation) needs to be delayed until all the available join filters in the nested tree are pushed to their respective target BatchScanExec nodes.
h4. *Single Row Filtration*

5) In the case of nested broadcast joins, if the DataSource is column-vector oriented, what Spark gets is a ColumnarBatch. But because the scans have filters from multiple joins, those filters can be retrieved and applied in the code generated at the ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only rows which satisfy all the BroadcastHashJoins (whose keys have been pushed) will be used for join evaluation.
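
A sketch of that single-row filtration in plain Python (the set-membership test stands in for the proposed "containsKey" on HashedRelation):
{code:python}
def filter_columnar_batch(rows, pushed_filters):
    """pushed_filters: one (key_extractor, broadcast_key_set) pair per
    BroadcastHashJoin whose keys were pushed down to this scan."""
    for row in rows:
        if all(extract(row) in keys for extract, keys in pushed_filters):
            yield row  # the row can survive every pushed join

# Illustrative usage: two nested joins pushed their key sets to one scan.
rows = [(1, "a"), (2, "b"), (3, "c")]
filters = [(lambda r: r[0], {1, 3}), (lambda r: r[1], {"a", "b"})]
print(list(filter_columnar_batch(rows, filters)))  # [(1, 'a')]
{code}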

The code is already there; I will be opening a PR. For a non-partitioned-table TPCDS run on a laptop with TPCDS data at scale factor 4, I am seeing a 15% gain.

For partitioned-table TPCDS, there is improvement in 4-5 queries, to the tune of 10% to 37%.
h2. *Q5. Who cares? If you are successful, what difference will it make?*

If use cases involve multiple joins, especially when the join columns are not partitioned, and performance is a criterion, this PR *might* help.
h2. *Q6. What are the risks?*

Well, the changes are extensive and review will be painful. Though the code is being tested continuously and more tests are being added, with a big change some possibility of bugs remains. But as of now, I think the code is robust. To get the perf benefit fully, utilization of the pushed filters needs to be implemented on the DataSource side too. I have already done it for {*}iceberg{*}. But I believe that at least in the case of Nested Broadcast Hash Joins, the [#singleRowFilter] approach would still result in a perf benefit, even with Default

[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns

2023-08-03 Thread Asif (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Asif updated SPARK-44662:
-
Description: 
h2. *Q1. What are you trying to do? Articulate your objectives using absolutely 
no jargon.*

Along the lines of DPP, which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended to the case where the joining key is not a partition column.
The keys of a BroadcastHashJoin are already available before the stream-side iterator is actually evaluated. These keys can be pushed down to the DataSource as a SortedSet.
For non-partition columns, DataSources like Iceberg have max/min column stats available at the manifest level, and formats like Parquet have max/min stats at various storage levels. The pushed SortedSet can be used to prune by range at both the driver level (manifest files) and the executor level (while actually going through chunks, row groups etc. at the Parquet level).

If the data is stored in a columnar batch format, it is not possible to filter out individual rows at the DataSource level, even though we have the keys.
But at the scan level (ColumnToRowExec) it is still possible to filter out as many rows as possible when the query involves nested joins, thus reducing the number of rows to join at the higher join levels.

Will be adding more details..
h2. *Q2. What problem is this proposal NOT designed to solve?*

This can only help BroadcastHashJoin performance if the join is Inner or Left Semi.
It will also not work if there are nodes like Expand, Generate, or Aggregate (without a group-by on the joining columns, etc.) below the targeted BroadcastHashJoin node.
h2. *Q3. How is it done today, and what are the limits of current practice?*

Currently this sort of pruning at the DataSource level is done using DPP (Dynamic Partition Pruning), and only if one of the join key columns is a partitioning column (so that the cost of the DPP query is justified and far less than the amount of data it filters out by skipping partitions).
The limitation is that the DPP-type approach is not implemented (intentionally, I believe) when the join column is a non-partition column, because the cost of a "DPP type" query would most likely be far higher than any possible pruning gain (especially if the column is not stored in sorted order).
h2. *Q4. What is new in your approach and why do you think it will be 
successful?*

1) This allows pruning on joins over non-partition columns.
2) Because it piggybacks on the broadcasted keys, there is no extra cost of a "DPP type" query.
3) The data can be used by the DataSource to prune at the driver (possibly) and also at the executor level (as in the case of Parquet, which has max/min stats at various structural levels).

4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed to the scan level. Since it is on the partition key, maybe that is sufficient. But if it is a nested join query, possibly involving different stream-side columns for each join, each such filter push could do significant pruning. This requires some handling in the case of AQE, as the stream-side iterator (and hence stage evaluation) needs to be delayed until all the available join filters in the nested tree are pushed to their respective target BatchScanExec nodes.
h4. *Single Row Filtration*

5) In the case of nested broadcast joins, if the DataSource is column-vector oriented, what Spark gets is a ColumnarBatch. But because the scans have filters from multiple joins, those filters can be retrieved and applied in the code generated at the ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only rows which satisfy all the BroadcastHashJoins (whose keys have been pushed) will be used for join evaluation.

The code is already there; I will be opening a PR. For a non-partitioned-table TPCDS run on a laptop with TPCDS data at scale factor 4, I am seeing a 15% gain.

For partitioned-table TPCDS, there is improvement in 4-5 queries, to the tune of 10% to 37%.
h2. *Q5. Who cares? If you are successful, what difference will it make?*

If use cases involve multiple joins, especially when the join columns are not partitioned, and performance is a criterion, this PR *might* help.
h2. *Q6. What are the risks?*

The changes are extensive, so review will be painful. Though the code is being tested continuously and more tests are being added, with a change this big there is some possibility of bugs; as of now, though, I think the code is robust. To get the full performance benefit, utilization of the pushed filters needs to be implemented on the DataSource side too. I have already done it for *iceberg*. But I believe that, at least in the case of nested broadcast hash joins, the [#singleRowFilter] approach would still result in a performance benefit.
h2. *Q7. How long will it take?*

[jira] [Created] (SPARK-44663) Disable arrow optimization by default

2023-08-03 Thread Allison Wang (Jira)
Allison Wang created SPARK-44663:


 Summary: Disable arrow optimization by default
 Key: SPARK-44663
 URL: https://issues.apache.org/jira/browse/SPARK-44663
 Project: Spark
  Issue Type: Sub-task
  Components: PySpark
Affects Versions: 3.5.0
Reporter: Allison Wang


Disable arrow optimization to make Python UDTFs consistent with Python UDFs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44661) getMapOutputLocation should not throw NPE

2023-08-03 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-44661:
--
Priority: Minor  (was: Major)

> getMapOutputLocation should not throw NPE
> -
>
> Key: SPARK-44661
> URL: https://issues.apache.org/jira/browse/SPARK-44661
> Project: Spark
>  Issue Type: Test
>  Components: Spark Core, Tests
>Affects Versions: 3.4.1, 3.5.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Minor
> Fix For: 3.4.2, 3.5.0, 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns

2023-08-03 Thread Asif (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Asif updated SPARK-44662:
-
Description: 
h2. *Q1. What are you trying to do? Articulate your objectives using absolutely 
no jargon.*

On the lines of DPP which helps DataSourceV2 relations when the joining key is 
a partition column, the same concept can be extended over to the case where 
joining key is not a partition column.
The Keys of BroadcastHashJoin are already available before actual evaluation of 
the stream iterator. These keys can be pushed down to the DataSource as a 
SortedSet.
For non partition columns, the DataSources like iceberg have max/min stats on 
column available at manifest level, and for formats like parquet , they have 
max/min stats at various storage level. The passed SortedSet can be used to 
prune using ranges at both driver level ( manifests files) as well as executor 
level ( while actually going through chunks , row groups etc at parquet level)

If the data is stored as Columnar Batch format , then it would not be possible 
to filter out individual row at DataSource level, even though we have keys.
But at the scan level, ( ColumnToRowExec) it is still possible to filter out as 
many rows as possible , if the query involves nested joins. Thus reducing the 
number of rows to join at the higher join levels.

Will be adding more details..
h2. *Q2. What problem is this proposal NOT designed to solve?*

This can only help in BroadcastHashJoin's performance if the join is Inner or 
Left Semi.
This will also not work if there are nodes like Expand, Generator , Aggregate 
(without group by on keys not part of joining column etc) below the 
BroadcastHashJoin node being targeted.
h2. *Q3. How is it done today, and what are the limits of current practice?*

Currently this sort of pruning at DataSource level is being done using DPP 
(Dynamic Partition Pruning ) and IFF one of the join key column is a 
Partitioning column ( so that cost of DPP query is justified and way less than 
amount of data it will be filtering by skipping partitions).
The limitation is that DPP type approach is not implemented ( intentionally I 
believe), if the join column is a non partition column ( because of cost of 
"DPP type" query would most likely be way high as compared to any possible 
pruning ( especially if the column is not stored in a sorted manner).
h2. *Q4. What is new in your approach and why do you think it will be 
successful?*

1) This allows pruning on non partition column based joins. 
2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP 
type" query. 
3) The Data can be used by DataSource to prune at driver (possibly) and also at 
executor level ( as in case of parquet which has max/min at various structure 
levels)

4) The big benefit should be seen in multilevel nested join queries. In the 
current code base, if I am correct, only one join's pruning filter would get 
pushed at scan level. Since it is on partition key may be that is sufficient. 
But if it is a nested Join query , and may be involving different columns on 
streaming side for join, each such filter push could do significant pruning. 
This requires some handling in case of AQE, as the stream side iterator ( & 
hence stage evaluation needs to be delayed, till all the available join filters 
in the nested tree are pushed at their respective target BatchScanExec).

{anchor:singleRowFilter}5) In case of nested broadcasted joins, if the 
datasource is column vector oriented , then what spark would get is a 
ColumnarBatch. But because scans have Filters from multiple joins, they can be 
retrieved and can be applied in code generated at ColumnToRowExec level, using 
a new "containsKey" method on HashedRelation. Thus only those rows which 
satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be 
used for join evaluation.

The code is already there , will be opening a PR. For non partition table TPCDS 
run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain.

For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 
10% to 37%.
h2. *Q5. Who cares? If you are successful, what difference will it make?*

If use cases involve multiple joins especially when the join columns are non 
partitioned, and performance is a criteria, this PR *might* help.
h2. *Q6. What are the risks?*

Well the changes are extensive. review will be painful ( if it happens). Though 
code is being tested continuously and adding more tests , with big change, some 
possibility of bugs is there. But as of now, I think the code is robust. To get 
the Perf benefit fully, the pushed filters utilization needs to be implemented 
on the DataSource side too. Have already done it for *iceberg*. But I believe 
atleast in case of Nested Broadcast Hash Joins, [#singleRowFilter] approach 
would still result in perf benefit.
h2. *Q7. 

[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns

2023-08-03 Thread Asif (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Asif updated SPARK-44662:
-
Description: 
h2. *Q1. What are you trying to do? Articulate your objectives using absolutely 
no jargon.*

On the lines of DPP which helps DataSourceV2 relations when the joining key is 
a partition column, the same concept can be extended over to the case where 
joining key is not a partition column.
The Keys of BroadcastHashJoin are already available before actual evaluation of 
the stream iterator. These keys can be pushed down to the DataSource as a 
SortedSet.
For non partition columns, the DataSources like iceberg have max/min stats on 
column available at manifest level, and for formats like parquet , they have 
max/min stats at various storage level. The passed SortedSet can be used to 
prune using ranges at both driver level ( manifests files) as well as executor 
level ( while actually going through chunks , row groups etc at parquet level)

If the data is stored as Columnar Batch format , then it would not be possible 
to filter out individual row at DataSource level, even though we have keys.
But at the scan level, ( ColumnToRowExec) it is still possible to filter out as 
many rows as possible , if the query involves nested joins. Thus reducing the 
number of rows to join at the higher join levels.

Will be adding more details..
h2. *Q2. What problem is this proposal NOT designed to solve?*

This can only help in BroadcastHashJoin's performance if the join is Inner or 
Left Semi.
This will also not work if there are nodes like Expand, Generator , Aggregate 
(without group by on keys not part of joining column etc) below the 
BroadcastHashJoin node being targeted.
h2. *Q3. How is it done today, and what are the limits of current practice?*

Currently this sort of pruning at DataSource level is being done using DPP 
(Dynamic Partition Pruning ) and IFF one of the join key column is a 
Partitioning column ( so that cost of DPP query is justified and way less than 
amount of data it will be filtering by skipping partitions).
The limitation is that DPP type approach is not implemented ( intentionally I 
believe), if the join column is a non partition column ( because of cost of 
"DPP type" query would most likely be way high as compared to any possible 
pruning ( especially if the column is not stored in a sorted manner).
h2. *Q4. What is new in your approach and why do you think it will be 
successful?*

1) This allows pruning on joins over non-partition columns. 
2) Because it piggybacks on the broadcasted keys, there is no extra cost of a 
"DPP type" query. 
3) The data can be used by the DataSource to prune at the driver (possibly) and 
also at the executor level (as in the case of Parquet, which has max/min stats 
at various structure levels).

4) The big benefit should be seen in multilevel nested join queries. In the 
current code base, if I am correct, only one join's pruning filter gets pushed 
to the scan level. Since it is on a partition key, maybe that is sufficient. 
But if it is a nested join query, possibly involving different columns on the 
streaming side for each join, each such filter push could do significant 
pruning. This requires some handling in the case of AQE, as the stream-side 
iterator (and hence stage evaluation) needs to be delayed until all the 
available join filters in the nested tree are pushed to their respective 
target BatchScanExec nodes.

{anchor:singleRowFilter}
5) In the case of nested broadcast joins, if the datasource is column-vector 
oriented, then what Spark gets is a ColumnarBatch. But because scans have 
filters from multiple joins, these can be retrieved and applied in the code 
generated at the ColumnToRowExec level, using a new "containsKey" method on 
HashedRelation. Thus only those rows which satisfy all the BroadcastHashJoins 
(whose keys have been pushed) will be used for join evaluation.
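
A toy illustration of this point (simplified to a single key column; 
"containsKey" is the proposed addition, not an existing HashedRelation 
method): rows coming out of a ColumnarBatch survive only if every pushed 
broadcast-side key set contains their key.
{code:scala}
// Each pushed filter corresponds to one BroadcastHashJoin's key set.
val pushedKeySets: Seq[Set[Long]] = Seq(Set(1L, 2L), Set(2L, 3L))

// Keys of the rows produced while converting columns to rows:
val rowKeys = Seq(1L, 2L, 3L)

// Only rows satisfying ALL the pushed joins reach join evaluation:
val survivors = rowKeys.filter(k => pushedKeySets.forall(_.contains(k)))
// survivors == Seq(2L)
{code}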

The code is already there; a PR will be opened. For a non-partitioned-table 
TPCDS run on a laptop with TPCDS data at scale factor 4, I am seeing a 15% 
gain.

For partitioned-table TPCDS, there is an improvement in 4-5 queries, to the 
tune of 10% to 37%.

h2. *Q5. Who cares? If you are successful, what difference will it make?*

If use cases involve multiple joins, especially when the join columns are 
non-partitioned, and performance is a criterion, this PR *might* help.

h2. *Q6. What are the risks?*

Well, the changes are extensive, so review will be painful (if it happens). 
Though the code is being tested continuously and more tests are being added, 
with a big change some possibility of bugs remains. But as of now, I think the 
code is robust. To get the full perf benefit, utilization of the pushed 
filters needs to be implemented on the DataSource side too. I have already 
done it for *iceberg*. But I believe that, at least in the case of Nested 
Broadcast Hash Joins, the [#singleRowFilter] approach would still result in a 
perf benefit.

h2. *Q7. How 

[jira] [Resolved] (SPARK-44661) getMapOutputLocation should not throw NPE

2023-08-03 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-44661.
---
Fix Version/s: 3.5.0
   4.0.0
   3.4.2
   Resolution: Fixed

Issue resolved by pull request 42326
[https://github.com/apache/spark/pull/42326]

> getMapOutputLocation should not throw NPE
> -
>
> Key: SPARK-44661
> URL: https://issues.apache.org/jira/browse/SPARK-44661
> Project: Spark
>  Issue Type: Test
>  Components: Spark Core, Tests
>Affects Versions: 3.4.1, 3.5.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
> Fix For: 3.5.0, 4.0.0, 3.4.2
>
>







[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns

2023-08-03 Thread Asif (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Asif updated SPARK-44662:
-
Description: 
h2. *Q1. What are you trying to do? Articulate your objectives using absolutely 
no jargon.*

On the lines of DPP, which helps DataSourceV2 relations when the joining key 
is a partition column, the same concept can be extended to the case where the 
joining key is not a partition column.
The keys of a BroadcastHashJoin are already available before the actual 
evaluation of the stream-side iterator. These keys can be pushed down to the 
DataSource as a SortedSet.
For non-partition columns, DataSources like Iceberg have max/min stats per 
column available at the manifest level, and formats like Parquet have max/min 
stats at various storage levels. The passed SortedSet can be used to prune 
using ranges at both the driver level (manifest files) and the executor level 
(while actually going through chunks, row groups, etc. at the Parquet level).

If the data is stored in Columnar Batch format, then it is not possible to 
filter out individual rows at the DataSource level, even though we have the 
keys. But at the scan level (ColumnToRowExec) it is still possible to filter 
out as many rows as possible if the query involves nested joins, thus reducing 
the number of rows to join at the higher join levels.

Will be adding more details.
h2. *Q2. What problem is this proposal NOT designed to solve?*

This can only help BroadcastHashJoin performance if the join is Inner or Left 
Semi.
It will also not work if there are nodes like Expand, Generate, or Aggregate 
(without a group-by on the joining columns, etc.) below the BroadcastHashJoin 
node being targeted.
h2. *Q3. How is it done today, and what are the limits of current practice?*

Currently this sort of pruning at the DataSource level is done using DPP 
(Dynamic Partition Pruning), and only if one of the join key columns is a 
partitioning column (so that the cost of the DPP query is justified and far 
less than the amount of data it filters out by skipping partitions).
The limitation is that the DPP-type approach is not implemented (intentionally, 
I believe) when the join column is a non-partition column, because the cost of 
a "DPP type" query would most likely be far higher than any possible pruning 
benefit (especially if the column is not stored in a sorted manner).
h2. *Q4. What is new in your approach and why do you think it will be 
successful?*

1) This allows pruning on joins over non-partition columns. 
2) Because it piggybacks on the broadcasted keys, there is no extra cost of a 
"DPP type" query. 
3) The data can be used by the DataSource to prune at the driver (possibly) and 
also at the executor level (as in the case of Parquet, which has max/min stats 
at various structure levels).

4) The big benefit should be seen in multilevel nested join queries. In the 
current code base, if I am correct, only one join's pruning filter gets pushed 
to the scan level. Since it is on a partition key, maybe that is sufficient. 
But if it is a nested join query, possibly involving different columns on the 
streaming side for each join, each such filter push could do significant 
pruning. This requires some handling in the case of AQE, as the stream-side 
iterator (and hence stage evaluation) needs to be delayed until all the 
available join filters in the nested tree are pushed to their respective 
target BatchScanExec nodes.

{anchor:singleRowFilter}5) In the case of nested broadcast joins, if the 
datasource is column-vector oriented, then what Spark gets is a ColumnarBatch. 
But because scans have filters from multiple joins, these can be retrieved and 
applied in the code generated at the ColumnToRowExec level, using a new 
"containsKey" method on HashedRelation. Thus only those rows which satisfy all 
the BroadcastHashJoins (whose keys have been pushed) will be used for join 
evaluation.

The code is already there; a PR will be opened. For a non-partitioned-table 
TPCDS run on a laptop with TPCDS data at scale factor 4, I am seeing a 15% 
gain.

For partitioned-table TPCDS, there is an improvement in 4-5 queries, to the 
tune of 10% to 37%.

h2. *Q5. Who cares? If you are successful, what difference will it make?*

If use cases involve multiple joins, especially when the join columns are 
non-partitioned, and performance is a criterion, this PR *might* help.

h2. *Q6. What are the risks?*

Well, the changes are extensive, so review will be painful (if it happens). 
Though the code is being tested continuously and more tests are being added, 
with a big change some possibility of bugs remains. But as of now, I think the 
code is robust. To get the full perf benefit, utilization of the pushed 
filters needs to be implemented on the DataSource side too. I have already 
done it for *iceberg*. But I believe that, at least in the case of Nested 
Broadcast Hash Joins, the [#singleRowFilter] approach would still result in a 
perf benefit.

h2. *Q7. How 

[jira] [Assigned] (SPARK-44661) getMapOutputLocation should not throw NPE

2023-08-03 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-44661:
-

Assignee: Dongjoon Hyun

> getMapOutputLocation should not throw NPE
> -
>
> Key: SPARK-44661
> URL: https://issues.apache.org/jira/browse/SPARK-44661
> Project: Spark
>  Issue Type: Test
>  Components: Spark Core, Tests
>Affects Versions: 3.4.1, 3.5.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
>







[jira] [Commented] (SPARK-44646) Migrate Log4j 2.x in Spark 3.4.1 to Logback

2023-08-03 Thread L. C. Hsieh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750895#comment-17750895
 ] 

L. C. Hsieh commented on SPARK-44646:
-

I have not used it myself, but maybe you can try 
https://logging.apache.org/log4j/log4j-2.2/log4j-to-slf4j/index.html.
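
For what it's worth, a hedged sketch of how that bridge might be wired in an 
sbt build (the coordinates and versions are assumptions to verify, not 
something tested against Spark 3.4.1):
{code:scala}
// Swap log4j-core for the log4j-to-slf4j bridge so Log4j 2.x API calls are
// routed to SLF4J, then let Logback be the SLF4J backend.
libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-core" % "3.4.1").excludeAll(
    ExclusionRule("org.apache.logging.log4j", "log4j-core"),
    ExclusionRule("org.apache.logging.log4j", "log4j-slf4j2-impl")),
  "org.apache.logging.log4j" % "log4j-to-slf4j" % "2.20.0",
  "ch.qos.logback" % "logback-classic" % "1.4.8"
)
{code}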

> Migrate Log4j 2.x in Spark 3.4.1 to Logback
> ---
>
> Key: SPARK-44646
> URL: https://issues.apache.org/jira/browse/SPARK-44646
> Project: Spark
>  Issue Type: Brainstorming
>  Components: Build
>Affects Versions: 3.4.1
>Reporter: Yu Tian
>Priority: Major
>
> Hi,
> We are working on the Spark 3.4.1 upgrade from Spark 3.1.3. In our logging 
> system we use the Logback framework; it works with Spark 3.1.3 since that 
> release uses log4j 1.x. However, when we upgrade Spark to 3.4.1, per the 
> [release 
> notes|https://spark.apache.org/docs/latest/core-migration-guide.html], Spark 
> has migrated from log4j 1.x to log4j 2.x, and the way we replace log4j with 
> Logback causes failures in the Spark master start process:
> Error: Unable to initialize main class org.apache.spark.deploy.master.Master
> Caused by: java.lang.NoClassDefFoundError: 
> org/apache/logging/log4j/core/Filter
> In our current approach we use log4j-over-slf4j to replace log4j-core, but it 
> is only applicable to the log4j 1.x library, and there is no log4j-over-slf4j 
> equivalent for log4j 2.x out there yet (please correct me if I am wrong). 
> I am also curious why Spark chose to use log4j 2.x directly instead of an 
> SPI, which gives users less flexibility to choose whatever logger 
> implementation they want.
> I want to share this issue and see if anyone else has reported it and whether 
> there is any work-around or alternative solution for it. Any suggestions are 
> appreciated, thanks.






[jira] [Created] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns

2023-08-03 Thread Asif (Jira)
Asif created SPARK-44662:


 Summary: SPIP: Improving performance of BroadcastHashJoin queries 
with stream side join key on non partition columns
 Key: SPARK-44662
 URL: https://issues.apache.org/jira/browse/SPARK-44662
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.3
Reporter: Asif
 Fix For: 3.3.3


h2. *Q1. What are you trying to do? Articulate your objectives using absolutely 
no jargon.*

On the lines of DPP, which helps DataSourceV2 relations when the joining key 
is a partition column, the same concept can be extended to the case where the 
joining key is not a partition column.
The keys of a BroadcastHashJoin are already available before the actual 
evaluation of the stream-side iterator. These keys can be pushed down to the 
DataSource as a SortedSet.
For non-partition columns, DataSources like Iceberg have max/min stats per 
column available at the manifest level, and formats like Parquet have max/min 
stats at various storage levels. The passed SortedSet can be used to prune 
using ranges at both the driver level (manifest files) and the executor level 
(while actually going through chunks, row groups, etc. at the Parquet level).

If the data is stored in Columnar Batch format, then it is not possible to 
filter out individual rows at the DataSource level, even though we have the 
keys. But at the scan level (ColumnToRowExec) it is still possible to filter 
out as many rows as possible if the query involves nested joins, thus reducing 
the number of rows to join at the higher join levels.

Will be adding more details.
h2. *Q2. What problem is this proposal NOT designed to solve?*

This can only help BroadcastHashJoin performance if the join is Inner or Left 
Semi.
It will also not work if there are nodes like Expand, Generate, or Aggregate 
(without a group-by on the joining columns, etc.) below the BroadcastHashJoin 
node being targeted.
h2. *Q3. How is it done today, and what are the limits of current practice?*

Currently this sort of pruning at the DataSource level is done using DPP 
(Dynamic Partition Pruning), and only if one of the join key columns is a 
partitioning column (so that the cost of the DPP query is justified and far 
less than the amount of data it filters out by skipping partitions).
The limitation is that the DPP-type approach is not implemented (intentionally, 
I believe) when the join column is a non-partition column, because the cost of 
a "DPP type" query would most likely be far higher than any possible pruning 
benefit (especially if the column is not stored in a sorted manner).
h2. *Q4. What is new in your approach and why do you think it will be 
successful?*

1) This allows pruning on joins over non-partition columns. 
2) Because it piggybacks on the broadcasted keys, there is no extra cost of a 
"DPP type" query. 
3) The data can be used by the DataSource to prune at the driver (possibly) and 
also at the executor level (as in the case of Parquet, which has max/min stats 
at various structure levels).

4) The big benefit should be seen in multilevel nested join queries. In the 
current code base, if I am correct, only one join's pruning filter gets pushed 
to the scan level. Since it is on a partition key, maybe that is sufficient. 
But if it is a nested join query, possibly involving different columns on the 
streaming side for each join, each such filter push could do significant 
pruning. This requires some handling in the case of AQE, as the stream-side 
iterator (and hence stage evaluation) needs to be delayed until all the 
available join filters in the nested tree are pushed to their respective 
target BatchScanExec nodes.

{anchor:singleRowFilter}5) In the case of nested broadcast joins, if the 
datasource is column-vector oriented, then what Spark gets is a ColumnarBatch. 
But because scans have filters from multiple joins, these can be retrieved and 
applied in the code generated at the ColumnToRowExec level, using a new 
"containsKey" method on HashedRelation. Thus only those rows which satisfy all 
the BroadcastHashJoins (whose keys have been pushed) will be used for join 
evaluation.

The code is already there; a PR will be opened. For a non-partitioned-table 
TPCDS run on a laptop with TPCDS data at scale factor 4, I am seeing a 15% 
gain.

For partitioned-table TPCDS, there is an improvement in 4-5 queries, to the 
tune of 10% to 37%.
h2. *Q5. Who cares? If you are successful, what difference will it make?*

If use cases involve multiple joins, especially when the join columns are 
non-partitioned, and performance is a criterion, this PR *might* help.
h2. *Q6. What are the risks?*

Well, the changes are extensive, so review will be painful (if it happens). 
Though the code is being tested continuously and more tests are being added, 
with a big change some possibility of bugs remains. But as of now, I think the 
code is robust. To get the Perf benefit 

[jira] [Commented] (SPARK-44646) Migrate Log4j 2.x in Spark 3.4.1 to Logback

2023-08-03 Thread Yu Tian (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750891#comment-17750891
 ] 

Yu Tian commented on SPARK-44646:
-

Hi [~viirya] 

Could you please check this thread? It is a question related to 
https://issues.apache.org/jira/browse/SPARK-37814

Thanks.

> Migrate Log4j 2.x in Spark 3.4.1 to Logback
> ---
>
> Key: SPARK-44646
> URL: https://issues.apache.org/jira/browse/SPARK-44646
> Project: Spark
>  Issue Type: Brainstorming
>  Components: Build
>Affects Versions: 3.4.1
>Reporter: Yu Tian
>Priority: Major
>
> Hi,
> We are working on the Spark 3.4.1 upgrade from Spark 3.1.3. In our logging 
> system we use the Logback framework; it works with Spark 3.1.3 since that 
> release uses log4j 1.x. However, when we upgrade Spark to 3.4.1, per the 
> [release 
> notes|https://spark.apache.org/docs/latest/core-migration-guide.html], Spark 
> has migrated from log4j 1.x to log4j 2.x, and the way we replace log4j with 
> Logback causes failures in the Spark master start process:
> Error: Unable to initialize main class org.apache.spark.deploy.master.Master
> Caused by: java.lang.NoClassDefFoundError: 
> org/apache/logging/log4j/core/Filter
> In our current approach we use log4j-over-slf4j to replace log4j-core, but it 
> is only applicable to the log4j 1.x library, and there is no log4j-over-slf4j 
> equivalent for log4j 2.x out there yet (please correct me if I am wrong). 
> I am also curious why Spark chose to use log4j 2.x directly instead of an 
> SPI, which gives users less flexibility to choose whatever logger 
> implementation they want.
> I want to share this issue and see if anyone else has reported it and whether 
> there is any work-around or alternative solution for it. Any suggestions are 
> appreciated, thanks.






[jira] [Assigned] (SPARK-44658) ShuffleStatus.getMapStatus should return None instead of Some(null)

2023-08-03 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-44658:
-

Assignee: Dongjoon Hyun

> ShuffleStatus.getMapStatus should return None instead of Some(null)
> ---
>
> Key: SPARK-44658
> URL: https://issues.apache.org/jira/browse/SPARK-44658
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.5.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Critical
>







[jira] [Resolved] (SPARK-44658) ShuffleStatus.getMapStatus should return None instead of Some(null)

2023-08-03 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-44658.
---
Fix Version/s: 3.5.0
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 42323
[https://github.com/apache/spark/pull/42323]

> ShuffleStatus.getMapStatus should return None instead of Some(null)
> ---
>
> Key: SPARK-44658
> URL: https://issues.apache.org/jira/browse/SPARK-44658
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.5.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Critical
> Fix For: 3.5.0, 4.0.0
>
>







[jira] [Commented] (SPARK-44660) Relax constraint for columnar shuffle check in AQE

2023-08-03 Thread Chao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750881#comment-17750881
 ] 

Chao Sun commented on SPARK-44660:
--

In fact the check is necessary, but it seems 
{code}
postStageCreationRules(outputsColumnar = plan.supportsColumnar)
{code}

can be relaxed: if the new shuffle operator supports columnar, then maybe we 
shouldn't insert {{ColumnarToRow}} into this stage. This assumes the 
following stage knows the shuffle output is columnar and has a corresponding 
{{ColumnarToRow}} if necessary.
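
A minimal sketch of such a relaxed check, reusing the names from the snippet 
above (the real AQE code in AdaptiveSparkPlanExec is more involved than this):
{code:scala}
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike

// Accept either a shuffle, or a ColumnarToRow that the columnar rules
// inserted directly on top of a columnar shuffle.
def stillAShuffle(plan: SparkPlan): Boolean = plan match {
  case _: ShuffleExchangeLike => true
  case c: ColumnarToRowExec   => c.child.isInstanceOf[ShuffleExchangeLike]
  case _                      => false
}
{code}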

> Relax constraint for columnar shuffle check in AQE
> --
>
> Key: SPARK-44660
> URL: https://issues.apache.org/jira/browse/SPARK-44660
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.1
>Reporter: Chao Sun
>Priority: Major
>
> Currently in AQE, after evaluating the columnar rules, Spark will check if 
> the top operator of the stage is still a shuffle operator, and throw an 
> exception if it is not.
> {code}
> val optimized = e.withNewChildren(Seq(optimizeQueryStage(e.child, 
> isFinalStage = false)))
> val newPlan = applyPhysicalRules(
>   optimized,
>   postStageCreationRules(outputsColumnar = plan.supportsColumnar),
>   Some((planChangeLogger, "AQE Post Stage Creation")))
> if (e.isInstanceOf[ShuffleExchangeLike]) {
>   if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
> throw SparkException.internalError(
>   "Custom columnar rules cannot transform shuffle node to 
> something else.")
>   }
> {code}
> However, once a shuffle operator is transformed into a custom columnar 
> shuffle operator, the {{supportsColumnar}} of the new shuffle operator will 
> return true, and therefore the columnar rules will insert {{ColumnarToRow}} 
> on top of it. This means the {{newPlan}} is likely no longer a 
> {{ShuffleExchangeLike}} but a {{ColumnarToRow}}, and the exception will be 
> thrown even though the use case is valid.
> This JIRA proposes to relax the check by allowing the above case.






[jira] [Updated] (SPARK-44641) SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet

2023-08-03 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-44641:
--
Affects Version/s: 3.4.0

> SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but 
> conditions unmet
> --
>
> Key: SPARK-44641
> URL: https://issues.apache.org/jira/browse/SPARK-44641
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.0, 3.4.1
>Reporter: Szehon Ho
>Priority: Blocker
>
> Adding the following test case in KeyGroupedPartitionSuite demonstrates the 
> problem.
>  
> {code:java}
> test("test join key is the second partition key and a transform") {
>   val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
>   createTable(items, items_schema, items_partitions)
>   sql(s"INSERT INTO testcat.ns.$items VALUES " +
> s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
> s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
> s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
> s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
> s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
>   val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
>   createTable(purchases, purchases_schema, purchases_partitions)
>   sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
> s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
> s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
> s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
> s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
> s"(3, 19.5, cast('2020-02-01' as timestamp))")
>   withSQLConf(
> SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
> SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
> SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
>   "true") {
> val df = sql("SELECT id, name, i.price as purchase_price, " +
>   "p.item_id, p.price as sale_price " +
>   s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
>   "ON i.arrive_time = p.time " +
>   "ORDER BY id, purchase_price, p.item_id, sale_price")
> val shuffles = collectShuffles(df.queryExecution.executedPlan)
> assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys 
> are partition keys")
> checkAnswer(df,
>   Seq(
> Row(1, "aa", 40.0, 1, 42.0),
> Row(1, "aa", 40.0, 2, 11.0),
> Row(1, "aa", 41.0, 1, 44.0),
> Row(1, "aa", 41.0, 1, 45.0),
> Row(2, "bb", 10.0, 1, 42.0),
> Row(2, "bb", 10.0, 2, 11.0),
> Row(2, "bb", 10.5, 1, 42.0),
> Row(2, "bb", 10.5, 2, 11.0),
> Row(3, "cc", 15.5, 3, 19.5)
>   )
> )
>   }
> }{code}
>  
> Note: this test has set up the DataSourceV2 to return multiple splits for the 
> same partition.
> In this case, SPJ is not triggered (because the join key does not match the 
> partition key), but the following code in DSV2Scan:
> [https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194]
> intended to fill the empty partitions for 'pushdown-value' will still iterate 
> through the non-grouped partitions and look up from the grouped partitions to 
> fill the map, resulting in some duplicate input data being fed into the join.






[jira] [Updated] (SPARK-44641) SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet

2023-08-03 Thread Chao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chao Sun updated SPARK-44641:
-
Priority: Blocker  (was: Major)

> SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but 
> conditions unmet
> --
>
> Key: SPARK-44641
> URL: https://issues.apache.org/jira/browse/SPARK-44641
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.1
>Reporter: Szehon Ho
>Priority: Blocker
>
> Adding the following test case in KeyGroupedPartitionSuite demonstrates the 
> problem.
>  
> {code:java}
> test("test join key is the second partition key and a transform") {
>   val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
>   createTable(items, items_schema, items_partitions)
>   sql(s"INSERT INTO testcat.ns.$items VALUES " +
> s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
> s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
> s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
> s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
> s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
>   val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
>   createTable(purchases, purchases_schema, purchases_partitions)
>   sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
> s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
> s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
> s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
> s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
> s"(3, 19.5, cast('2020-02-01' as timestamp))")
>   withSQLConf(
> SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
> SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
> SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
>   "true") {
> val df = sql("SELECT id, name, i.price as purchase_price, " +
>   "p.item_id, p.price as sale_price " +
>   s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
>   "ON i.arrive_time = p.time " +
>   "ORDER BY id, purchase_price, p.item_id, sale_price")
> val shuffles = collectShuffles(df.queryExecution.executedPlan)
> assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys 
> are partition keys")
> checkAnswer(df,
>   Seq(
> Row(1, "aa", 40.0, 1, 42.0),
> Row(1, "aa", 40.0, 2, 11.0),
> Row(1, "aa", 41.0, 1, 44.0),
> Row(1, "aa", 41.0, 1, 45.0),
> Row(2, "bb", 10.0, 1, 42.0),
> Row(2, "bb", 10.0, 2, 11.0),
> Row(2, "bb", 10.5, 1, 42.0),
> Row(2, "bb", 10.5, 2, 11.0),
> Row(3, "cc", 15.5, 3, 19.5)
>   )
> )
>   }
> }{code}
>  
> Note: this test has set up the DataSourceV2 to return multiple splits for the 
> same partition.
> In this case, SPJ is not triggered (because the join key does not match the 
> partition key), but the following code in DSV2Scan:
> [https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194]
> intended to fill the empty partitions for 'pushdown-value' will still iterate 
> through the non-grouped partitions and look up from the grouped partitions to 
> fill the map, resulting in some duplicate input data being fed into the join.






[jira] [Created] (SPARK-44661) getMapOutputLocation should not throw NPE

2023-08-03 Thread Dongjoon Hyun (Jira)
Dongjoon Hyun created SPARK-44661:
-

 Summary: getMapOutputLocation should not throw NPE
 Key: SPARK-44661
 URL: https://issues.apache.org/jira/browse/SPARK-44661
 Project: Spark
  Issue Type: Test
  Components: Spark Core, Tests
Affects Versions: 3.4.1, 3.5.0
Reporter: Dongjoon Hyun









[jira] [Created] (SPARK-44660) Relax constraint for columnar shuffle check in AQE

2023-08-03 Thread Chao Sun (Jira)
Chao Sun created SPARK-44660:


 Summary: Relax constraint for columnar shuffle check in AQE
 Key: SPARK-44660
 URL: https://issues.apache.org/jira/browse/SPARK-44660
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.4.1
Reporter: Chao Sun


Currently in AQE, after evaluating the columnar rules, Spark will check if the 
top operator of the stage is still a shuffle operator, and throw an exception 
if it is not.

{code}
val optimized = e.withNewChildren(Seq(optimizeQueryStage(e.child, 
isFinalStage = false)))
val newPlan = applyPhysicalRules(
  optimized,
  postStageCreationRules(outputsColumnar = plan.supportsColumnar),
  Some((planChangeLogger, "AQE Post Stage Creation")))
if (e.isInstanceOf[ShuffleExchangeLike]) {
  if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
throw SparkException.internalError(
  "Custom columnar rules cannot transform shuffle node to something 
else.")
  }
{code}

However, once a shuffle operator is transformed into a custom columnar shuffle 
operator, the {{supportsColumnar}} of the new shuffle operator will return 
true, and therefore the columnar rules will insert {{ColumnarToRow}} on top of 
it. This means the {{newPlan}} is likely no longer a {{ShuffleExchangeLike}} 
but a {{ColumnarToRow}}, and the exception will be thrown even though the use 
case is valid.

This JIRA proposes to relax the check by allowing the above case.










[jira] [Updated] (SPARK-44658) ShuffleStatus.getMapStatus should return None instead of Some(null)

2023-08-03 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-44658:
--
Summary: ShuffleStatus.getMapStatus should return None instead of 
Some(null)  (was: ShuffleStatus.getMapStatus should return None)

> ShuffleStatus.getMapStatus should return None instead of Some(null)
> ---
>
> Key: SPARK-44658
> URL: https://issues.apache.org/jira/browse/SPARK-44658
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.5.0
>Reporter: Dongjoon Hyun
>Priority: Critical
>







[jira] [Updated] (SPARK-44659) SPJ: Include keyGroupedPartitioning in StoragePartitionJoinParams equality check

2023-08-03 Thread Chao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chao Sun updated SPARK-44659:
-
Summary: SPJ: Include keyGroupedPartitioning in StoragePartitionJoinParams 
equality check  (was: Include keyGroupedPartitioning in 
StoragePartitionJoinParams equality check)

> SPJ: Include keyGroupedPartitioning in StoragePartitionJoinParams equality 
> check
> 
>
> Key: SPARK-44659
> URL: https://issues.apache.org/jira/browse/SPARK-44659
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Chao Sun
>Priority: Minor
>
> Currently {{StoragePartitionJoinParams}} doesn't include 
> {{keyGroupedPartitioning}} in its {{equals}} and {{hashCode}} computation. 
> For completeness, we should include it as well since it is a member of the 
> class.






[jira] [Updated] (SPARK-44641) SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet

2023-08-03 Thread Chao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chao Sun updated SPARK-44641:
-
Summary: SPJ: Results duplicated when SPJ partial-cluster and pushdown 
enabled but conditions unmet  (was: Results duplicated when SPJ partial-cluster 
and pushdown enabled but conditions unmet)

> SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but 
> conditions unmet
> --
>
> Key: SPARK-44641
> URL: https://issues.apache.org/jira/browse/SPARK-44641
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.1
>Reporter: Szehon Ho
>Priority: Major
>
> Adding the following test case in KeyGroupedPartitionSuite demonstrates the 
> problem.
>  
> {code:java}
> test("test join key is the second partition key and a transform") {
>   val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
>   createTable(items, items_schema, items_partitions)
>   sql(s"INSERT INTO testcat.ns.$items VALUES " +
> s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
> s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
> s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
> s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
> s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
>   val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
>   createTable(purchases, purchases_schema, purchases_partitions)
>   sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
> s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
> s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
> s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
> s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
> s"(3, 19.5, cast('2020-02-01' as timestamp))")
>   withSQLConf(
> SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
> SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
> SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
>   "true") {
> val df = sql("SELECT id, name, i.price as purchase_price, " +
>   "p.item_id, p.price as sale_price " +
>   s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
>   "ON i.arrive_time = p.time " +
>   "ORDER BY id, purchase_price, p.item_id, sale_price")
> val shuffles = collectShuffles(df.queryExecution.executedPlan)
> assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys 
> are partition keys")
> checkAnswer(df,
>   Seq(
> Row(1, "aa", 40.0, 1, 42.0),
> Row(1, "aa", 40.0, 2, 11.0),
> Row(1, "aa", 41.0, 1, 44.0),
> Row(1, "aa", 41.0, 1, 45.0),
> Row(2, "bb", 10.0, 1, 42.0),
> Row(2, "bb", 10.0, 2, 11.0),
> Row(2, "bb", 10.5, 1, 42.0),
> Row(2, "bb", 10.5, 2, 11.0),
> Row(3, "cc", 15.5, 3, 19.5)
>   )
> )
>   }
> }{code}
>  
> Note: this test has set up the DataSourceV2 to return multiple splits for the 
> same partition.
> In this case, SPJ is not triggered (because the join key does not match the 
> partition key), but the following code in DSV2Scan:
> [https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194]
> intended to fill the empty partitions for 'pushdown-value' will still iterate 
> through the non-grouped partitions and look up from the grouped partitions to 
> fill the map, resulting in some duplicate input data being fed into the join.






[jira] [Created] (SPARK-44659) Include keyGroupedPartitioning in StoragePartitionJoinParams equality check

2023-08-03 Thread Chao Sun (Jira)
Chao Sun created SPARK-44659:


 Summary: Include keyGroupedPartitioning in 
StoragePartitionJoinParams equality check
 Key: SPARK-44659
 URL: https://issues.apache.org/jira/browse/SPARK-44659
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.5.0
Reporter: Chao Sun


Currently {{StoragePartitionJoinParams}} doesn't include 
{{keyGroupedPartitioning}} in its {{equals}} and {{hashCode}} computation. For 
completeness, we should include it as well since it is a member of the class.
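
A sketch of the shape of the change (the field list is abbreviated and the 
types are assumed; the real case class lives in Spark's DSv2 code and has more 
members):
{code:scala}
import java.util.Objects

case class StoragePartitionJoinParams(
    keyGroupedPartitioning: Option[Seq[String]],
    commonPartitionValues: Option[Seq[(String, Int)]]) {
  override def equals(other: Any): Boolean = other match {
    case o: StoragePartitionJoinParams =>
      keyGroupedPartitioning == o.keyGroupedPartitioning && // now included
      commonPartitionValues == o.commonPartitionValues
    case _ => false
  }
  override def hashCode(): Int =
    Objects.hash(keyGroupedPartitioning, commonPartitionValues)
}
{code}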






[jira] [Updated] (SPARK-44641) Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet

2023-08-03 Thread Chao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chao Sun updated SPARK-44641:
-
Parent: SPARK-37375
Issue Type: Sub-task  (was: Bug)

> Results duplicated when SPJ partial-cluster and pushdown enabled but 
> conditions unmet
> -
>
> Key: SPARK-44641
> URL: https://issues.apache.org/jira/browse/SPARK-44641
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.4.1
>Reporter: Szehon Ho
>Priority: Major
>
> Adding the following test case in KeyGroupedPartitionSuite demonstrates the 
> problem.
>  
> {code:java}
> test("test join key is the second partition key and a transform") {
>   val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
>   createTable(items, items_schema, items_partitions)
>   sql(s"INSERT INTO testcat.ns.$items VALUES " +
> s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
> s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
> s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
> s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
> s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
>   val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
>   createTable(purchases, purchases_schema, purchases_partitions)
>   sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
> s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
> s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
> s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
> s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
> s"(3, 19.5, cast('2020-02-01' as timestamp))")
>   withSQLConf(
> SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
> SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
> SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
>   "true") {
> val df = sql("SELECT id, name, i.price as purchase_price, " +
>   "p.item_id, p.price as sale_price " +
>   s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " +
>   "ON i.arrive_time = p.time " +
>   "ORDER BY id, purchase_price, p.item_id, sale_price")
> val shuffles = collectShuffles(df.queryExecution.executedPlan)
> assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys 
> are partition keys")
> checkAnswer(df,
>   Seq(
> Row(1, "aa", 40.0, 1, 42.0),
> Row(1, "aa", 40.0, 2, 11.0),
> Row(1, "aa", 41.0, 1, 44.0),
> Row(1, "aa", 41.0, 1, 45.0),
> Row(2, "bb", 10.0, 1, 42.0),
> Row(2, "bb", 10.0, 2, 11.0),
> Row(2, "bb", 10.5, 1, 42.0),
> Row(2, "bb", 10.5, 2, 11.0),
> Row(3, "cc", 15.5, 3, 19.5)
>   )
> )
>   }
> }{code}
>  
> Note: this test has set up the DataSourceV2 to return multiple splits for the 
> same partition.
> In this case, SPJ is not triggered (because the join key does not match the 
> partition key), but the following code in DSV2Scan:
> [https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194]
> intended to fill the empty partitions for 'pushdown-value' will still iterate 
> through the non-grouped partitions and look up from the grouped partitions to 
> fill the map, resulting in some duplicate input data being fed into the join.






[jira] [Created] (SPARK-44658) ShuffleStatus.getMapStatus should return None

2023-08-03 Thread Dongjoon Hyun (Jira)
Dongjoon Hyun created SPARK-44658:
-

 Summary: ShuffleStatus.getMapStatus should return None
 Key: SPARK-44658
 URL: https://issues.apache.org/jira/browse/SPARK-44658
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.5.0
Reporter: Dongjoon Hyun









[jira] [Commented] (SPARK-43496) Have a separate config for Memory limits for kubernetes pods

2023-08-03 Thread Laurenceau Julien (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750791#comment-17750791
 ] 

Laurenceau Julien commented on SPARK-43496:
---

Hi,

I would like to suggest going beyond that:

As is, Spark on Kubernetes always sets the following attributes on executor and 
driver pods:
{code:java}
requests:
  cpu: "1"
  memory: "1234M"
limits:
  memory: "1234M"                       {code}
with the values coming from the _[driver|executor]-memory_ and 
_[driver|executor]-cores_ options.

This is good, except for resource utilization!

On most dev environments it would be OK not to set any resource requests and 
limits; one could just rely on the usual JVM options.

Besides, it would be nice if these settings could be user-defined with values 
different from the JVM values, so that overcommitment could be managed through 
them.

Example:
 * options: _[driver|executor]-memory_ and _[driver|executor]-cores_ are used 
to set JVM options.
If there is no custom definition, they also set the pod requests & limits to 
stay consistent.
 * new options (a hypothetical usage sketch follows below):
_spark.kubernetes.driver.requests.cpu_ 
_spark.kubernetes.driver.requests.memory_
_spark.kubernetes.driver.limits.cpu_ 
_spark.kubernetes.driver.limits.memory_
_spark.kubernetes.executor.requests.cpu_ 
_spark.kubernetes.executor.requests.memory_
_spark.kubernetes.executor.limits.cpu_ 
_spark.kubernetes.executor.limits.memory_
If unset, stay consistent with the current behavior;
if set to 0, disable the definition.

This would also solve the issue that driver/executor cores are defined as an 
Integer and cannot be 0.5 for a driver.
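
The hypothetical usage sketch, in case the suggested options were added (none 
of the spark.kubernetes.*.requests/limits keys below exist in Spark today):
{code:scala}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "4g") // still drives the JVM heap
  // pod sizing decoupled from the JVM, per the suggestion above:
  .set("spark.kubernetes.executor.requests.cpu", "0.5")
  .set("spark.kubernetes.executor.requests.memory", "4g")
  .set("spark.kubernetes.executor.limits.memory", "6g") // headroom for bursts
  .set("spark.kubernetes.executor.limits.cpu", "0")     // 0 = no cpu limit
{code}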

 

> Have a separate config for Memory limits for kubernetes pods
> 
>
> Key: SPARK-43496
> URL: https://issues.apache.org/jira/browse/SPARK-43496
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 3.4.0
>Reporter: Alexander Yerenkow
>Priority: Major
>
> Whole allocated memory to JVM is set into pod resources as both request and 
> limits.
> This means there's not a way to use more memory for burst-like jobs in a 
> shared environment.
> For example, if spark job uses external process (outside of JVM) to access 
> data, a bit of extra memory required for that, and having configured higher 
> limits for mem could be of use.
> Another thought here - have a way to configure different JVM/ pod memory 
> request also could be a valid use case.
>  
> Github PR: [https://github.com/apache/spark/pull/41067]
>  






[jira] [Commented] (SPARK-44654) In subquery cannot perform partition pruning

2023-08-03 Thread 7mming7 (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750743#comment-17750743
 ] 

7mming7 commented on SPARK-44654:
-

[~yumwang] This is also possible, but what about the case of multiple records?

> In subquery cannot perform partition pruning
> 
>
> Key: SPARK-44654
> URL: https://issues.apache.org/jira/browse/SPARK-44654
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: 7mming7
>Priority: Minor
>  Labels: performance
> Attachments: image-2023-08-03-17-22-53-981.png
>
>
> The following SQL cannot perform partition pruning
> {code:java}
> SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from 
> parquet_part){code}
> As can be seen from the execution plan below, partition pruning on the left 
> side cannot be performed after the IN subquery is converted into a join.
> !image-2023-08-03-17-22-53-981.png!
> This issue proposes to optimize InSubquery: only when the number of values in 
> the IN list exceeds a threshold should it be converted into a join.






[jira] [Commented] (SPARK-44654) In subquery cannot perform partition pruning

2023-08-03 Thread Yuming Wang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750739#comment-17750739
 ] 

Yuming Wang commented on SPARK-44654:
-

Another way is to convert the join to a filter if the maximum number of rows on 
one side is 1: https://github.com/apache/spark/pull/42114
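
In other words, a sketch of the rewrite (assuming a SparkSession named 
{{spark}}): since the subquery can return at most one row, the IN predicate 
can be planned as a scalar-subquery comparison, which keeps partition pruning 
possible.
{code:scala}
// Equivalent form that prunes partitions, because the single-row subquery
// becomes a scalar value rather than a join:
spark.sql(
  """SELECT * FROM parquet_part
    |WHERE id_type = (SELECT max(id_type) FROM parquet_part)""".stripMargin)
{code}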

> In subquery cannot perform partition pruning
> 
>
> Key: SPARK-44654
> URL: https://issues.apache.org/jira/browse/SPARK-44654
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: 7mming7
>Priority: Minor
>  Labels: performance
> Attachments: image-2023-08-03-17-22-53-981.png
>
>
> The following SQL cannot perform partition pruning
> {code:java}
> SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from 
> parquet_part){code}
> As can be seen from the execution plan below, partition pruning on the left 
> side cannot be performed after the IN subquery is converted into a join.
> !image-2023-08-03-17-22-53-981.png!
> This issue proposes to optimize InSubquery: only when the number of values in 
> the IN list exceeds a threshold should it be converted into a join.






[jira] [Created] (SPARK-44657) Incorrect limit handling and config parsing in Arrow collect

2023-08-03 Thread Venkata Sai Akhil Gudesa (Jira)
Venkata Sai Akhil Gudesa created SPARK-44657:


 Summary: Incorrect limit handling and config parsing in Arrow 
collect
 Key: SPARK-44657
 URL: https://issues.apache.org/jira/browse/SPARK-44657
 Project: Spark
  Issue Type: Bug
  Components: Connect
Affects Versions: 3.4.1, 3.4.0, 3.4.2, 3.5.0
Reporter: Venkata Sai Akhil Gudesa


In the arrow writer 
[code|https://github.com/apache/spark/blob/6161bf44f40f8146ea4c115c788fd4eaeb128769/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala#L154-L163],
the conditions don't seem to uphold what the documentation says regarding 
"{_}maxBatchSize and maxRecordsPerBatch, respect whatever smaller{_}", since 
the code actually respects whichever conf is "larger" (i.e. less restrictive) 
due to the _||_ operator.

Further, when the {_}CONNECT_GRPC_ARROW_MAX_BATCH_SIZE{_} conf is read, the 
value is not converted to bytes from MiB 
([example|https://github.com/apache/spark/blob/3e5203c64c06cc8a8560dfa0fb6f52e74589b583/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala#L103]).
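
A sketch of both fixes (the variable names are assumed, not the actual 
ArrowConverters internals):
{code:scala}
// With ||, the writer keeps appending while EITHER limit still has room,
// so the larger (less restrictive) limit wins; && stops at the smaller one:
def batchHasRoom(bytesSoFar: Long, rowsSoFar: Long,
                 maxBatchBytes: Long, maxRecords: Long): Boolean =
  bytesSoFar < maxBatchBytes && rowsSoFar < maxRecords

// And the MiB-valued conf needs scaling to bytes before the comparison:
val confValueMiB = 4L // illustrative conf value
val maxBatchBytes = confValueMiB * 1024L * 1024L
{code}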






[jira] [Updated] (SPARK-44656) Close dangling iterators in SparkResult too (Spark Connect Scala)

2023-08-03 Thread Juliusz Sompolski (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Juliusz Sompolski updated SPARK-44656:
--
Epic Link: SPARK-43754

> Close dangling iterators in SparkResult too (Spark Connect Scala)
> -
>
> Key: SPARK-44656
> URL: https://issues.apache.org/jira/browse/SPARK-44656
> Project: Spark
>  Issue Type: Improvement
>  Components: Connect
>Affects Versions: 3.5.0
>Reporter: Alice Sayutina
>Priority: Major
>
> Follow-up to SPARK-44636: iterators grabbed in SparkResult were not addressed 
> there.






[jira] [Created] (SPARK-44656) Close dangling iterators in SparkResult too (Spark Connect Scala)

2023-08-03 Thread Alice Sayutina (Jira)
Alice Sayutina created SPARK-44656:
--

 Summary: Close dangling iterators in SparkResult too (Spark 
Connect Scala)
 Key: SPARK-44656
 URL: https://issues.apache.org/jira/browse/SPARK-44656
 Project: Spark
  Issue Type: Improvement
  Components: Connect
Affects Versions: 3.5.0
Reporter: Alice Sayutina


Follow-up to SPARK-44636: iterators grabbed in SparkResult were not addressed there.






[jira] [Updated] (SPARK-44619) Free up disk space for container jobs

2023-08-03 Thread Ruifeng Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruifeng Zheng updated SPARK-44619:
--
Summary: Free up disk space for container jobs  (was: Free up disk space 
for pyspark container jobs)

> Free up disk space for container jobs
> -
>
> Key: SPARK-44619
> URL: https://issues.apache.org/jira/browse/SPARK-44619
> Project: Spark
>  Issue Type: Sub-task
>  Components: Project Infra
>Affects Versions: 4.0.0
>Reporter: Ruifeng Zheng
>Priority: Major
>







[jira] [Created] (SPARK-44655) make the code cleaner about static and dynamic data/partition filters

2023-08-03 Thread Wenchen Fan (Jira)
Wenchen Fan created SPARK-44655:
---

 Summary: make the code cleaner about static and dynamic 
data/partition filters
 Key: SPARK-44655
 URL: https://issues.apache.org/jira/browse/SPARK-44655
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 4.0.0
Reporter: Wenchen Fan
Assignee: Wenchen Fan









[jira] [Updated] (SPARK-44654) In subquery cannot perform partition pruning

2023-08-03 Thread 7mming7 (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

7mming7 updated SPARK-44654:

Description: 
The following SQL cannot perform partition pruning
{code:java}
SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from 
parquet_part){code}
As can be seen from the execution plan below, partition pruning on the left 
side cannot be performed after the IN subquery is converted into a join.

!image-2023-08-03-17-22-53-981.png!

This issue proposes to optimize InSubquery: only when the number of values in 
the IN list exceeds a threshold should it be converted into a join.

  was:
The following SQL cannot perform partition pruning
{code:java}
SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from 
parquet_part){code}
As can be seen from the execution plan below, partition pruning on the left 
side cannot be performed after the IN subquery is converted into a join.

!image-2023-08-03-17-17-34-858.png!

This issue proposes to optimize InSubquery: only when the number of values in 
the IN list exceeds a threshold should it be converted into a join.


> In subquery cannot perform partition pruning
> 
>
> Key: SPARK-44654
> URL: https://issues.apache.org/jira/browse/SPARK-44654
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: 7mming7
>Priority: Minor
>  Labels: performance
> Attachments: image-2023-08-03-17-22-53-981.png
>
>
> The following SQL cannot perform partition pruning
> {code:java}
> SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from 
> parquet_part){code}
> As can be seen from the execution plan below, partition pruning on the left 
> side cannot be performed after the IN subquery is converted into a join.
> !image-2023-08-03-17-22-53-981.png!
> This issue proposes to optimize InSubquery: only when the number of values in 
> the IN list exceeds a threshold should it be converted into a join.






[jira] [Commented] (SPARK-40927) Memory issue with Structured streaming

2023-08-03 Thread Iain Morrison (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750660#comment-17750660
 ] 

Iain Morrison commented on SPARK-40927:
---

In our case I found the following settings greatly improved our streaming 
applications, which have now been running for over 2 weeks without being 
OOM-killed (previously they lasted a day or two):

1. Using the RocksDB state store provider improved executor memory usage:

        "spark.sql.streaming.stateStore.providerClass" -> 
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"

        Not sure if there is a leak in the default HDFS state store 
implementation or not.

2. Store the UI on disk instead of in memory in the driver:

        "spark.ui.store.path" -> "some path"

 

Old issue, but I hope this helps someone.
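
For anyone wanting to try the same, a minimal sketch applying both settings at 
session build time (the store path is illustrative):
{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  .config("spark.ui.store.path", "/var/spark/ui-store") // any writable dir
  .getOrCreate()
{code}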

> Memory issue with Structured streaming
> --
>
> Key: SPARK-40927
> URL: https://issues.apache.org/jira/browse/SPARK-40927
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.3.0, 3.2.2
>Reporter: Mihir Kelkar
>Priority: Major
>
> In PySpark Structured Streaming with Kafka as source and sink, the driver as 
> well as the executors seem to get OOM-killed after a long period of time (a 
> few days). Not able to pinpoint any specific cause.
> But 8-12 hr long runs also show a slow memory creep in the Prometheus metric 
> values:
>  # JVM off-heap memory of both driver and executors keeps increasing over 
> time (12-24 hrs observation time) [I have NOT enabled off-heap usage]
>  # JVM heap memory of executors also keeps bumping up in slow steps.
>  # JVM RSS of executors and driver keeps increasing, but Python RSS does not 
> increase.
> A basic operation of counting rows from within sdf.forEachBatch() is being 
> done to debug (the original business logic has some dropDuplicates, 
> aggregations, and windowing done within the forEachBatch).
> Watermarking on a custom timestamp column is being done. 
>  
> Heap dump analysis shows a large number of duplicate strings (which look like 
> generated code), and further large numbers of byte[], char[] and UTF8String 
> objects. Does this point to any potential memory leak in Tungsten optimizer 
> related code?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44654) In subquery cannot perform partition pruning

2023-08-03 Thread 7mming7 (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

7mming7 updated SPARK-44654:

Attachment: image-2023-08-03-17-22-53-981.png

> In subquery cannot perform partition pruning
> 
>
> Key: SPARK-44654
> URL: https://issues.apache.org/jira/browse/SPARK-44654
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: 7mming7
>Priority: Minor
>  Labels: performance
> Attachments: image-2023-08-03-17-22-53-981.png
>
>
> The following SQL cannot perform partition pruning
> {code:java}
> SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from 
> parquet_part){code}
> As can be seen from the execution plan below, partition pruning on the left 
> side cannot be performed after the IN subquery is converted into a join.
> !image-2023-08-03-17-17-34-858.png!
> This issue proposes to optimize InSubquery: only when the number of values 
> returned by the IN subquery exceeds a threshold should the InSubquery be 
> converted into a join.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44654) In subquery cannot perform partition pruning

2023-08-03 Thread 7mming7 (Jira)
7mming7 created SPARK-44654:
---

 Summary: In subquery cannot perform partition pruning
 Key: SPARK-44654
 URL: https://issues.apache.org/jira/browse/SPARK-44654
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: 7mming7


The following SQL cannot perform partition pruning
{code:java}
SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from 
parquet_part){code}
As can be seen from the execution plan below, partition pruning on the left 
side cannot be performed after the IN subquery is converted into a join.

!image-2023-08-03-17-17-34-858.png!

This issue proposes to optimize InSubquery: only when the number of values 
returned by the IN subquery exceeds a threshold should the InSubquery be 
converted into a join.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-44575) Implement Error Translation

2023-08-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750659#comment-17750659
 ] 

ASF GitHub Bot commented on SPARK-44575:


User 'heyihong' has created a pull request for this issue:
https://github.com/apache/spark/pull/42266

> Implement Error Translation
> ---
>
> Key: SPARK-44575
> URL: https://issues.apache.org/jira/browse/SPARK-44575
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect
>Affects Versions: 3.5.0
>Reporter: Yihong He
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-44619) Free up disk space for pyspark container jobs

2023-08-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750657#comment-17750657
 ] 

ASF GitHub Bot commented on SPARK-44619:


User 'zhengruifeng' has created a pull request for this issue:
https://github.com/apache/spark/pull/42253

> Free up disk space for pyspark container jobs
> -
>
> Key: SPARK-44619
> URL: https://issues.apache.org/jira/browse/SPARK-44619
> Project: Spark
>  Issue Type: Sub-task
>  Components: Project Infra
>Affects Versions: 4.0.0
>Reporter: Ruifeng Zheng
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-44581) ShutdownHookManager get wrong hadoop user group information

2023-08-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750656#comment-17750656
 ] 

ASF GitHub Bot commented on SPARK-44581:


User 'liangyu-1' has created a pull request for this issue:
https://github.com/apache/spark/pull/42295

> ShutdownHookManager get wrong hadoop user group information
> ---
>
> Key: SPARK-44581
> URL: https://issues.apache.org/jira/browse/SPARK-44581
> Project: Spark
>  Issue Type: Bug
>  Components: Deploy, YARN
>Affects Versions: 3.2.1
>Reporter: liang yu
>Priority: Major
>
>  I use spark 3.2.1 to run a job on yarn in cluster mode. 
> when the job is finished, there is an exception that:
> {code:java}
> 2023-07-28 10:57:16,324 ERROR yarn.ApplicationMaster: Failed to cleanup 
> staging dir 
> hdfs://dmp/user/ubd_dmp_test/.sparkStaging/application_1689318995305_0290 
> org.apache.hadoop.security.AccessControlException: Permission denied: 
> user=yarn, access=WRITE, 
> inode="/user/ubd_dmp_test/.sparkStaging":ubd_dmp_test:ubd_dmp_test:drwxr-xr-x 
> at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:506)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:349)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermissionWithContext(FSPermissionChecker.java:370)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:240)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1943)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:105)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3266)
>  at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.delete(NameNodeRpcServer.java:1128)
>  at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.delete(ClientNamenodeProtocolServerSideTranslatorPB.java:725)
>  at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>  at 
> org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:604)
>  at 
> org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:572)
>  at 
> org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:556)
>  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1093) at 
> org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1043) at 
> org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:971) at 
> java.security.AccessController.doPrivileged(Native Method) at 
> javax.security.auth.Subject.doAs(Subject.java:422) at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
>  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2976) at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
>  at 
> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
>  at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:1656) at 
> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:991)
>  at 
> org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:988)
>  at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>  at 
> org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:998)
>  at 
> org.apache.spark.deploy.yarn.ApplicationMaster.cleanupStagingDir(ApplicationMaster.scala:686)
>  at 
> org.apache.spark.deploy.yarn.ApplicationMaster.$anonfun$run$3(ApplicationMaster.scala:268)
>  at 
> org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214) at 
> org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at 
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019) at 
> org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at 
> scala.util.Try$.apply(Try.scala:213) at 
> 

[jira] [Commented] (SPARK-44649) Runtime Filter supports passing equivalent creation side expressions

2023-08-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750652#comment-17750652
 ] 

ASF GitHub Bot commented on SPARK-44649:


User 'beliefer' has created a pull request for this issue:
https://github.com/apache/spark/pull/42317

> Runtime Filter supports passing equivalent creation side expressions
> 
>
> Key: SPARK-44649
> URL: https://issues.apache.org/jira/browse/SPARK-44649
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: jiaan.geng
>Priority: Major
>
> {code:java}
> SELECT
>   d_year,
>   i_brand_id,
>   i_class_id,
>   i_category_id,
>   i_manufact_id,
>   cs_quantity - COALESCE(cr_return_quantity, 0) AS sales_cnt,
>   cs_ext_sales_price - COALESCE(cr_return_amount, 0.0) AS sales_amt
> FROM catalog_sales
>   JOIN item ON i_item_sk = cs_item_sk
>   JOIN date_dim ON d_date_sk = cs_sold_date_sk
>   LEFT JOIN catalog_returns ON (cs_order_number = cr_order_number
> AND cs_item_sk = cr_item_sk)
> WHERE i_category = 'Books'
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-44649) Runtime Filter supports passing equivalent creation side expressions

2023-08-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750653#comment-17750653
 ] 

ASF GitHub Bot commented on SPARK-44649:


User 'beliefer' has created a pull request for this issue:
https://github.com/apache/spark/pull/42317

> Runtime Filter supports passing equivalent creation side expressions
> 
>
> Key: SPARK-44649
> URL: https://issues.apache.org/jira/browse/SPARK-44649
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: jiaan.geng
>Priority: Major
>
> {code:java}
> SELECT
>   d_year,
>   i_brand_id,
>   i_class_id,
>   i_category_id,
>   i_manufact_id,
>   cs_quantity - COALESCE(cr_return_quantity, 0) AS sales_cnt,
>   cs_ext_sales_price - COALESCE(cr_return_amount, 0.0) AS sales_amt
> FROM catalog_sales
>   JOIN item ON i_item_sk = cs_item_sk
>   JOIN date_dim ON d_date_sk = cs_sold_date_sk
>   LEFT JOIN catalog_returns ON (cs_order_number = cr_order_number
> AND cs_item_sk = cr_item_sk)
> WHERE i_category = 'Books'
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-42375) Point out the user-facing documentation in Spark Connect server startup

2023-08-03 Thread Junyao Huang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750586#comment-17750586
 ] 

Junyao Huang commented on SPARK-42375:
--

Hi [~gurwls223],

Do you mean we should directly add the overview link 
[https://spark.apache.org/docs/latest/spark-connect-overview.html] into 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L134C1-L150C4]
 ?

> Point out the user-facing documentation in Spark Connect server startup
> ---
>
> Key: SPARK-42375
> URL: https://issues.apache.org/jira/browse/SPARK-42375
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect
>Affects Versions: 3.4.0
>Reporter: Hyukjin Kwon
>Priority: Major
>
> See SPARK-42375 in SparkSubmit.scala



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-42729) Update Submitting Applications page for Spark Connect

2023-08-03 Thread Junyao Huang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750585#comment-17750585
 ] 

Junyao Huang commented on SPARK-42729:
--

Hi [~gurwls223],

I think these pages describe the different deploy modes used with 
spark-submit: local, standalone, YARN, and k8s.

Maybe we can add a new paragraph called 'Remote' next to the 'Master URLs' 
table.

And I have one more question: does spark-submit support the --remote 
parameter?

https://spark.apache.org/docs/latest/spark-connect-overview.html
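For context, the overview page above shows clients connecting to a Spark 
Connect server through a remote URL. A minimal sketch (the address 
sc://localhost:15002 assumes a locally running Spark Connect server on the 
documented default port):

{code:python}
from pyspark.sql import SparkSession

# Sketch of a Spark Connect client session; requires a Spark Connect
# server to be running and reachable at the given URL.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
spark.range(5).show()
{code}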

> Update Submitting Applications page for Spark Connect
> -
>
> Key: SPARK-42729
> URL: https://issues.apache.org/jira/browse/SPARK-42729
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect, Documentation
>Affects Versions: 3.5.0
>Reporter: Hyukjin Kwon
>Priority: Major
>
> https://spark.apache.org/docs/latest/submitting-applications.html
> Should we add Spark Connect application building content here, or create a 
> separate Spark Connect application building page?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44653) non-trivial DataFrame unions should not break caching

2023-08-03 Thread Wenchen Fan (Jira)
Wenchen Fan created SPARK-44653:
---

 Summary: non-trivial DataFrame unions should not break caching
 Key: SPARK-44653
 URL: https://issues.apache.org/jira/browse/SPARK-44653
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: Wenchen Fan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org