[jira] [Commented] (SPARK-44670) Fix the `test_to_excel` tests for python3.7
[ https://issues.apache.org/jira/browse/SPARK-44670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750959#comment-17750959 ]

Madhukar commented on SPARK-44670:
----------------------------------

Raised a PR for using openpyxl instead of xlrd: [https://github.com/apache/spark/pull/42339]

> Fix the `test_to_excel` tests for python3.7
> -------------------------------------------
>
> Key: SPARK-44670
> URL: https://issues.apache.org/jira/browse/SPARK-44670
> Project: Spark
> Issue Type: Bug
> Components: Pandas API on Spark
> Affects Versions: 3.4.1
> Reporter: Madhukar
> Priority: Minor
>
> With python3.7 and openpyxl installed, we get this error:
>
> ======================================================================
> ERROR: test_to_excel (pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)
> Traceback (most recent call last):
>   File "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py", line 102, in test_to_excel
>     dataframes = self.get_excel_dfs(pandas_on_spark_location, pandas_location)
>   File "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py", line 89, in get_excel_dfs
>     "got": pd.read_excel(pandas_on_spark_location, index_col=0),
>   File "/opt/conda/lib/python3.7/site-packages/pandas/util/_decorators.py", line 296, in wrapper
>     return func(*args, **kwargs)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 304, in read_excel
>     io = ExcelFile(io, engine=engine)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 867, in __init__
>     self._reader = self._engines[engine](self._io)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_xlrd.py", line 21, in __init__
>     import_optional_dependency("xlrd", extra=err_msg)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/compat/_optional.py", line 110, in import_optional_dependency
>     raise ImportError(msg) from None
> ImportError: Missing optional dependency 'xlrd'. Install xlrd >= 1.0.0 for Excel support Use pip or conda to install xlrd.
>
> But with xlrd 2.0.1 installed, we get this error:
>
> ======================================================================
> ERROR: test_to_excel (pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py", line 102, in test_to_excel
>     dataframes = self.get_excel_dfs(pandas_on_spark_location, pandas_location)
>   File "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py", line 89, in get_excel_dfs
>     "got": pd.read_excel(pandas_on_spark_location, index_col=0),
>   File "/opt/conda/lib/python3.7/site-packages/pandas/util/_decorators.py", line 296, in wrapper
>     return func(*args, **kwargs)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 304, in read_excel
>     io = ExcelFile(io, engine=engine)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 867, in __init__
>     self._reader = self._engines[engine](self._io)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_xlrd.py", line 22, in __init__
>     super().__init__(filepath_or_buffer)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 353, in __init__
>     self.book = self.load_workbook(filepath_or_buffer)
>   File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_xlrd.py", line 37, in load_workbook
>     return open_workbook(filepath_or_buffer)
>   File "/opt/conda/lib/python3.7/site-packages/xlrd/__init__.py", line 170, in open_workbook
>     raise XLRDError(FILE_FORMAT_DESCRIPTIONS[file_format]+'; not supported')
> xlrd.biffh.XLRDError: Excel xlsx file; not supported

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44670) Fix the `test_to_excel` tests for python3.7
[ https://issues.apache.org/jira/browse/SPARK-44670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Madhukar updated SPARK-44670:
-----------------------------
Description:
With python3.7 and openpyxl installed, we get this error:

======================================================================
ERROR: test_to_excel (pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)
Traceback (most recent call last):
  File "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py", line 102, in test_to_excel
    dataframes = self.get_excel_dfs(pandas_on_spark_location, pandas_location)
  File "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py", line 89, in get_excel_dfs
    "got": pd.read_excel(pandas_on_spark_location, index_col=0),
  File "/opt/conda/lib/python3.7/site-packages/pandas/util/_decorators.py", line 296, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 304, in read_excel
    io = ExcelFile(io, engine=engine)
  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 867, in __init__
    self._reader = self._engines[engine](self._io)
  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_xlrd.py", line 21, in __init__
    import_optional_dependency("xlrd", extra=err_msg)
  File "/opt/conda/lib/python3.7/site-packages/pandas/compat/_optional.py", line 110, in import_optional_dependency
    raise ImportError(msg) from None
ImportError: Missing optional dependency 'xlrd'. Install xlrd >= 1.0.0 for Excel support Use pip or conda to install xlrd.

But with xlrd 2.0.1 installed, we get this error:

======================================================================
ERROR: test_to_excel (pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py", line 102, in test_to_excel
    dataframes = self.get_excel_dfs(pandas_on_spark_location, pandas_location)
  File "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py", line 89, in get_excel_dfs
    "got": pd.read_excel(pandas_on_spark_location, index_col=0),
  File "/opt/conda/lib/python3.7/site-packages/pandas/util/_decorators.py", line 296, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 304, in read_excel
    io = ExcelFile(io, engine=engine)
  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 867, in __init__
    self._reader = self._engines[engine](self._io)
  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_xlrd.py", line 22, in __init__
    super().__init__(filepath_or_buffer)
  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_base.py", line 353, in __init__
    self.book = self.load_workbook(filepath_or_buffer)
  File "/opt/conda/lib/python3.7/site-packages/pandas/io/excel/_xlrd.py", line 37, in load_workbook
    return open_workbook(filepath_or_buffer)
  File "/opt/conda/lib/python3.7/site-packages/xlrd/__init__.py", line 170, in open_workbook
    raise XLRDError(FILE_FORMAT_DESCRIPTIONS[file_format]+'; not supported')
xlrd.biffh.XLRDError: Excel xlsx file; not supported

was:
With python3.7 and openpyxl installed, we get this error:

======================================================================
ERROR: test_to_excel (pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)
ImportError: Missing optional dependency 'xlrd'. Install xlrd >= 1.0.0 for Excel support Use pip or conda to install xlrd.

But with xlrd installed, we get this error:

xlrd.biffh.XLRDError: Excel xlsx file; not supported

> Fix the `test_to_excel` tests for python3.7
> -------------------------------------------
>
> Key: SPARK-44670
> URL: https://issues.apache.org/jira/browse/SPARK-44670
> Project: Spark
> Issue Type: Bug
> Components: Pandas API on Spark
> Affects Versions: 3.4.1
> Reporter: Madhukar
> Priority: Minor
>
> With python3.7 and openpyxl installed, we get this error:
>
> ======================================================================
> ERROR: test_to_excel (pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)
> Traceback (most recent call last):
>   File "/workspace/apache-spark/python/pyspark/pandas/tests/test_dataframe_conversion.py", line 102, in test_to_excel
>     dataframes =
[jira] [Resolved] (SPARK-44582) JVM crash caused by SMJ and WindowExec
[ https://issues.apache.org/jira/browse/SPARK-44582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-44582.
----------------------------------
Fix Version/s: 4.0.0
Resolution: Fixed

Issue resolved by pull request 42206
[https://github.com/apache/spark/pull/42206]

> JVM crash caused by SMJ and WindowExec
> --------------------------------------
>
> Key: SPARK-44582
> URL: https://issues.apache.org/jira/browse/SPARK-44582
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.5.0
> Reporter: Wan Kun
> Assignee: Wan Kun
> Priority: Major
> Fix For: 4.0.0
>
> Attachments: screenshot-1.png, screenshot-2.png
>
> After the inner SMJ cleans up off-heap memory early, the JVM may crash when the SMJ calls the left Window's next method, due to accessing unallocated memory.
> !screenshot-1.png!
> !screenshot-2.png!
> {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> # SIGSEGV (0xb) at pc=0x7fb5e0052954, pid=21756, tid=0x7fa9e8f65640
> #
> # JRE version: OpenJDK Runtime Environment (Zulu 8.68.0.20-SA-linux64) (8.0_362-b08) (build 1.8.0_362-b08)
> # Java VM: OpenJDK 64-Bit Server VM (25.362-b08 mixed mode linux-amd64 )
> # Problematic frame:
> # v ~StubRoutines::jlong_disjoint_arraycopy
> #
> # Core dump written. Default location: /hadoop/7/yarn/local/usercache/b_carmel/appcache/application_1684894519955_24406/container_e2311_1684894519955_24406_01_005660/core or core.21756
> #
> # If you would like to submit a bug report, please visit:
> # http://www.azul.com/support/
> #
> --- T H R E A D ---
> Current thread (0x7fb5d8f0c800): JavaThread "Executor 2802 task launch worker for task 128116463, task 101.3 in stage 452404.0 of app application_1684894519955_24406" daemon [_thread_in_Java, id=22042, stack(0x7fa9e8766000,0x7fa9e8f66000)]
> siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr: 0x7fa9750deac0
> Registers:
> RAX=0x7fa9750deae8, RBX=0x0018, RCX=0x7fb581f0f0a8, RDX=0x
> RSP=0x7fa9e8f63a80, RBP=0x7fa9e8f63a80, RSI=0x7fb581f0f088, RDI=0x7fa9750deae0
> R8 =0x7fb581f0f070, R9 =0x97446ed2, R10=0x7fb5e0053500, R11=0x7fb581f0f0b0
> R12=0x7fb585ff17b0, R13=0x7fa9750deac0, R14=0x, R15=0x7fb5d8f0c800
> RIP=0x7fb5e0052954, EFLAGS=0x00010297, CSGSFS=0x002b0033, ERR=0x0004
> TRAPNO=0x000e
> Top of Stack: (sp=0x7fa9e8f63a80)
> 0x7fa9e8f63a80: 0028 7fb5e127ee97
> 0x7fa9e8f63a90: 7fa9e8f63b20
> 0x7fa9e8f63aa0: 7fa9e8f63b00 7fb60882cf70
> 0x7fa9e8f63ab0: 7faab2dda9e0 7fb581f0f070
> 0x7fa9e8f63ac0: 7fb551801188 7fb54f8c0ef8
> Stack: [0x7f8a0380,0x7f8a0400], sp=0x7f8a03ffd620, free space=8181k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
> v ~StubRoutines::jint_disjoint_arraycopy
> J 36127 C2 org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;)V (188 bytes) @ 0x7f966187ac9f [0x7f966187a820+0x47f]
> J 36146 C2 org.apache.spark.sql.execution.window.WindowExec$$anon$1.next()Ljava/lang/Object; (5 bytes) @ 0x7f9661a8eefc [0x7f9661a8dd60+0x119c]
> J 36153 C2 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext()V (381 bytes) @ 0x7f966180185c [0x7f9661801760+0xfc]
> J 36246 C2 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.smj_findNextJoinRows_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage7;Lscala/collection/Iterator;Lscala/collection/Iterator;)Z (392 bytes) @ 0x7f96607388f0 [0x7f96607381e0+0x710]
> J 36249 C1 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext()V (109 bytes) @ 0x7f965fa8ee64 [0x7f965fa8e560+0x904]
> J 35645 C2 org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext()Z (31 bytes) @ 0x7f965fbc58e4 [0x7f965fbc58a0+0x44]
> j org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(Lscala/collection/Iterator;Lorg/apache/spark/sql/execution/datasources/FileFormatDataWriter;)Lorg/apache/spark/sql/execution/datasources/WriteTaskResult;+1
> j org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$4398.apply()Ljava/lang/Object;+8
> j org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Lscala/Function0;Lscala/Function0;Lscala/Function0;)Ljava/lang/Object;+4
> j
[jira] [Assigned] (SPARK-44582) JVM crash caused by SMJ and WindowExec
[ https://issues.apache.org/jira/browse/SPARK-44582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon reassigned SPARK-44582:
------------------------------------
Assignee: Wan Kun

> JVM crash caused by SMJ and WindowExec
> --------------------------------------
>
> Key: SPARK-44582
> URL: https://issues.apache.org/jira/browse/SPARK-44582
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.5.0
> Reporter: Wan Kun
> Assignee: Wan Kun
> Priority: Major
>
> Attachments: screenshot-1.png, screenshot-2.png
>
> After the inner SMJ cleans up off-heap memory early, the JVM may crash when the SMJ calls the left Window's next method, due to accessing unallocated memory.
> !screenshot-1.png!
> !screenshot-2.png!
> {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> # SIGSEGV (0xb) at pc=0x7fb5e0052954, pid=21756, tid=0x7fa9e8f65640
> #
> # JRE version: OpenJDK Runtime Environment (Zulu 8.68.0.20-SA-linux64) (8.0_362-b08) (build 1.8.0_362-b08)
> # Java VM: OpenJDK 64-Bit Server VM (25.362-b08 mixed mode linux-amd64 )
> # Problematic frame:
> # v ~StubRoutines::jlong_disjoint_arraycopy
> #
> # Core dump written. Default location: /hadoop/7/yarn/local/usercache/b_carmel/appcache/application_1684894519955_24406/container_e2311_1684894519955_24406_01_005660/core or core.21756
> #
> # If you would like to submit a bug report, please visit:
> # http://www.azul.com/support/
> #
> --- T H R E A D ---
> Current thread (0x7fb5d8f0c800): JavaThread "Executor 2802 task launch worker for task 128116463, task 101.3 in stage 452404.0 of app application_1684894519955_24406" daemon [_thread_in_Java, id=22042, stack(0x7fa9e8766000,0x7fa9e8f66000)]
> siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr: 0x7fa9750deac0
> Registers:
> RAX=0x7fa9750deae8, RBX=0x0018, RCX=0x7fb581f0f0a8, RDX=0x
> RSP=0x7fa9e8f63a80, RBP=0x7fa9e8f63a80, RSI=0x7fb581f0f088, RDI=0x7fa9750deae0
> R8 =0x7fb581f0f070, R9 =0x97446ed2, R10=0x7fb5e0053500, R11=0x7fb581f0f0b0
> R12=0x7fb585ff17b0, R13=0x7fa9750deac0, R14=0x, R15=0x7fb5d8f0c800
> RIP=0x7fb5e0052954, EFLAGS=0x00010297, CSGSFS=0x002b0033, ERR=0x0004
> TRAPNO=0x000e
> Top of Stack: (sp=0x7fa9e8f63a80)
> 0x7fa9e8f63a80: 0028 7fb5e127ee97
> 0x7fa9e8f63a90: 7fa9e8f63b20
> 0x7fa9e8f63aa0: 7fa9e8f63b00 7fb60882cf70
> 0x7fa9e8f63ab0: 7faab2dda9e0 7fb581f0f070
> 0x7fa9e8f63ac0: 7fb551801188 7fb54f8c0ef8
> Stack: [0x7f8a0380,0x7f8a0400], sp=0x7f8a03ffd620, free space=8181k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
> v ~StubRoutines::jint_disjoint_arraycopy
> J 36127 C2 org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;)V (188 bytes) @ 0x7f966187ac9f [0x7f966187a820+0x47f]
> J 36146 C2 org.apache.spark.sql.execution.window.WindowExec$$anon$1.next()Ljava/lang/Object; (5 bytes) @ 0x7f9661a8eefc [0x7f9661a8dd60+0x119c]
> J 36153 C2 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext()V (381 bytes) @ 0x7f966180185c [0x7f9661801760+0xfc]
> J 36246 C2 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.smj_findNextJoinRows_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage7;Lscala/collection/Iterator;Lscala/collection/Iterator;)Z (392 bytes) @ 0x7f96607388f0 [0x7f96607381e0+0x710]
> J 36249 C1 org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext()V (109 bytes) @ 0x7f965fa8ee64 [0x7f965fa8e560+0x904]
> J 35645 C2 org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext()Z (31 bytes) @ 0x7f965fbc58e4 [0x7f965fbc58a0+0x44]
> j org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(Lscala/collection/Iterator;Lorg/apache/spark/sql/execution/datasources/FileFormatDataWriter;)Lorg/apache/spark/sql/execution/datasources/WriteTaskResult;+1
> j org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$4398.apply()Ljava/lang/Object;+8
> j org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Lscala/Function0;Lscala/Function0;Lscala/Function0;)Ljava/lang/Object;+4
> j
[jira] [Created] (SPARK-44671) Retry ExecutePlan in case initial request didn't reach server in Python client
Hyukjin Kwon created SPARK-44671:
------------------------------------

Summary: Retry ExecutePlan in case initial request didn't reach server in Python client
Key: SPARK-44671
URL: https://issues.apache.org/jira/browse/SPARK-44671
Project: Spark
Issue Type: Task
Components: Connect, PySpark
Affects Versions: 3.5.0
Reporter: Hyukjin Kwon

SPARK-44624 for Python
[jira] [Updated] (SPARK-44009) Support profiler for Python UDTFs
[ https://issues.apache.org/jira/browse/SPARK-44009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Allison Wang updated SPARK-44009:
---------------------------------
Summary: Support profiler for Python UDTFs  (was: Support memory_profiler for UDTFs)

> Support profiler for Python UDTFs
> ---------------------------------
>
> Key: SPARK-44009
> URL: https://issues.apache.org/jira/browse/SPARK-44009
> Project: Spark
> Issue Type: Sub-task
> Components: PySpark
> Affects Versions: 3.5.0
> Reporter: Allison Wang
> Priority: Major
>
> Similar to Python UDFs, we should support the memory profiler for UDTFs.
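Editor's note for context: a Python UDTF handler is an ordinary class whose `eval` method yields output rows, so the Python-side logic that a profiler would measure can be exercised in isolation. A minimal sketch, with illustrative names (the registration and config lines in the comments are assumptions to verify against your Spark version, not something this ticket confirms):

```python
class SplitWords:
    """Illustrative UDTF handler: eval() yields zero or more output
    rows (tuples) per input value."""

    def eval(self, text: str):
        # Emit one (position, word) row per whitespace-separated token.
        for i, word in enumerate(text.split()):
            yield (i, word)

# With PySpark this handler would be registered roughly as:
#   from pyspark.sql.functions import udtf
#   split_words = udtf(SplitWords, returnType="pos: int, word: string")
# and, once profiler support lands, profiled like Python UDFs, e.g. via
#   spark.conf.set("spark.python.profile.memory", "true")   # assumed key
```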
[jira] [Updated] (SPARK-44663) Disable arrow optimization by default for Python UDTFs
[ https://issues.apache.org/jira/browse/SPARK-44663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Allison Wang updated SPARK-44663:
---------------------------------
Summary: Disable arrow optimization by default for Python UDTFs  (was: Disable arrow optimization by default)

> Disable arrow optimization by default for Python UDTFs
> ------------------------------------------------------
>
> Key: SPARK-44663
> URL: https://issues.apache.org/jira/browse/SPARK-44663
> Project: Spark
> Issue Type: Sub-task
> Components: PySpark
> Affects Versions: 3.5.0
> Reporter: Allison Wang
> Priority: Major
>
> Disable arrow optimization to make Python UDTFs consistent with Python UDFs.
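Editor's note: if the default flips to disabled, users who want Arrow-optimized UDTFs would opt back in per session or cluster. A sketch of that opt-in, assuming the config key mirrors the Python UDF one (the exact key is not stated in this ticket and should be checked against the Spark version you run):

```
# spark-defaults.conf (key name assumed, not confirmed by this ticket)
spark.sql.execution.pythonUDTF.arrow.enabled   true
```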
[jira] [Updated] (SPARK-44670) Fix the `test_to_excel` tests for python3.7
[ https://issues.apache.org/jira/browse/SPARK-44670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Madhukar updated SPARK-44670:
-----------------------------
Description:
With python3.7 and openpyxl installed, we get this error:

======================================================================
ERROR: test_to_excel (pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)
ImportError: Missing optional dependency 'xlrd'. Install xlrd >= 1.0.0 for Excel support Use pip or conda to install xlrd.

But with xlrd installed, we get this error:

xlrd.biffh.XLRDError: Excel xlsx file; not supported

> Fix the `test_to_excel` tests for python3.7
> -------------------------------------------
>
> Key: SPARK-44670
> URL: https://issues.apache.org/jira/browse/SPARK-44670
> Project: Spark
> Issue Type: Bug
> Components: Pandas API on Spark
> Affects Versions: 3.4.1
> Reporter: Madhukar
> Priority: Minor
>
> With python3.7 and openpyxl installed, we get this error:
>
> ======================================================================
> ERROR: test_to_excel (pyspark.pandas.tests.test_dataframe_conversion.DataFrameConversionTest)
> ImportError: Missing optional dependency 'xlrd'. Install xlrd >= 1.0.0 for Excel support Use pip or conda to install xlrd.
>
> But with xlrd installed, we get this error:
>
> xlrd.biffh.XLRDError: Excel xlsx file; not supported
[jira] [Updated] (SPARK-44670) Fix the `test_to_excel` tests for python3.7
[ https://issues.apache.org/jira/browse/SPARK-44670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Madhukar updated SPARK-44670:
-----------------------------
Affects Version/s: 3.4.1
                   (was: 3.4.0)

> Fix the `test_to_excel` tests for python3.7
> -------------------------------------------
>
> Key: SPARK-44670
> URL: https://issues.apache.org/jira/browse/SPARK-44670
> Project: Spark
> Issue Type: Bug
> Components: Pandas API on Spark
> Affects Versions: 3.4.1
> Reporter: Madhukar
> Priority: Minor
[jira] [Created] (SPARK-44670) Fix the `test_to_excel` tests for python3.7
Madhukar created SPARK-44670:
--------------------------------

Summary: Fix the `test_to_excel` tests for python3.7
Key: SPARK-44670
URL: https://issues.apache.org/jira/browse/SPARK-44670
Project: Spark
Issue Type: Bug
Components: Pandas API on Spark
Affects Versions: 3.4.0
Reporter: Madhukar

So far, we've been skipping the `read_excel` test in pandas API on Spark:
https://github.com/apache/spark/blob/6d2ce128058b439094cd1dd54253372af6977e79/python/pyspark/pandas/tests/test_dataframe_spark_io.py#L251

In https://github.com/apache/spark/pull/37671, we installed `openpyxl==3.0.10` to re-enable the `read_excel` tests, but it's still failing for some reason (please see https://github.com/apache/spark/pull/37671#issuecomment-1237515485 for more detail). We should re-enable this test to improve pandas-on-Spark test coverage.
[jira] [Updated] (SPARK-44670) Fix the `test_to_excel` tests for python3.7
[ https://issues.apache.org/jira/browse/SPARK-44670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Madhukar updated SPARK-44670:
-----------------------------
Description: (was: So far, we've been skipping the `read_excel` test in pandas API on Spark: https://github.com/apache/spark/blob/6d2ce128058b439094cd1dd54253372af6977e79/python/pyspark/pandas/tests/test_dataframe_spark_io.py#L251 In https://github.com/apache/spark/pull/37671, we installed `openpyxl==3.0.10` to re-enable the `read_excel` tests, but it's still failing for some reason (Please see https://github.com/apache/spark/pull/37671#issuecomment-1237515485 for more detail). We should re-enable this test for improving the pandas-on-Spark test coverage.)

> Fix the `test_to_excel` tests for python3.7
> -------------------------------------------
>
> Key: SPARK-44670
> URL: https://issues.apache.org/jira/browse/SPARK-44670
> Project: Spark
> Issue Type: Bug
> Components: Pandas API on Spark
> Affects Versions: 3.4.0
> Reporter: Madhukar
> Priority: Minor
[jira] [Closed] (SPARK-44668) ObjectMapper is thread-safe, we can reuse it in Object
[ https://issues.apache.org/jira/browse/SPARK-44668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jia Fan closed SPARK-44668.
---------------------------

> ObjectMapper is thread-safe, we can reuse it in Object
> ------------------------------------------------------
>
> Key: SPARK-44668
> URL: https://issues.apache.org/jira/browse/SPARK-44668
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.4.1
> Reporter: Jia Fan
> Priority: Major
>
> ObjectMapper is thread-safe, so we can reuse a single instance. But we create it in a trait, which means each object creates its own ObjectMapper.
[jira] [Resolved] (SPARK-44668) ObjectMapper is thread-safe, we can reuse it in Object
[ https://issues.apache.org/jira/browse/SPARK-44668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jia Fan resolved SPARK-44668.
-----------------------------
Resolution: Invalid

> ObjectMapper is thread-safe, we can reuse it in Object
> ------------------------------------------------------
>
> Key: SPARK-44668
> URL: https://issues.apache.org/jira/browse/SPARK-44668
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.4.1
> Reporter: Jia Fan
> Priority: Major
>
> ObjectMapper is thread-safe, so we can reuse a single instance. But we create it in a trait, which means each object creates its own ObjectMapper.
[jira] [Created] (SPARK-44669) Parquet/ORC files written using Hive Serde should have file extensions
Cheng Pan created SPARK-44669:
---------------------------------

Summary: Parquet/ORC files written using Hive Serde should have file extensions
Key: SPARK-44669
URL: https://issues.apache.org/jira/browse/SPARK-44669
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.3.2
Reporter: Cheng Pan
[jira] [Created] (SPARK-44668) ObjectMapper is thread-safe, we can reuse it in Object
Jia Fan created SPARK-44668:
-------------------------------

Summary: ObjectMapper is thread-safe, we can reuse it in Object
Key: SPARK-44668
URL: https://issues.apache.org/jira/browse/SPARK-44668
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 3.4.1
Reporter: Jia Fan

ObjectMapper is thread-safe, so we can reuse a single instance. But we create it in a trait, which means each object creates its own ObjectMapper.
[jira] [Created] (SPARK-44667) Uninstall large ML libraries for non-ML jobs
Ruifeng Zheng created SPARK-44667:
-------------------------------------

Summary: Uninstall large ML libraries for non-ML jobs
Key: SPARK-44667
URL: https://issues.apache.org/jira/browse/SPARK-44667
Project: Spark
Issue Type: Sub-task
Components: Project Infra
Affects Versions: 4.0.0
Reporter: Ruifeng Zheng
[jira] [Created] (SPARK-44666) Uninstall CodeQL/Go/Node in non-container jobs
Ruifeng Zheng created SPARK-44666:
-------------------------------------

Summary: Uninstall CodeQL/Go/Node in non-container jobs
Key: SPARK-44666
URL: https://issues.apache.org/jira/browse/SPARK-44666
Project: Spark
Issue Type: Sub-task
Components: Project Infra
Affects Versions: 4.0.0
Reporter: Ruifeng Zheng
[jira] [Resolved] (SPARK-44653) non-trivial DataFrame unions should not break caching
[ https://issues.apache.org/jira/browse/SPARK-44653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan resolved SPARK-44653.
---------------------------------
Fix Version/s: 3.3.3
               3.5.0
               3.4.2
Resolution: Fixed

Issue resolved by pull request 42315
[https://github.com/apache/spark/pull/42315]

> non-trivial DataFrame unions should not break caching
> -----------------------------------------------------
>
> Key: SPARK-44653
> URL: https://issues.apache.org/jira/browse/SPARK-44653
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Wenchen Fan
> Assignee: Wenchen Fan
> Priority: Major
> Fix For: 3.3.3, 3.5.0, 3.4.2
[jira] [Assigned] (SPARK-44653) non-trivial DataFrame unions should not break caching
[ https://issues.apache.org/jira/browse/SPARK-44653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-44653:
-----------------------------------
Assignee: Wenchen Fan

> non-trivial DataFrame unions should not break caching
> -----------------------------------------------------
>
> Key: SPARK-44653
> URL: https://issues.apache.org/jira/browse/SPARK-44653
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Wenchen Fan
> Assignee: Wenchen Fan
> Priority: Major
[jira] [Assigned] (SPARK-44624) Spark Connect reattachable Execute when initial ExecutePlan didn't reach server
[ https://issues.apache.org/jira/browse/SPARK-44624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon reassigned SPARK-44624:
------------------------------------
Assignee: Juliusz Sompolski

> Spark Connect reattachable Execute when initial ExecutePlan didn't reach server
> -------------------------------------------------------------------------------
>
> Key: SPARK-44624
> URL: https://issues.apache.org/jira/browse/SPARK-44624
> Project: Spark
> Issue Type: Improvement
> Components: Connect
> Affects Versions: 3.5.0, 4.0.0
> Reporter: Juliusz Sompolski
> Assignee: Juliusz Sompolski
> Priority: Major
>
> If the ExecutePlan never reached the server, a ReattachExecute will fail with INVALID_HANDLE.OPERATION_NOT_FOUND. In that case, we could try to send ExecutePlan again.
[jira] [Resolved] (SPARK-44624) Spark Connect reattachable Execute when initial ExecutePlan didn't reach server
[ https://issues.apache.org/jira/browse/SPARK-44624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-44624.
----------------------------------
Fix Version/s: 3.5.0
               4.0.0
Resolution: Fixed

Issue resolved by pull request 42282
[https://github.com/apache/spark/pull/42282]

> Spark Connect reattachable Execute when initial ExecutePlan didn't reach server
> -------------------------------------------------------------------------------
>
> Key: SPARK-44624
> URL: https://issues.apache.org/jira/browse/SPARK-44624
> Project: Spark
> Issue Type: Improvement
> Components: Connect
> Affects Versions: 3.5.0, 4.0.0
> Reporter: Juliusz Sompolski
> Assignee: Juliusz Sompolski
> Priority: Major
> Fix For: 3.5.0, 4.0.0
>
> If the ExecutePlan never reached the server, a ReattachExecute will fail with INVALID_HANDLE.OPERATION_NOT_FOUND. In that case, we could try to send ExecutePlan again.
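Editor's note: the recovery described in the issue (if ReattachExecute fails with INVALID_HANDLE.OPERATION_NOT_FOUND, the initial ExecutePlan never registered on the server, so it is safe to resend) can be sketched as generic retry logic. Names below are illustrative, not the actual Spark Connect client API:

```python
class OperationNotFound(Exception):
    """Stand-in for the server's INVALID_HANDLE.OPERATION_NOT_FOUND error."""

def execute_with_reattach(execute_plan, reattach_execute):
    """Sketch of the recovery path described above.

    First attempt ExecutePlan. If the stream drops, try ReattachExecute;
    if the server reports the operation was never registered, the initial
    ExecutePlan never arrived, so resending it cannot duplicate work.
    """
    try:
        return execute_plan()
    except ConnectionError:
        try:
            return reattach_execute()
        except OperationNotFound:
            # Initial request never reached the server: re-send it.
            return execute_plan()
```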
[jira] [Resolved] (SPARK-44664) Release the execute when closing the iterator in Python client
[ https://issues.apache.org/jira/browse/SPARK-44664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-44664.
----------------------------------
Fix Version/s: 3.5.0
               4.0.0
Assignee: Hyukjin Kwon
Resolution: Fixed

Fixed in https://github.com/apache/spark/pull/42330

> Release the execute when closing the iterator in Python client
> ---------------------------------------------------------------
>
> Key: SPARK-44664
> URL: https://issues.apache.org/jira/browse/SPARK-44664
> Project: Spark
> Issue Type: Task
> Components: Connect, PySpark
> Affects Versions: 3.5.0
> Reporter: Hyukjin Kwon
> Assignee: Hyukjin Kwon
> Priority: Major
> Fix For: 3.5.0, 4.0.0
>
> See SPARK-44656 and SPARK-44642. We need the same in the Python client.
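Editor's note: the shape of the fix (when the client-side result iterator is closed or exhausted, tell the server to release the execution so partially consumed results don't leak server-side state) can be sketched with a small wrapper. Names are illustrative, not the actual Spark Connect client classes:

```python
class ReleasingIterator:
    """Iterator wrapper that invokes a release callback exactly once,
    when closed explicitly or when the underlying iterator is exhausted."""

    def __init__(self, batches, release):
        self._it = iter(batches)
        self._release = release     # e.g. sends ReleaseExecute to the server
        self._released = False

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._it)
        except StopIteration:
            self.close()            # fully consumed: release eagerly
            raise

    def close(self):
        if not self._released:      # idempotent: release at most once
            self._released = True
            self._release()
```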
[jira] [Resolved] (SPARK-44619) Free up disk space for container jobs
[ https://issues.apache.org/jira/browse/SPARK-44619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruifeng Zheng resolved SPARK-44619. --- Fix Version/s: 4.0.0 Resolution: Fixed Issue resolved by pull request 42253 [https://github.com/apache/spark/pull/42253] > Free up disk space for container jobs > - > > Key: SPARK-44619 > URL: https://issues.apache.org/jira/browse/SPARK-44619 > Project: Spark > Issue Type: Sub-task > Components: Project Infra >Affects Versions: 4.0.0 >Reporter: Ruifeng Zheng >Assignee: Ruifeng Zheng >Priority: Major > Fix For: 4.0.0 > >
[jira] [Assigned] (SPARK-43562) Enable DataFrameTests.test_append for pandas 2.0.0.
[ https://issues.apache.org/jira/browse/SPARK-43562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-43562: Assignee: Haejoon Lee > Enable DataFrameTests.test_append for pandas 2.0.0. > --- > > Key: SPARK-43562 > URL: https://issues.apache.org/jira/browse/SPARK-43562 > Project: Spark > Issue Type: Sub-task > Components: Pandas API on Spark >Affects Versions: 4.0.0 >Reporter: Haejoon Lee >Assignee: Haejoon Lee >Priority: Major > > Enable DataFrameTests.test_append for pandas 2.0.0.
[jira] [Resolved] (SPARK-43870) Enable SeriesTests for pandas 2.0.0.
[ https://issues.apache.org/jira/browse/SPARK-43870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-43870. -- Fix Version/s: 4.0.0 Resolution: Fixed Issue resolved by pull request 42268 [https://github.com/apache/spark/pull/42268] > Enable SeriesTests for pandas 2.0.0. > > > Key: SPARK-43870 > URL: https://issues.apache.org/jira/browse/SPARK-43870 > Project: Spark > Issue Type: Sub-task > Components: Pandas API on Spark, PySpark >Affects Versions: 4.0.0 >Reporter: Haejoon Lee >Assignee: Haejoon Lee >Priority: Major > Fix For: 4.0.0 > > > test list: > * test_value_counts > * test_append > * test_astype > * test_between > * test_mad > * test_quantile > * test_rank > * test_between_time > * test_iteritems > * test_product > * test_factorize
[jira] [Assigned] (SPARK-43870) Enable SeriesTests for pandas 2.0.0.
[ https://issues.apache.org/jira/browse/SPARK-43870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-43870: Assignee: Haejoon Lee > Enable SeriesTests for pandas 2.0.0. > > > Key: SPARK-43870 > URL: https://issues.apache.org/jira/browse/SPARK-43870 > Project: Spark > Issue Type: Sub-task > Components: Pandas API on Spark, PySpark >Affects Versions: 4.0.0 >Reporter: Haejoon Lee >Assignee: Haejoon Lee >Priority: Major > > test list: > * test_value_counts > * test_append > * test_astype > * test_between > * test_mad > * test_quantile > * test_rank > * test_between_time > * test_iteritems > * test_product > * test_factorize
[jira] [Resolved] (SPARK-43562) Enable DataFrameTests.test_append for pandas 2.0.0.
[ https://issues.apache.org/jira/browse/SPARK-43562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-43562. -- Fix Version/s: 4.0.0 Resolution: Fixed Issue resolved by pull request 42268 [https://github.com/apache/spark/pull/42268] > Enable DataFrameTests.test_append for pandas 2.0.0. > --- > > Key: SPARK-43562 > URL: https://issues.apache.org/jira/browse/SPARK-43562 > Project: Spark > Issue Type: Sub-task > Components: Pandas API on Spark >Affects Versions: 4.0.0 >Reporter: Haejoon Lee >Assignee: Haejoon Lee >Priority: Major > Fix For: 4.0.0 > > > Enable DataFrameTests.test_append for pandas 2.0.0.
[jira] [Resolved] (SPARK-43873) Enable DataFrameSlowTests for pandas 2.0.0.
[ https://issues.apache.org/jira/browse/SPARK-43873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-43873. -- Fix Version/s: 4.0.0 Assignee: Haejoon Lee Resolution: Fixed Fixed in https://github.com/apache/spark/pull/42319 > Enable DataFrameSlowTests for pandas 2.0.0. > --- > > Key: SPARK-43873 > URL: https://issues.apache.org/jira/browse/SPARK-43873 > Project: Spark > Issue Type: Sub-task > Components: Pandas API on Spark, PySpark >Affects Versions: 4.0.0 >Reporter: Haejoon Lee >Assignee: Haejoon Lee >Priority: Major > Fix For: 4.0.0 > > > test list: > * test_describe > * test_between_time > * test_product > * test_iteritems > * test_mad > * test_cov > * test_quantile
[jira] [Resolved] (SPARK-44640) Improve error messages for Python UDTF returning non iterable
[ https://issues.apache.org/jira/browse/SPARK-44640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-44640. -- Fix Version/s: 4.0.0 Assignee: Allison Wang Resolution: Fixed Fixed in https://github.com/apache/spark/pull/42302 > Improve error messages for Python UDTF returning non iterable > - > > Key: SPARK-44640 > URL: https://issues.apache.org/jira/browse/SPARK-44640 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 3.5.0 >Reporter: Allison Wang >Assignee: Allison Wang >Priority: Major > Fix For: 4.0.0 > > > When the return type of a UDTF is not an iterable, the error message can be > confusing to users. For example for this UDTF: > {code:java} > @udtf(returnType="x: int") > class TestUDTF: > def eval(self, a): > return a {code} > Currently it fails with this error for regular UDTFs: > return tuple(map(verify_and_convert_result, res)) > TypeError: 'int' object is not iterable > And this error for arrow-optimized UDTFs: > raise ValueError("DataFrame constructor not properly called!") > ValueError: DataFrame constructor not properly called! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
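The failure mode quoted in SPARK-44640 can be reproduced in plain Python without Spark. This is an illustrative sketch: `collect_rows` is a hypothetical stand-in for the worker's row-conversion step, not the actual PySpark code path.

```python
# Illustration of SPARK-44640: a UDTF's eval() must produce an iterable of
# rows, so returning a bare int makes the row-conversion step fail with
# "'int' object is not iterable".

def collect_rows(eval_result):
    # Simplified stand-in for "map each returned row to a tuple".
    return [row if isinstance(row, tuple) else tuple(row)
            for row in eval_result]

class BadUDTF:
    def eval(self, a):
        return a          # returns an int -> not iterable -> TypeError

class GoodUDTF:
    def eval(self, a):
        yield (a,)        # yields row tuples -> iterable of rows

try:
    collect_rows(BadUDTF().eval(1))
    failed = False
except TypeError:
    failed = True         # "'int' object is not iterable"

rows = collect_rows(GoodUDTF().eval(1))
```

The improved error message in the fix aims to tell the user directly that `eval` must return an iterable of rows, rather than surfacing the raw `TypeError` from deep inside the conversion step.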
[jira] [Updated] (SPARK-44548) Add support for pandas-on-Spark DataFrame assertDataFrameEqual
[ https://issues.apache.org/jira/browse/SPARK-44548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amanda Liu updated SPARK-44548: --- Summary: Add support for pandas-on-Spark DataFrame assertDataFrameEqual (was: Add support for pandas DataFrame assertDataFrameEqual) > Add support for pandas-on-Spark DataFrame assertDataFrameEqual > -- > > Key: SPARK-44548 > URL: https://issues.apache.org/jira/browse/SPARK-44548 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 3.5.0 >Reporter: Amanda Liu >Assignee: Amanda Liu >Priority: Major > Fix For: 3.5.0, 4.0.0 > > > SPIP: > https://docs.google.com/document/d/1OkyBn3JbEHkkQgSQ45Lq82esXjr9rm2Vj7Ih_4zycRc/edit#heading=h.f5f0u2riv07v
[jira] [Created] (SPARK-44665) Add support for pandas DataFrame assertDataFrameEqual
Amanda Liu created SPARK-44665: -- Summary: Add support for pandas DataFrame assertDataFrameEqual Key: SPARK-44665 URL: https://issues.apache.org/jira/browse/SPARK-44665 Project: Spark Issue Type: Sub-task Components: PySpark Affects Versions: 3.5.0 Reporter: Amanda Liu
[jira] [Created] (SPARK-44664) Release the execute when closing the iterator in Python client
Hyukjin Kwon created SPARK-44664: Summary: Release the execute when closing the iterator in Python client Key: SPARK-44664 URL: https://issues.apache.org/jira/browse/SPARK-44664 Project: Spark Issue Type: Task Components: Connect, PySpark Affects Versions: 3.5.0 Reporter: Hyukjin Kwon See SPARK-44656 and SPARK-44642. We need the same in Python client
[jira] [Assigned] (SPARK-44642) ExecutePlanResponseReattachableIterator should release all after error
[ https://issues.apache.org/jira/browse/SPARK-44642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-44642: Assignee: Juliusz Sompolski > ExecutePlanResponseReattachableIterator should release all after error > -- > > Key: SPARK-44642 > URL: https://issues.apache.org/jira/browse/SPARK-44642 > Project: Spark > Issue Type: Improvement > Components: Connect >Affects Versions: 3.5.0, 4.0.0 >Reporter: Juliusz Sompolski >Assignee: Juliusz Sompolski >Priority: Major >
[jira] [Resolved] (SPARK-44642) ExecutePlanResponseReattachableIterator should release all after error
[ https://issues.apache.org/jira/browse/SPARK-44642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-44642. -- Fix Version/s: 3.5.0 4.0.0 Resolution: Fixed Issue resolved by pull request 42304 [https://github.com/apache/spark/pull/42304] > ExecutePlanResponseReattachableIterator should release all after error > -- > > Key: SPARK-44642 > URL: https://issues.apache.org/jira/browse/SPARK-44642 > Project: Spark > Issue Type: Improvement > Components: Connect >Affects Versions: 3.5.0, 4.0.0 >Reporter: Juliusz Sompolski >Assignee: Juliusz Sompolski >Priority: Major > Fix For: 3.5.0, 4.0.0 > >
[jira] [Resolved] (SPARK-44652) Raise error when only one df is None
[ https://issues.apache.org/jira/browse/SPARK-44652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-44652. -- Fix Version/s: 3.5.0 4.0.0 Resolution: Fixed Issue resolved by pull request 42314 [https://github.com/apache/spark/pull/42314] > Raise error when only one df is None > > > Key: SPARK-44652 > URL: https://issues.apache.org/jira/browse/SPARK-44652 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 3.5.0 >Reporter: Amanda Liu >Assignee: Amanda Liu >Priority: Major > Fix For: 3.5.0, 4.0.0 > >
[jira] [Assigned] (SPARK-44652) Raise error when only one df is None
[ https://issues.apache.org/jira/browse/SPARK-44652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-44652: Assignee: Amanda Liu > Raise error when only one df is None > > > Key: SPARK-44652 > URL: https://issues.apache.org/jira/browse/SPARK-44652 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 3.5.0 >Reporter: Amanda Liu >Assignee: Amanda Liu >Priority: Major >
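The behavior described in SPARK-44652 ("raise error when only one df is None") can be sketched as a small check. This is an illustrative stand-in, not the actual `assertDataFrameEqual` implementation; the function name `assert_both_or_neither_none` is invented.

```python
# Sketch of the SPARK-44652 rule: two None inputs compare as equal, but
# exactly one None input is an error rather than a silent pass.

def assert_both_or_neither_none(actual, expected):
    if actual is None and expected is None:
        return True   # both None: treated as equal
    if actual is None or expected is None:
        raise AssertionError(
            "One DataFrame is None while the other is not")
    return True       # neither is None: proceed to a real comparison

both_none_ok = assert_both_or_neither_none(None, None)

try:
    assert_both_or_neither_none(None, object())
    one_none_raised = False
except AssertionError:
    one_none_raised = True
```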
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* Along the lines of DPP, which helps DataSourceV2 relations when the join key is a partition column, the same concept can be extended to the case where the join key is not a partition column. The keys of a BroadcastHashJoin are already available before the stream-side iterator is actually evaluated, so they can be pushed down to the DataSource as a SortedSet. For non-partition columns, DataSources like Iceberg have max/min column stats available at the manifest level, and formats like Parquet keep max/min stats at various storage levels. The pushed SortedSet can be used for range-based pruning both at the driver level (manifest files) and at the executor level (while actually scanning chunks, row groups, etc. in Parquet). If the data is stored in a columnar batch format, it is not possible to filter out individual rows at the DataSource level even though the keys are available; but at the scan level (ColumnToRowExec) it is still possible to filter out as many rows as possible if the query involves nested joins, reducing the number of rows joined at the higher join levels. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help BroadcastHashJoin performance when the join is Inner or Left Semi. It will also not work if there are nodes like Expand, Generate, or Aggregate (without a group-by on the join columns) below the targeted BroadcastHashJoin node. h2. *Q3. How is it done today, and what are the limits of current practice?* Currently this sort of DataSource-level pruning is done via DPP (Dynamic Partition Pruning), and only if one of the join key columns is a partitioning column (so that the cost of the DPP subquery is justified and far smaller than the amount of data skipped by pruning partitions). The limitation is that the DPP-style approach is intentionally not applied when the join column is a non-partition column, because the cost of a "DPP type" query would most likely be high compared to any possible pruning (especially if the column is not stored sorted). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning for joins on non-partition columns. 2) Because it piggybacks on the broadcast keys, there is no extra cost of a "DPP type" query. 3) The keys can be used by the DataSource to prune at the driver (possibly) and also at the executor level (as with Parquet, which has max/min stats at various structural levels). 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter gets pushed to the scan level; since it is on the partition key, maybe that is sufficient. But in a nested join query, possibly involving different stream-side join columns, each such filter push could do significant pruning. This requires some handling for AQE, since the stream-side iterator (and hence stage evaluation) needs to be delayed until all the available join filters in the nested tree have been pushed to their respective target BatchScanExec nodes. h4. {anchor:singleRowFilter}*Single Row Filtration* 5) In the case of nested broadcast joins, if the DataSource is column-vector oriented, what Spark gets is a ColumnarBatch. But because the scans carry filters from multiple joins, those filters can be retrieved and applied in the code generated at the ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only rows that satisfy all the BroadcastHashJoins whose keys have been pushed are used for join evaluation. The code is already there; will be opening a PR. For a non-partitioned-table TPCDS run on a laptop with TPCDS data at scale factor 4, I am seeing a 15% gain. For partitioned-table TPCDS, 4-5 queries improve by 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins, especially when the join columns are non-partitioned, and performance is a criterion, this PR *might* help. h2. *Q6. What are the risks?* The changes are extensive, and review will be painful. Though the code is being tested continuously and more tests are being added, with a change this big some possibility of bugs remains; as of now, I think the code is robust. To get the performance benefit in full, utilization of the pushed filters also needs to be implemented on the DataSource side; this has already been done for {*}iceberg{*}. But I believe that at least in the case of nested Broadcast Hash Joins, the [#singleRowFilter] approach would still result in a perf benefit, even with Default
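The range-based pruning idea from Q1/Q4 above can be sketched in a few lines: given the sorted broadcast-side join keys and per-row-group min/max statistics (as Parquet footers and Iceberg manifests provide), a row group is skipped when no key falls inside its [min, max] range. Function and variable names here are illustrative, not Spark or Parquet APIs.

```python
# Hypothetical sketch: prune row groups using sorted broadcast join keys
# and per-row-group [min, max] column statistics.
import bisect

def row_group_may_match(sorted_keys, rg_min, rg_max):
    """True iff at least one broadcast key lies within [rg_min, rg_max]."""
    i = bisect.bisect_left(sorted_keys, rg_min)  # first key >= rg_min
    return i < len(sorted_keys) and sorted_keys[i] <= rg_max

def prune_row_groups(sorted_keys, row_groups):
    """Keep only row groups whose range may contain a join key."""
    return [rg for rg in row_groups
            if row_group_may_match(sorted_keys, rg[0], rg[1])]

keys = [15, 42, 97]                           # broadcast join keys, sorted
groups = [(0, 10), (11, 50), (60, 90), (95, 120)]
kept = prune_row_groups(keys, groups)          # (0,10) and (60,90) skipped
```

The same check works at the driver against manifest-level stats and at the executor against row-group stats; sortedness of the key set is what makes each probe a binary search instead of a full scan.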
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. 
How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). h4. *Single Row Filteration* 5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. 
But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. *Q6. What are the risks?* Well the changes are extensive. review will be painful . Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for {*}iceberg{*}. But I believe atleast in case of Nested Broadcast Hash Joins, [#singleRowFilter] approach would still result in perf benefit. h2. *Q7. How long
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. 
How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). h4. {anchor:singleRowFilter}*Single Row Filteration* 5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. 
But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. *Q6. What are the risks?* Well the changes are extensive. review will be painful ( if it happens). Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, [#singleRowFilter] approach would still result
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. 
How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). h4. {anchor:srf}*Single Row Filteration* 5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. 
But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. *Q6. What are the risks?* Well the changes are extensive. review will be painful ( if it happens). Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, [#srf] approach would still result in perf benefit. h2.
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* Along the lines of DPP, which helps DataSourceV2 relations when the join key is a partition column, the same concept can be extended to the case where the join key is not a partition column. The keys of a BroadcastHashJoin are already available before the stream-side iterator is evaluated, so they can be pushed down to the DataSource as a SortedSet. For non-partition columns, DataSources like Iceberg have max/min column stats available at the manifest level, and formats like Parquet keep max/min stats at various storage levels. The pushed SortedSet can be used to prune by range both at the driver level (manifest files) and at the executor level (while actually reading chunks, row groups etc. at the Parquet level). If the data is stored in a columnar-batch format, it is not possible to filter out individual rows at the DataSource level, even though the keys are available. But at the scan level (ColumnToRowExec) it is still possible to filter out as many rows as possible if the query involves nested joins, thus reducing the number of rows to join at the higher join levels. More details will be added. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help BroadcastHashJoin performance if the join is Inner or Left Semi. It will also not work if there are nodes like Expand, Generate, or Aggregate (without a group-by on the join keys, etc.) below the targeted BroadcastHashJoin node. h2. *Q3. 
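The range pruning described above can be sketched in plain Python. This is only an illustration of the idea, not Spark or Iceberg code; the function name and the list-of-stats representation are hypothetical. Given the broadcast join keys as a sorted sequence, a chunk (manifest entry, row group, etc.) can be skipped when no key falls inside its [min, max] range:

```python
import bisect

def chunk_may_contain_keys(keys_sorted, chunk_min, chunk_max):
    """Return True if any pushed join key could fall inside the
    chunk's [min, max] stats range; False means the whole chunk
    can be pruned without reading it."""
    # Locate the first key >= chunk_min; the chunk can only match
    # if that key also satisfies key <= chunk_max.
    i = bisect.bisect_left(keys_sorted, chunk_min)
    return i < len(keys_sorted) and keys_sorted[i] <= chunk_max

# Pushed broadcast keys vs. per-chunk min/max stats:
keys = [5, 17, 42, 99]
chunk_may_contain_keys(keys, 20, 40)   # no key in [20, 40] -> prune
chunk_may_contain_keys(keys, 30, 50)   # 42 lies in [30, 50] -> keep
```

Because the key set is sorted, each chunk costs one binary search, which is what makes applying this at both the driver (manifests) and executor (row groups) cheap.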
How is it done today, and what are the limits of current practice?* Currently this sort of pruning at the DataSource level is done by DPP (Dynamic Partition Pruning), and only if one of the join key columns is a partitioning column (so that the cost of the DPP query is justified and far less than the amount of data it filters out by skipping partitions). The limitation is that a DPP-type approach is (intentionally, I believe) not implemented when the join column is a non-partition column, because the cost of a DPP-type query would most likely be far higher than any possible pruning, especially if the column is not stored in sorted order. h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) It allows pruning for joins on non-partition columns. 2) Because it piggybacks on the broadcasted keys, there is no extra cost of a DPP-type query. 3) The data can be used by the DataSource to prune at the driver (possibly) and also at the executor level (as in Parquet, which has max/min stats at various structural levels). 4) The big benefit should be seen in multi-level nested join queries. In the current code base, if I am correct, only one join's pruning filter gets pushed to the scan level; since it is on the partition key, maybe that is sufficient. But in a nested join query, possibly involving different stream-side columns for each join, each such filter push could do significant pruning. This requires some handling for AQE, as the stream-side iterator (and hence stage evaluation) needs to be delayed until all the available join filters in the nested tree have been pushed to their respective target BatchScanExec nodes. h4. {anchor:singleRowfilter}*Single Row Filtration* 5) In the case of nested broadcast joins, if the DataSource is column-vector oriented, what Spark gets is a ColumnarBatch. 
But because the scan holds filters from multiple joins, they can be retrieved and applied in the code generated at the ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only rows that satisfy all the BroadcastHashJoins whose keys have been pushed will be used for join evaluation. The code is already there; a PR will be opened. For a non-partition-table TPCDS run on a laptop at scale factor 4, I am seeing a 15% gain. For partition-table TPCDS, 4-5 queries improve by 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If the use case involves multiple joins, especially when the join columns are non-partition columns, and performance is a criterion, this PR *might* help. h2. *Q6. What are the risks?* The changes are extensive, and review will be painful (if it happens). Though the code is being tested continuously and more tests are being added, with a change this big some possibility of bugs remains; as of now, I think the code is robust. To get the full performance benefit, utilization of the pushed filters also needs to be implemented on the DataSource side; this has already been done for *iceberg*. But I believe that, at least in the case of nested broadcast hash joins, the [#singleRowfilter] approach would still result in a perf benefit.
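The single-row filtration step can be sketched as follows. This is a plain-Python stand-in for the proposed HashedRelation "containsKey" check, not the actual generated code; the function name, the row-dict representation, and the column-to-key-set mapping are all hypothetical. A row survives only if its join-key value is present in every pushed broadcast key set:

```python
def filter_rows(batch, pushed_key_sets):
    """Keep only rows whose join-key values appear in every pushed
    broadcast key set -- a stand-in for probing each join's
    HashedRelation with a containsKey-style check.
    `batch` is a list of row dicts; `pushed_key_sets` maps a join
    column name to the set of keys broadcast by that join."""
    return [
        row for row in batch
        if all(row[col] in keys for col, keys in pushed_key_sets.items())
    ]

# Two nested joins pushed their key sets for columns "a" and "b":
rows = [{"a": 1, "b": "x"}, {"a": 2, "b": "y"}, {"a": 1, "b": "y"}]
pushed = {"a": {1}, "b": {"y"}}
filter_rows(rows, pushed)  # only the row {"a": 1, "b": "y"} survives
```

Rows dropped here never reach the higher join operators, which is where the claimed gain in nested-join queries comes from.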
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. 
How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). {anchor:singleRowfilter} 5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. 
But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. *Q6. What are the risks?* Well the changes are extensive. review will be painful ( if it happens). Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, [#singleRowfilter] approach would still result in perf benefit. h2. *Q7.
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. 
How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). {anchor:singleRowFilter}5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. 
But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. Q6. What are the risks?* Well the changes are extensive. review will be painful ( if it happens). Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, [#singleRowFilter] approach would still result in perf benefit. Q7. How
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. 
How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). {anchor:singleRowFilter}5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. 
But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. Q6. What are the risks?* Well the changes are extensive. review will be painful ( if it happens). Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, approach would still result in perf benefit. Q7. How long will it take?
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. 
How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). 5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. 
Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. *Q6. What are the risks?* Well the changes are extensive. review will be painful. Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for {*}iceberg{*}. But I believe atleast in case of Nested Broadcast Hash Joins, [#singleRowfilter] approach would still result in perf benefit. h2. *Q7. How long will it take?* The code is
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. 
How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). {anchor:singleRowfilter}5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. 
But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. *Q6. What are the risks?* Well the changes are extensive. review will be painful ( if it happens). Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, [#singleRowfilter] approach would still result in perf benefit. h2. *Q7.
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* Along the lines of DPP, which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended to the case where the joining key is not a partition column. The keys of a BroadcastHashJoin are already available before the stream-side iterator is actually evaluated. These keys can be pushed down to the DataSource as a SortedSet. For non-partition columns, DataSources like Iceberg have max/min column stats available at the manifest level, and formats like Parquet have max/min stats at various storage levels. The passed SortedSet can be used to prune by ranges both at the driver level (manifest files) and at the executor level (while actually going through chunks, row groups, etc. at the Parquet level). If the data is stored in a columnar batch format, it is not possible to filter out individual rows at the DataSource level even though the keys are available. But at the scan level (ColumnToRowExec) it is still possible to filter out as many rows as possible if the query involves nested joins, thus reducing the number of rows to join at the higher join levels. More details will be added. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help BroadcastHashJoin performance when the join is Inner or Left Semi. It will also not work if there are nodes like Expand, Generator, or Aggregate (without a group-by on the joining columns) below the targeted BroadcastHashJoin node. h2. *Q3. How is it done today, and what are the limits of current practice?* Currently this sort of pruning at the DataSource level is done using DPP (Dynamic Partition Pruning), and only if one of the join key columns is a partitioning column (so that the cost of the DPP query is justified and far less than the amount of data it filters out by skipping partitions). The limitation is that a DPP-type approach is not implemented (intentionally, I believe) when the join column is a non-partition column, because the cost of the "DPP type" query would most likely far exceed any possible pruning benefit, especially if the column is not stored in sorted order. h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on joins over non-partition columns. 2) Because it piggybacks on the broadcasted keys, there is no extra cost of a "DPP type" query. 3) The data can be used by the DataSource to prune at the driver (possibly) and also at the executor level (as with Parquet, which has max/min stats at various structure levels). 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter gets pushed to the scan level. Since it is on a partition key, maybe that is sufficient. But in a nested join query, possibly involving different stream-side columns for each join, each such filter push could do significant pruning. This requires some handling in the case of AQE, as the stream-side iterator (and hence stage evaluation) needs to be delayed until all the available join filters in the nested tree are pushed to their respective target BatchScanExec nodes. {anchor:singleRowFilter}5) In the case of nested broadcast joins, if the DataSource is column-vector oriented, then what Spark gets is a ColumnarBatch. But because the scans have filters from multiple joins, these can be retrieved and applied in the code generated at the ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only rows that satisfy all the BroadcastHashJoins (whose keys have been pushed) are used for join evaluation. The code is already written; a PR will be opened. For a non-partition-table TPCDS run on a laptop with TPCDS data at scale factor 4, I am seeing a 15% gain. For partition-table TPCDS, there is improvement in 4-5 queries, to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If your use cases involve multiple joins, especially when the join columns are not partitioned, and performance is a criterion, this PR *might* help. h2. *Q6. What are the risks?* The changes are extensive, and review will be demanding. Although the code is being tested continuously and more tests are being added, a change of this size carries some possibility of bugs; as of now, I think the code is robust. To get the performance benefit fully, utilization of the pushed filters needs to be implemented on the DataSource side too; this has already been done for *Iceberg*. But I believe that at least in the case of nested broadcast hash joins, the [#singleRowFilter] approach would still yield a performance benefit. h2. *Q7.
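The range pruning described in Q1 can be sketched in plain Scala. This is a hypothetical illustration, not Spark or Parquet code: `ChunkStats` and `canSkip` are invented stand-ins for a storage chunk's column statistics (e.g. a Parquet row group) and the pruning decision.

```scala
// Hypothetical sketch of pruning with broadcast-side join keys (not Spark API).
// A chunk can be skipped when no broadcast key falls within its [min, max]
// column statistics.
import scala.collection.immutable.SortedSet

case class ChunkStats(min: Int, max: Int) // illustrative stand-in for real stats

def canSkip(broadcastKeys: SortedSet[Int], stats: ChunkStats): Boolean =
  // range(from, until) is an efficient bounded lookup on a SortedSet;
  // until is exclusive, hence max + 1 for an inclusive upper bound.
  broadcastKeys.range(stats.min, stats.max + 1).isEmpty

val keys = SortedSet(3, 42, 97)
println(canSkip(keys, ChunkStats(50, 90))) // true: no key in [50, 90]
println(canSkip(keys, ChunkStats(40, 60))) // false: 42 is in [40, 60]
```

Because the keys arrive as a SortedSet, each min/max check is a bounded range lookup rather than a full scan of the broadcast side.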
[jira] [Created] (SPARK-44663) Disable arrow optimization by default
Allison Wang created SPARK-44663: Summary: Disable arrow optimization by default Key: SPARK-44663 URL: https://issues.apache.org/jira/browse/SPARK-44663 Project: Spark Issue Type: Sub-task Components: PySpark Affects Versions: 3.5.0 Reporter: Allison Wang Disable arrow optimization to make Python UDTFs consistent with Python UDFs. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44661) getMapOutputLocation should not throw NPE
[ https://issues.apache.org/jira/browse/SPARK-44661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-44661: -- Priority: Minor (was: Major) > getMapOutputLocation should not throw NPE > - > > Key: SPARK-44661 > URL: https://issues.apache.org/jira/browse/SPARK-44661 > Project: Spark > Issue Type: Test > Components: Spark Core, Tests >Affects Versions: 3.4.1, 3.5.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Minor > Fix For: 3.4.2, 3.5.0, 4.0.0
[jira] [Resolved] (SPARK-44661) getMapOutputLocation should not throw NPE
[ https://issues.apache.org/jira/browse/SPARK-44661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-44661. --- Fix Version/s: 3.5.0 4.0.0 3.4.2 Resolution: Fixed Issue resolved by pull request 42326 [https://github.com/apache/spark/pull/42326] > getMapOutputLocation should not throw NPE > - > > Key: SPARK-44661 > URL: https://issues.apache.org/jira/browse/SPARK-44661 > Project: Spark > Issue Type: Test > Components: Spark Core, Tests >Affects Versions: 3.4.1, 3.5.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major > Fix For: 3.5.0, 4.0.0, 3.4.2
[jira] [Assigned] (SPARK-44661) getMapOutputLocation should not throw NPE
[ https://issues.apache.org/jira/browse/SPARK-44661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun reassigned SPARK-44661: - Assignee: Dongjoon Hyun > getMapOutputLocation should not throw NPE > - > > Key: SPARK-44661 > URL: https://issues.apache.org/jira/browse/SPARK-44661 > Project: Spark > Issue Type: Test > Components: Spark Core, Tests >Affects Versions: 3.4.1, 3.5.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Major
[jira] [Commented] (SPARK-44646) Migrate Log4j 2.x in Spark 3.4.1 to Logback
[ https://issues.apache.org/jira/browse/SPARK-44646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750895#comment-17750895 ] L. C. Hsieh commented on SPARK-44646: - I have not used it but maybe you can try https://logging.apache.org/log4j/log4j-2.2/log4j-to-slf4j/index.html. > Migrate Log4j 2.x in Spark 3.4.1 to Logback > --- > > Key: SPARK-44646 > URL: https://issues.apache.org/jira/browse/SPARK-44646 > Project: Spark > Issue Type: Brainstorming > Components: Build >Affects Versions: 3.4.1 >Reporter: Yu Tian >Priority: Major > > Hi, > We are working on the Spark 3.4.1 upgrade from Spark 3.1.3. In our logging system we use the Logback framework, which works with Spark 3.1.3 since it uses log4j 1.x. However, when we upgrade to Spark 3.4.1, per the > [release > notes|https://spark.apache.org/docs/latest/core-migration-guide.html], Spark has migrated from log4j 1.x to log4j 2.x, and the way we replace log4j with Logback causes failures in the Spark master start process: > Error: Unable to initialize main class org.apache.spark.deploy.master.Master > Caused by: java.lang.NoClassDefFoundError: > org/apache/logging/log4j/core/Filter > In our current approach we use log4j-over-slf4j to replace log4j-core, but it is only applicable to the log4j 1.x library, and there is no log4j-over-slf4j equivalent for log4j 2.x yet (please correct me if I am wrong). > I am also curious why Spark chose to depend on log4j 2.x directly instead of an SPI, which gives users less flexibility to choose whatever logger implementation they want. > I want to share this issue and see if anyone else has reported it and whether there is any workaround or alternative solution. Any suggestions are appreciated, thanks.
[jira] [Created] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
Asif created SPARK-44662: Summary: SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns Key: SPARK-44662 URL: https://issues.apache.org/jira/browse/SPARK-44662 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.3.3 Reporter: Asif Fix For: 3.3.3
[jira] [Commented] (SPARK-44646) Migrate Log4j 2.x in Spark 3.4.1 to Logback
[ https://issues.apache.org/jira/browse/SPARK-44646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750891#comment-17750891 ] Yu Tian commented on SPARK-44646: - Hi [~viirya] Could you please check this thread? It is a question related to https://issues.apache.org/jira/browse/SPARK-37814 Thanks. > Migrate Log4j 2.x in Spark 3.4.1 to Logback > --- > > Key: SPARK-44646 > URL: https://issues.apache.org/jira/browse/SPARK-44646 > Project: Spark > Issue Type: Brainstorming > Components: Build >Affects Versions: 3.4.1 >Reporter: Yu Tian >Priority: Major
[jira] [Assigned] (SPARK-44658) ShuffleStatus.getMapStatus should return None instead of Some(null)
[ https://issues.apache.org/jira/browse/SPARK-44658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun reassigned SPARK-44658: - Assignee: Dongjoon Hyun > ShuffleStatus.getMapStatus should return None instead of Some(null) > --- > > Key: SPARK-44658 > URL: https://issues.apache.org/jira/browse/SPARK-44658 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.5.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Critical
[jira] [Resolved] (SPARK-44658) ShuffleStatus.getMapStatus should return None instead of Some(null)
[ https://issues.apache.org/jira/browse/SPARK-44658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-44658. --- Fix Version/s: 3.5.0 4.0.0 Resolution: Fixed Issue resolved by pull request 42323 [https://github.com/apache/spark/pull/42323] > ShuffleStatus.getMapStatus should return None instead of Some(null) > --- > > Key: SPARK-44658 > URL: https://issues.apache.org/jira/browse/SPARK-44658 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.5.0 >Reporter: Dongjoon Hyun >Assignee: Dongjoon Hyun >Priority: Critical > Fix For: 3.5.0, 4.0.0
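The bug class behind SPARK-44658 can be shown in a few lines of plain Scala. The `lookup*` functions below are hypothetical stand-ins for `ShuffleStatus.getMapStatus`, not the actual Spark code: `Some(x)` wraps a null as-is, while `Option(x)` maps null to `None`.

```scala
// Illustration of the Some(null) pitfall (hypothetical stand-in, not Spark code).
// A slot may hold null for a lost map output; wrapping that slot with Some(...)
// yields Some(null), which looks "defined" to callers and NPEs later.
val mapStatuses: Array[String] = Array("status-0", null)

def lookupWrong(id: Int): Option[String] = Some(mapStatuses(id))   // Some(null) for id 1
def lookupRight(id: Int): Option[String] = Option(mapStatuses(id)) // None for id 1

println(lookupWrong(1)) // Some(null)
println(lookupRight(1)) // None
```

Callers that pattern-match `Some(status)` then dereference `status` without a null check, which is why returning `None` for missing entries is the safe contract.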
[jira] [Commented] (SPARK-44660) Relax constraint for columnar shuffle check in AQE
[ https://issues.apache.org/jira/browse/SPARK-44660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750881#comment-17750881 ] Chao Sun commented on SPARK-44660: -- In fact the check is necessary, but it seems {code} postStageCreationRules(outputsColumnar = plan.supportsColumnar) {code} can be relaxed: if the new shuffle operator supports columnar, then maybe we shouldn't insert {{ColumnarToRow}} into this stage. This assumes the following stage knows the shuffle output is columnar and has a corresponding {{ColumnarToRow}} if necessary. > Relax constraint for columnar shuffle check in AQE > -- > > Key: SPARK-44660 > URL: https://issues.apache.org/jira/browse/SPARK-44660 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.4.1 >Reporter: Chao Sun >Priority: Major > > Currently in AQE, after evaluating the columnar rules, Spark checks whether > the top operator of the stage is still a shuffle operator, and throws an > exception if it isn't. > {code} > val optimized = e.withNewChildren(Seq(optimizeQueryStage(e.child, > isFinalStage = false))) > val newPlan = applyPhysicalRules( > optimized, > postStageCreationRules(outputsColumnar = plan.supportsColumnar), > Some((planChangeLogger, "AQE Post Stage Creation"))) > if (e.isInstanceOf[ShuffleExchangeLike]) { > if (!newPlan.isInstanceOf[ShuffleExchangeLike]) { > throw SparkException.internalError( > "Custom columnar rules cannot transform shuffle node to > something else.") > } > {code} > However, once a shuffle operator is transformed into a custom columnar > shuffle operator, the {{supportsColumnar}} of the new shuffle operator will > return true, and therefore the columnar rules will insert {{ColumnarToRow}} > on top of it. This means the {{newPlan}} is likely no longer a > {{ShuffleExchangeLike}} but a {{ColumnarToRow}}, and the exception will be > thrown even though the use case is valid. > This JIRA proposes to relax the check by allowing the above case. 
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44641) SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet
[ https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-44641: -- Affects Version/s: 3.4.0 > SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but > conditions unmet > -- > > Key: SPARK-44641 > URL: https://issues.apache.org/jira/browse/SPARK-44641 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.4.0, 3.4.1 >Reporter: Szehon Ho >Priority: Blocker > > Adding the following test case in KeyGroupedPartitionSuite demonstrates the > problem. > > {code:java} > test("test join key is the second partition key and a transform") { > val items_partitions = Array(bucket(8, "id"), days("arrive_time")) > createTable(items, items_schema, items_partitions) > sql(s"INSERT INTO testcat.ns.$items VALUES " + > s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " + > s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " + > s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " + > s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " + > s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))") > val purchases_partitions = Array(bucket(8, "item_id"), days("time")) > createTable(purchases, purchases_schema, purchases_partitions) > sql(s"INSERT INTO testcat.ns.$purchases VALUES " + > s"(1, 42.0, cast('2020-01-01' as timestamp)), " + > s"(1, 44.0, cast('2020-01-15' as timestamp)), " + > s"(1, 45.0, cast('2020-01-15' as timestamp)), " + > s"(2, 11.0, cast('2020-01-01' as timestamp)), " + > s"(3, 19.5, cast('2020-02-01' as timestamp))") > withSQLConf( > SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false", > SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true", > SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> > "true") { > val df = sql("SELECT id, name, i.price as purchase_price, " + > "p.item_id, p.price as sale_price " + > s"FROM testcat.ns.$items i JOIN testcat.ns.$purchases p " + > "ON i.arrive_time = p.time " + 
> "ORDER BY id, purchase_price, p.item_id, sale_price") > val shuffles = collectShuffles(df.queryExecution.executedPlan) > assert(!shuffles.isEmpty, "should not perform SPJ as not all join keys > are partition keys") > checkAnswer(df, > Seq( > Row(1, "aa", 40.0, 1, 42.0), > Row(1, "aa", 40.0, 2, 11.0), > Row(1, "aa", 41.0, 1, 44.0), > Row(1, "aa", 41.0, 1, 45.0), > Row(2, "bb", 10.0, 1, 42.0), > Row(2, "bb", 10.0, 2, 11.0), > Row(2, "bb", 10.5, 1, 42.0), > Row(2, "bb", 10.5, 2, 11.0), > Row(3, "cc", 15.5, 3, 19.5) > ) > ) > } > }{code} > > Note: this test has set up the DataSource V2 to return multiple splits for > the same partition. > In this case, SPJ is not triggered (because the join key does not match the partition > key), but the following code in the DSv2 scan: > [https://github.com/apache/spark/blob/v3.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala#L194] > intended to fill the empty partitions for the 'pushdown-value' case will still iterate > through the non-grouped partitions and look up the grouped partitions to fill the > map, resulting in some duplicate input data being fed into the join. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44641) SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet
[ https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chao Sun updated SPARK-44641: - Priority: Blocker (was: Major) -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44661) getMapOutputLocation should not throw NPE
Dongjoon Hyun created SPARK-44661: - Summary: getMapOutputLocation should not throw NPE Key: SPARK-44661 URL: https://issues.apache.org/jira/browse/SPARK-44661 Project: Spark Issue Type: Test Components: Spark Core, Tests Affects Versions: 3.4.1, 3.5.0 Reporter: Dongjoon Hyun -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44660) Relax constraint for columnar shuffle check in AQE
Chao Sun created SPARK-44660: Summary: Relax constraint for columnar shuffle check in AQE Key: SPARK-44660 URL: https://issues.apache.org/jira/browse/SPARK-44660 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.4.1 Reporter: Chao Sun Currently in AQE, after evaluating the columnar rules, Spark checks whether the top operator of the stage is still a shuffle operator, and throws an exception if it isn't. {code} val optimized = e.withNewChildren(Seq(optimizeQueryStage(e.child, isFinalStage = false))) val newPlan = applyPhysicalRules( optimized, postStageCreationRules(outputsColumnar = plan.supportsColumnar), Some((planChangeLogger, "AQE Post Stage Creation"))) if (e.isInstanceOf[ShuffleExchangeLike]) { if (!newPlan.isInstanceOf[ShuffleExchangeLike]) { throw SparkException.internalError( "Custom columnar rules cannot transform shuffle node to something else.") } {code} However, once a shuffle operator is transformed into a custom columnar shuffle operator, the {{supportsColumnar}} of the new shuffle operator will return true, and therefore the columnar rules will insert {{ColumnarToRow}} on top of it. This means the {{newPlan}} is likely no longer a {{ShuffleExchangeLike}} but a {{ColumnarToRow}}, and the exception will be thrown even though the use case is valid. This JIRA proposes to relax the check by allowing the above case. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
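The relaxation proposed in SPARK-44660 can be illustrated with a tiny sketch (Python stand-ins for the Scala plan nodes; the real logic lives inside AQE): after the post-stage-creation rules, the top node may legitimately be a `ColumnarToRow` whose child is still the shuffle, and the check could accept that shape instead of throwing.

```python
# Hypothetical sketch of the strict vs. relaxed check. The class names mirror
# the Spark operators discussed above, but these are toy stand-ins.

class ShuffleExchangeLike:
    def __init__(self, supports_columnar=False):
        self.supports_columnar = supports_columnar

class ColumnarToRow:
    def __init__(self, child):
        self.child = child

def check_strict(new_plan):
    # Current behavior: anything other than a shuffle on top is rejected.
    return isinstance(new_plan, ShuffleExchangeLike)

def check_relaxed(new_plan):
    # Proposed relaxation: also accept ColumnarToRow directly on top of a
    # columnar shuffle, since the node underneath is still the shuffle.
    if isinstance(new_plan, ShuffleExchangeLike):
        return True
    return (isinstance(new_plan, ColumnarToRow)
            and isinstance(new_plan.child, ShuffleExchangeLike)
            and new_plan.child.supports_columnar)

plan = ColumnarToRow(ShuffleExchangeLike(supports_columnar=True))
assert not check_strict(plan)   # today: internal error is thrown
assert check_relaxed(plan)      # proposed: the valid use case is accepted
```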
[jira] [Updated] (SPARK-44658) ShuffleStatus.getMapStatus should return None instead of Some(null)
[ https://issues.apache.org/jira/browse/SPARK-44658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-44658: -- Summary: ShuffleStatus.getMapStatus should return None instead of Some(null) (was: ShuffleStatus.getMapStatus should return None) > ShuffleStatus.getMapStatus should return None instead of Some(null) > --- > > Key: SPARK-44658 > URL: https://issues.apache.org/jira/browse/SPARK-44658 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.5.0 >Reporter: Dongjoon Hyun >Priority: Critical > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44659) SPJ: Include keyGroupedPartitioning in StoragePartitionJoinParams equality check
[ https://issues.apache.org/jira/browse/SPARK-44659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chao Sun updated SPARK-44659: - Summary: SPJ: Include keyGroupedPartitioning in StoragePartitionJoinParams equality check (was: Include keyGroupedPartitioning in StoragePartitionJoinParams equality check) > SPJ: Include keyGroupedPartitioning in StoragePartitionJoinParams equality > check > > > Key: SPARK-44659 > URL: https://issues.apache.org/jira/browse/SPARK-44659 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.5.0 >Reporter: Chao Sun >Priority: Minor > > Currently {{StoragePartitionJoinParams}} doesn't include > {{keyGroupedPartitioning}} in its {{equals}} and {{hashCode}} computation. > For completeness, we should include it as well since it is a member of the > class. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44641) SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet
[ https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chao Sun updated SPARK-44641: - Summary: SPJ: Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet (was: Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet) -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44659) Include keyGroupedPartitioning in StoragePartitionJoinParams equality check
Chao Sun created SPARK-44659: Summary: Include keyGroupedPartitioning in StoragePartitionJoinParams equality check Key: SPARK-44659 URL: https://issues.apache.org/jira/browse/SPARK-44659 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.5.0 Reporter: Chao Sun Currently {{StoragePartitionJoinParams}} doesn't include {{keyGroupedPartitioning}} in its {{equals}} and {{hashCode}} computation. For completeness, we should include it as well since it is a member of the class. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
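Why omitting a member from `equals`/`hashCode` matters, as SPARK-44659 points out, can be shown with a short illustration (Python, not the Spark class; the second field name is illustrative only): two params objects that differ only in the omitted field compare equal, so any reuse or caching keyed on them can conflate distinct plans.

```python
# Toy stand-in for a params class whose __eq__/__hash__ omit one member
# (mirroring keyGroupedPartitioning being left out of equals/hashCode).

class JoinParamsLike:
    def __init__(self, key_grouped_partitioning, other_params):
        self.key_grouped_partitioning = key_grouped_partitioning
        self.other_params = other_params

    def __eq__(self, other):
        # Buggy: key_grouped_partitioning is not compared.
        return (isinstance(other, JoinParamsLike)
                and self.other_params == other.other_params)

    def __hash__(self):
        # Consistent with the buggy __eq__, and equally incomplete.
        return hash(tuple(self.other_params))

a = JoinParamsLike(["id"], [1, 2])
b = JoinParamsLike(["id", "time"], [1, 2])
assert a == b                # differ in partitioning, yet considered equal
assert hash(a) == hash(b)    # so a dict/set cannot tell them apart either
```

The fix is simply to include every member in both methods (in Scala, a case class gives this for free).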
[jira] [Updated] (SPARK-44641) Results duplicated when SPJ partial-cluster and pushdown enabled but conditions unmet
[ https://issues.apache.org/jira/browse/SPARK-44641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chao Sun updated SPARK-44641: - Parent: SPARK-37375 Issue Type: Sub-task (was: Bug) -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44658) ShuffleStatus.getMapStatus should return None
Dongjoon Hyun created SPARK-44658: - Summary: ShuffleStatus.getMapStatus should return None Key: SPARK-44658 URL: https://issues.apache.org/jira/browse/SPARK-44658 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 3.5.0 Reporter: Dongjoon Hyun -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-43496) Have a separate config for Memory limits for kubernetes pods
[ https://issues.apache.org/jira/browse/SPARK-43496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750791#comment-17750791 ] Laurenceau Julien commented on SPARK-43496: --- Hi, I would like to suggest going beyond that: as is, Spark on Kubernetes always sets the following attributes on executor and driver pods: {code:java} requests: cpu: "1" memory: "1234M" limits: memory: "1234M" {code} with the values coming from the _[driver|executor]-memory_ and _[driver|executor]-cores_ options. This is good, except for resource utilization! On most dev environments it would be OK not to set any resource requests and limits; one could just rely on the usual JVM options. Besides, it would be nice if these settings could be user-defined at different values than the JVM values, so that we could manage overcommitment through them. Example: * options: _[driver|executor]-memory_ and _[driver|executor]-cores_ are used to set JVM options. If there is no custom definition, they also set pod requests & limits to stay consistent * new options: _spark.kubernetes.driver.requests.cpu_ _spark.kubernetes.driver.requests.memory_ _spark.kubernetes.driver.limits.cpu_ _spark.kubernetes.driver.limits.memory_ _spark.kubernetes.executor.requests.cpu_ _spark.kubernetes.executor.requests.memory_ _spark.kubernetes.executor.limits.cpu_ _spark.kubernetes.executor.limits.memory_ If unset, stay consistent with the current behavior; if set to 0, disable the definition. This would also solve the issue that driver/executor cores are defined as an Integer and cannot be 0.5 for a driver. > Have a separate config for Memory limits for kubernetes pods > > > Key: SPARK-43496 > URL: https://issues.apache.org/jira/browse/SPARK-43496 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 3.4.0 >Reporter: Alexander Yerenkow >Priority: Major > > The whole memory allocated to the JVM is set into pod resources as both request and > limit. > This means there's no way to use more memory for burst-like jobs in a > shared environment. > For example, if a Spark job uses an external process (outside of the JVM) to access > data, a bit of extra memory is required for that, and having configured higher > memory limits could be of use. > Another thought here: having a way to configure a JVM memory setting different from the pod memory > request could also be a valid use case. > > Github PR: [https://github.com/apache/spark/pull/41067] > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
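The option names floated in the comment above can be collected into a small sketch. Note that the `spark.kubernetes.[driver|executor].requests.*`/`limits.*` keys are the commenter's *proposal*, not existing Spark configuration; only the JVM sizing key is real today.

```python
# Sketch of the proposed decoupling of pod requests/limits from JVM sizing.
# All "proposed" keys below are hypothetical (from the comment), and the
# concrete values are arbitrary examples.
proposed_conf = {
    "spark.executor.memory": "4g",                         # existing JVM sizing
    "spark.kubernetes.executor.requests.cpu": "0.5",       # proposed
    "spark.kubernetes.executor.requests.memory": "4500M",  # proposed
    "spark.kubernetes.executor.limits.cpu": "2",           # proposed
    "spark.kubernetes.executor.limits.memory": "6000M",    # proposed: burst room
}

# The point of the proposal: the limit may exceed the request (overcommit),
# which the current request == limit behavior cannot express.
assert proposed_conf["spark.kubernetes.executor.limits.memory"] != \
       proposed_conf["spark.kubernetes.executor.requests.memory"]
```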
[jira] [Commented] (SPARK-44654) In subquery cannot perform partition pruning
[ https://issues.apache.org/jira/browse/SPARK-44654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750743#comment-17750743 ] 7mming7 commented on SPARK-44654: - [~yumwang] This is also possible, but what about the case of multiple records? > In subquery cannot perform partition pruning > > > Key: SPARK-44654 > URL: https://issues.apache.org/jira/browse/SPARK-44654 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: 7mming7 >Priority: Minor > Labels: performance > Attachments: image-2023-08-03-17-22-53-981.png > > > The following SQL cannot perform partition pruning > {code:java} > SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from > parquet_part){code} > As can be seen from the execution plan below, the partition pruning of left > cannot be performed after the subquery of in is converted into join > !image-2023-08-03-17-22-53-981.png! > The current issue proposes to optimize insubquery. Only when the value of in > is greater than a threshold, insubquery will be converted into Join -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-44654) In subquery cannot perform partition pruning
[ https://issues.apache.org/jira/browse/SPARK-44654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750739#comment-17750739 ] Yuming Wang commented on SPARK-44654: - Another way is to convert the join to a filter if the maximum number of rows on one side is 1: https://github.com/apache/spark/pull/42114 > In subquery cannot perform partition pruning > > > Key: SPARK-44654 > URL: https://issues.apache.org/jira/browse/SPARK-44654 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: 7mming7 >Priority: Minor > Labels: performance > Attachments: image-2023-08-03-17-22-53-981.png > > > The following SQL cannot perform partition pruning > {code:java} > SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from > parquet_part){code} > As can be seen from the execution plan below, the partition pruning of left > cannot be performed after the subquery of in is converted into join > !image-2023-08-03-17-22-53-981.png! > The current issue proposes to optimize insubquery. Only when the value of in > is greater than a threshold, insubquery will be converted into Join -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44657) Incorrect limit handling and config parsing in Arrow collect
Venkata Sai Akhil Gudesa created SPARK-44657: Summary: Incorrect limit handling and config parsing in Arrow collect Key: SPARK-44657 URL: https://issues.apache.org/jira/browse/SPARK-44657 Project: Spark Issue Type: Bug Components: Connect Affects Versions: 3.4.1, 3.4.0, 3.4.2, 3.5.0 Reporter: Venkata Sai Akhil Gudesa In the Arrow writer [code|https://github.com/apache/spark/blob/6161bf44f40f8146ea4c115c788fd4eaeb128769/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala#L154-L163], the conditions don't seem to match what the documentation says regarding "{_}maxBatchSize and maxRecordsPerBatch, respect whatever smaller{_}", since the code actually respects whichever conf is "larger" (i.e. less restrictive) due to the _||_ operator. Further, when the `{_}CONNECT_GRPC_ARROW_MAX_BATCH_SIZE{_}` conf is read, the value is not converted from MiB to bytes ([example|https://github.com/apache/spark/blob/3e5203c64c06cc8a8560dfa0fb6f52e74589b583/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala#L103]). -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
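The `||` bug described in SPARK-44657 is easy to see in miniature (a hypothetical Python sketch, not the actual Arrow writer): "respect whichever is smaller" means a batch should close as soon as EITHER limit is hit, but a loop that continues while either limit is unreached effectively honors only the larger, less restrictive one.

```python
# Hypothetical sketch of the batching guard. row_size is bytes per row.

def fill_batch_buggy(rows, max_bytes, max_records, row_size):
    n = 0
    # Keeps writing while EITHER bound is unreached (the || in the report),
    # so the batch over-fills until the larger limit stops it.
    while n < len(rows) and (n * row_size < max_bytes or n < max_records):
        n += 1
    return n

def fill_batch_fixed(rows, max_bytes, max_records, row_size):
    n = 0
    # Stops as soon as ONE bound is reached, respecting the smaller limit.
    # Note the report's second bug: a MiB-valued conf must also be scaled,
    # e.g. max_bytes = conf_mib * 1024 * 1024, before being compared here.
    while n < len(rows) and (n * row_size < max_bytes and n < max_records):
        n += 1
    return n

rows = list(range(100))
# 10-byte rows: a 40-byte cap allows 4 rows, versus an 8-record cap.
assert fill_batch_buggy(rows, 40, 8, 10) == 8   # honors the larger limit
assert fill_batch_fixed(rows, 40, 8, 10) == 4   # honors the smaller limit
```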
[jira] [Updated] (SPARK-44656) Close dangling iterators in SparkResult too (Spark Connect Scala)
[ https://issues.apache.org/jira/browse/SPARK-44656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juliusz Sompolski updated SPARK-44656: -- Epic Link: SPARK-43754 > Close dangling iterators in SparkResult too (Spark Connect Scala) > - > > Key: SPARK-44656 > URL: https://issues.apache.org/jira/browse/SPARK-44656 > Project: Spark > Issue Type: Improvement > Components: Connect >Affects Versions: 3.5.0 >Reporter: Alice Sayutina >Priority: Major > > SPARK-44636 followup. We didn't address iterators grabbed in SparkResult > there. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44656) Close dangling iterators in SparkResult too (Spark Connect Scala)
Alice Sayutina created SPARK-44656: -- Summary: Close dangling iterators in SparkResult too (Spark Connect Scala) Key: SPARK-44656 URL: https://issues.apache.org/jira/browse/SPARK-44656 Project: Spark Issue Type: Improvement Components: Connect Affects Versions: 3.5.0 Reporter: Alice Sayutina SPARK-44636 followup. We didn't address iterators grabbed in SparkResult there. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
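The "dangling iterator" concern behind SPARK-44636/SPARK-44656 can be sketched generically (Python, not the Spark Connect client): an iterator backed by a resource, such as a gRPC stream, should be released even when the consumer abandons it early.

```python
# Generic sketch: a stand-in for a resource-backed iterator source. The point
# is the try/finally that guarantees close() even on early abandonment.

class FakeStream:
    def __init__(self):
        self.closed = False

    def __iter__(self):
        yield from range(3)

    def close(self):
        self.closed = True

def consume_some(stream):
    it = iter(stream)
    try:
        return next(it)   # consumer stops after the first element...
    finally:
        stream.close()    # ...but the underlying stream is still released

s = FakeStream()
assert consume_some(s) == 0
assert s.closed   # no dangling, still-open stream left behind
```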
[jira] [Updated] (SPARK-44619) Free up disk space for container jobs
[ https://issues.apache.org/jira/browse/SPARK-44619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruifeng Zheng updated SPARK-44619: -- Summary: Free up disk space for container jobs (was: Free up disk space for pyspark container jobs) > Free up disk space for container jobs > - > > Key: SPARK-44619 > URL: https://issues.apache.org/jira/browse/SPARK-44619 > Project: Spark > Issue Type: Sub-task > Components: Project Infra >Affects Versions: 4.0.0 >Reporter: Ruifeng Zheng >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44655) make the code cleaner about static and dynamic data/partition filters
Wenchen Fan created SPARK-44655: --- Summary: make the code cleaner about static and dynamic data/partition filters Key: SPARK-44655 URL: https://issues.apache.org/jira/browse/SPARK-44655 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 4.0.0 Reporter: Wenchen Fan Assignee: Wenchen Fan -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44654) In subquery cannot perform partition pruning
[ https://issues.apache.org/jira/browse/SPARK-44654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

7mming7 updated SPARK-44654:
----------------------------
    Description:
The following SQL cannot perform partition pruning:
{code:java}
SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from parquet_part){code}
As can be seen from the execution plan below, partition pruning on the left side cannot be performed after the IN subquery is converted into a join:
!image-2023-08-03-17-22-53-981.png!
This issue proposes to optimize IN subqueries: convert an IN subquery into a join only when the number of values it returns exceeds a threshold.

was:
The following SQL cannot perform partition pruning:
{code:java}
SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from parquet_part){code}
As can be seen from the execution plan below, partition pruning on the left side cannot be performed after the IN subquery is converted into a join:
!image-2023-08-03-17-17-34-858.png!
This issue proposes to optimize IN subqueries: convert an IN subquery into a join only when the number of values it returns exceeds a threshold.

> In subquery cannot perform partition pruning
> --------------------------------------------
>
>                 Key: SPARK-44654
>                 URL: https://issues.apache.org/jira/browse/SPARK-44654
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: 7mming7
>            Priority: Minor
>              Labels: performance
>         Attachments: image-2023-08-03-17-22-53-981.png
>
> The following SQL cannot perform partition pruning:
> {code:java}
> SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from parquet_part){code}
> As can be seen from the execution plan below, partition pruning on the left side cannot be performed after the IN subquery is converted into a join:
> !image-2023-08-03-17-22-53-981.png!
> This issue proposes to optimize IN subqueries: convert an IN subquery into a join only when the number of values it returns exceeds a threshold.
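The threshold-based conversion proposed in SPARK-44654 could be sketched roughly as follows. This is a minimal illustrative sketch only: the config name, its default value, and the function are hypothetical and do not correspond to Spark's actual Catalyst optimizer API.

```python
# Illustrative sketch (NOT Spark's Catalyst API): decide whether an
# IN-subquery should become a join, or stay a pruning-friendly IN-list
# filter. The threshold name and its default value are hypothetical.
IN_SUBQUERY_TO_JOIN_THRESHOLD = 10

def plan_in_subquery(subquery_values):
    """Return 'join' when the IN-list is large, else 'filter'.

    Keeping a small IN-list as a literal filter lets the planner push it
    down to the partitioned scan, so partition pruning is preserved.
    """
    if len(subquery_values) > IN_SUBQUERY_TO_JOIN_THRESHOLD:
        return "join"
    return "filter"
```

With this rule, the single-value `IN (SELECT max(id_type) ...)` case from the issue would stay a filter and keep pruning, while large IN-lists would still benefit from the join conversion.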
[jira] [Commented] (SPARK-40927) Memory issue with Structured streaming
[ https://issues.apache.org/jira/browse/SPARK-40927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750660#comment-17750660 ]

Iain Morrison commented on SPARK-40927:
---------------------------------------
In our case I found the following settings greatly improved our streaming applications, which have now been running for over 2 weeks without being OOM-killed (previously they lasted a day or two):

1. Use the RocksDB state store provider, which improved executor memory usage:
"spark.sql.streaming.stateStore.providerClass" -> "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
Not sure whether there is a leak in the default HDFS state store implementation or not.

2. Store UI data on disk instead of in memory in the driver:
"spark.ui.store.path" -> "some path"

Old issue, but I hope this helps someone.

> Memory issue with Structured streaming
> --------------------------------------
>
>                 Key: SPARK-40927
>                 URL: https://issues.apache.org/jira/browse/SPARK-40927
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.3.0, 3.2.2
>            Reporter: Mihir Kelkar
>            Priority: Major
>
> In PySpark Structured Streaming with Kafka as source and sink, the driver as well as the executors seem to get OOM-killed after a long period of time (a few days). Not able to pinpoint any specific cause, but 8-12 hr runs also show a slow memory creep in Prometheus metrics:
> # JVM off-heap memory of both driver and executors keeps increasing over time (12-24 hr observation window) [off-heap usage is NOT enabled]
> # JVM heap memory of executors also keeps bumping up in slow steps.
> # JVM RSS of executors and driver keeps increasing, but Python RSS does not increase.
> A basic operation of counting rows from within sdf.forEachBatch() is being done to debug (the original business logic does some dropDuplicates, aggregations, and windowing within the forEachBatch). Watermarking on a custom timestamp column is also being done.
> Heap dump analysis shows a large number of duplicate strings (which look like generated code), plus large numbers of byte[], char[] and UTF8String objects. Does this point to a potential memory leak in Tungsten optimizer-related code?
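The two settings from the comment above, collected as a plain dict that could be passed via `--conf` flags or applied on a SparkSession builder. This is a sketch: the UI store path is a placeholder, and `spark.ui.store.path` is only honored by Spark versions that support it.

```python
# Settings suggested in the comment above; the path is a placeholder.
streaming_memory_confs = {
    # Use RocksDB for streaming state instead of the default provider,
    # keeping large state off the executor JVM heap.
    "spark.sql.streaming.stateStore.providerClass":
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
    # Persist live UI data on local disk instead of driver memory.
    "spark.ui.store.path": "/tmp/spark-ui-store",
}

# One way to apply them (assumes pyspark is available):
#   builder = SparkSession.builder
#   for k, v in streaming_memory_confs.items():
#       builder = builder.config(k, v)
```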
[jira] [Updated] (SPARK-44654) In subquery cannot perform partition pruning
[ https://issues.apache.org/jira/browse/SPARK-44654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

7mming7 updated SPARK-44654:
----------------------------
    Attachment: image-2023-08-03-17-22-53-981.png

> In subquery cannot perform partition pruning
> --------------------------------------------
>
>                 Key: SPARK-44654
>                 URL: https://issues.apache.org/jira/browse/SPARK-44654
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: 7mming7
>            Priority: Minor
>              Labels: performance
>         Attachments: image-2023-08-03-17-22-53-981.png
>
> The following SQL cannot perform partition pruning:
> {code:java}
> SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from parquet_part){code}
> As can be seen from the execution plan below, partition pruning on the left side cannot be performed after the IN subquery is converted into a join:
> !image-2023-08-03-17-17-34-858.png!
> This issue proposes to optimize IN subqueries: convert an IN subquery into a join only when the number of values it returns exceeds a threshold.
[jira] [Created] (SPARK-44654) In subquery cannot perform partition pruning
7mming7 created SPARK-44654:
-------------------------------

             Summary: In subquery cannot perform partition pruning
                 Key: SPARK-44654
                 URL: https://issues.apache.org/jira/browse/SPARK-44654
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: 7mming7


The following SQL cannot perform partition pruning:
{code:java}
SELECT * FROM parquet_part WHERE id_type in (SELECT max(id_type) from parquet_part){code}
As can be seen from the execution plan below, partition pruning on the left side cannot be performed after the IN subquery is converted into a join:
!image-2023-08-03-17-17-34-858.png!
This issue proposes to optimize IN subqueries: convert an IN subquery into a join only when the number of values it returns exceeds a threshold.
[jira] [Commented] (SPARK-44575) Implement Error Translation
[ https://issues.apache.org/jira/browse/SPARK-44575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750659#comment-17750659 ]

ASF GitHub Bot commented on SPARK-44575:
----------------------------------------
User 'heyihong' has created a pull request for this issue:
https://github.com/apache/spark/pull/42266

> Implement Error Translation
> ---------------------------
>
>                 Key: SPARK-44575
>                 URL: https://issues.apache.org/jira/browse/SPARK-44575
>             Project: Spark
>          Issue Type: New Feature
>          Components: Connect
>    Affects Versions: 3.5.0
>            Reporter: Yihong He
>            Priority: Major
[jira] [Commented] (SPARK-44619) Free up disk space for pyspark container jobs
[ https://issues.apache.org/jira/browse/SPARK-44619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750657#comment-17750657 ]

ASF GitHub Bot commented on SPARK-44619:
----------------------------------------
User 'zhengruifeng' has created a pull request for this issue:
https://github.com/apache/spark/pull/42253

> Free up disk space for pyspark container jobs
> ---------------------------------------------
>
>                 Key: SPARK-44619
>                 URL: https://issues.apache.org/jira/browse/SPARK-44619
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Project Infra
>    Affects Versions: 4.0.0
>            Reporter: Ruifeng Zheng
>            Priority: Major
[jira] [Commented] (SPARK-44581) ShutdownHookManager get wrong hadoop user group information
[ https://issues.apache.org/jira/browse/SPARK-44581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750656#comment-17750656 ]

ASF GitHub Bot commented on SPARK-44581:
----------------------------------------
User 'liangyu-1' has created a pull request for this issue:
https://github.com/apache/spark/pull/42295

> ShutdownHookManager get wrong hadoop user group information
> -----------------------------------------------------------
>
>                 Key: SPARK-44581
>                 URL: https://issues.apache.org/jira/browse/SPARK-44581
>             Project: Spark
>          Issue Type: Bug
>          Components: Deploy, YARN
>    Affects Versions: 3.2.1
>            Reporter: liang yu
>            Priority: Major
>
> I use Spark 3.2.1 to run a job on YARN in cluster mode.
> When the job finishes, the following exception is thrown:
> {code:java}
> 2023-07-28 10:57:16,324 ERROR yarn.ApplicationMaster: Failed to cleanup staging dir hdfs://dmp/user/ubd_dmp_test/.sparkStaging/application_1689318995305_0290
> org.apache.hadoop.security.AccessControlException: Permission denied: user=yarn, access=WRITE, inode="/user/ubd_dmp_test/.sparkStaging":ubd_dmp_test:ubd_dmp_test:drwxr-xr-x
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:506)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:349)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermissionWithContext(FSPermissionChecker.java:370)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:240)
> at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1943)
> at org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:105)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3266)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.delete(NameNodeRpcServer.java:1128)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.delete(ClientNamenodeProtocolServerSideTranslatorPB.java:725)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:604)
> at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:572)
> at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:556)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1093)
> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1043)
> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:971)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2976)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
> at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
> at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:1656)
> at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:991)
> at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:988)
> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:998)
> at org.apache.spark.deploy.yarn.ApplicationMaster.cleanupStagingDir(ApplicationMaster.scala:686)
> at org.apache.spark.deploy.yarn.ApplicationMaster.$anonfun$run$3(ApplicationMaster.scala:268)
> at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
> at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
> at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at scala.util.Try$.apply(Try.scala:213)
> at
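The AccessControlException above reduces to a POSIX-style permission check: the staging dir is owned by `ubd_dmp_test` with mode `drwxr-xr-x`, so the `yarn` user (which the shutdown hook ends up running as, per the issue title) has no write bit on it. A minimal sketch of that check, with group membership deliberately ignored for brevity:

```python
def can_write(user, owner, mode_str):
    """POSIX-style check: does `user` get the write bit on a directory?

    mode_str looks like 'rwxr-xr-x' (owner/group/other triplets).
    Group membership is ignored here to keep the sketch short.
    """
    if user == owner:
        return mode_str[1] == "w"   # owner write bit
    return mode_str[7] == "w"       # 'other' write bit

# The situation from the stack trace: yarn is not the owner, and
# 'other' lacks the write bit, so deleting the staging dir fails,
# while the submitting user could have deleted it.
can_write("yarn", "ubd_dmp_test", "rwxr-xr-x")
can_write("ubd_dmp_test", "ubd_dmp_test", "rwxr-xr-x")
```

This is why the fix needs the shutdown hook to run with the submitting user's UserGroupInformation rather than the NodeManager's `yarn` user.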
[jira] [Commented] (SPARK-44649) Runtime Filter supports passing equivalent creation side expressions
[ https://issues.apache.org/jira/browse/SPARK-44649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750652#comment-17750652 ]

ASF GitHub Bot commented on SPARK-44649:
----------------------------------------
User 'beliefer' has created a pull request for this issue:
https://github.com/apache/spark/pull/42317

> Runtime Filter supports passing equivalent creation side expressions
> --------------------------------------------------------------------
>
>                 Key: SPARK-44649
>                 URL: https://issues.apache.org/jira/browse/SPARK-44649
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 4.0.0
>            Reporter: jiaan.geng
>            Priority: Major
>
> {code:java}
> SELECT
>   d_year,
>   i_brand_id,
>   i_class_id,
>   i_category_id,
>   i_manufact_id,
>   cs_quantity - COALESCE(cr_return_quantity, 0) AS sales_cnt,
>   cs_ext_sales_price - COALESCE(cr_return_amount, 0.0) AS sales_amt
> FROM catalog_sales
> JOIN item ON i_item_sk = cs_item_sk
> JOIN date_dim ON d_date_sk = cs_sold_date_sk
> LEFT JOIN catalog_returns ON (cs_order_number = cr_order_number
>   AND cs_item_sk = cr_item_sk)
> WHERE i_category = 'Books'
> {code}
[jira] [Commented] (SPARK-42375) Point out the user-facing documentation in Spark Connect server startup
[ https://issues.apache.org/jira/browse/SPARK-42375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750586#comment-17750586 ]

Junyao Huang commented on SPARK-42375:
--------------------------------------
Hi, [~gurwls223], do you mean we directly add the overview link
[https://spark.apache.org/docs/latest/spark-connect-overview.html]
into
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L134C1-L150C4] ?

> Point out the user-facing documentation in Spark Connect server startup
> ------------------------------------------------------------------------
>
>                 Key: SPARK-42375
>                 URL: https://issues.apache.org/jira/browse/SPARK-42375
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Connect
>    Affects Versions: 3.4.0
>            Reporter: Hyukjin Kwon
>            Priority: Major
>
> See SPARK-42375 in SparkSubmit.scala
[jira] [Commented] (SPARK-42729) Update Submitting Applications page for Spark Connect
[ https://issues.apache.org/jira/browse/SPARK-42729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750585#comment-17750585 ]

Junyao Huang commented on SPARK-42729:
--------------------------------------
Hi, [~gurwls223], I think these pages describe the different deploy modes available through spark-submit: local, standalone, YARN, and Kubernetes. Maybe we can add a new paragraph called 'Remote' next to the 'Master URLs' table.

And I have one more question: does spark-submit support the --remote parameter?
https://spark.apache.org/docs/latest/spark-connect-overview.html

> Update Submitting Applications page for Spark Connect
> -----------------------------------------------------
>
>                 Key: SPARK-42729
>                 URL: https://issues.apache.org/jira/browse/SPARK-42729
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Connect, Documentation
>    Affects Versions: 3.5.0
>            Reporter: Hyukjin Kwon
>            Priority: Major
>
> https://spark.apache.org/docs/latest/submitting-applications.html
> Should we add Spark Connect application-building content here, or create a separate Spark Connect application-building page?
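Regarding the --remote question above, a hypothetical Connect-style submission could look like the sketch below. Whether spark-submit accepts --remote in a given release is exactly the open question in this thread; the endpoint URL and script name are placeholders.

```python
# Hypothetical spark-submit invocation for a Spark Connect client app.
# The sc:// endpoint and script name are placeholders, and --remote
# support in spark-submit is the open question raised above.
cmd = [
    "spark-submit",
    "--remote", "sc://connect-server:15002",  # Spark Connect endpoint
    "my_app.py",
]
command_line = " ".join(cmd)
```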
[jira] [Created] (SPARK-44653) non-trivial DataFrame unions should not break caching
Wenchen Fan created SPARK-44653:
-----------------------------------

             Summary: Non-trivial DataFrame unions should not break caching
                 Key: SPARK-44653
                 URL: https://issues.apache.org/jira/browse/SPARK-44653
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.3.0
            Reporter: Wenchen Fan