Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]

2026-04-21 Thread via GitHub


zhengruifeng commented on PR #55222:
URL: https://github.com/apache/spark/pull/55222#issuecomment-4293311047

   Merged to master as an intermediate step in the refactoring.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]

2026-04-21 Thread via GitHub


zhengruifeng closed pull request #55222: [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF
URL: https://github.com/apache/spark/pull/55222





Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]

2026-04-16 Thread via GitHub


Yicong-Huang commented on PR #55222:
URL: https://github.com/apache/spark/pull/55222#issuecomment-4265243068

   @zhengruifeng any comments?





Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]

2026-04-16 Thread via GitHub


gaogaotiantian commented on PR #55222:
URL: https://github.com/apache/spark/pull/55222#issuecomment-4263644550

   I'm okay with this as a temporary state, but we will have to restructure it somehow in the future. The overall process is longer than I expected: if it's 70 lines, it's too long. We need to reduce duplicated code and put the shared code together, rather than "list everything we need to do for a single type of UDF".





Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]

2026-04-14 Thread via GitHub


Yicong-Huang commented on code in PR #55222:
URL: https://github.com/apache/spark/pull/55222#discussion_r3077722317


##########
python/pyspark/worker.py:
##########
@@ -2833,6 +2797,144 @@ def grouped_func(
         # profiling is not supported for UDF
         return grouped_func, None, ser, ser
 
+    if eval_type == PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF:
+        import pyarrow as pa
+
+        assert num_udfs == 1, "One GROUPED_MAP_ARROW UDF expected here."
+        grouped_udf, arg_offsets, return_type, num_udf_args = udfs[0]
+        parsed_offsets = extract_key_value_indexes(arg_offsets)
+
+        arrow_return_type = to_arrow_type(
+            return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
+        )
+        if runner_conf.assign_cols_by_name:
+            expected_cols_and_types = {
+                col.name: to_arrow_type(col.dataType, timezone="UTC") for col in return_type.fields
+            }
+        else:
+            expected_cols_and_types = [
+                (col.name, to_arrow_type(col.dataType, timezone="UTC"))
+                for col in return_type.fields
+            ]
+

Review Comment:
   assertion added! they should be 1.
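The hunk above builds `expected_cols_and_types` in two shapes. When `assign_cols_by_name` is set, it is a name-to-type dict. Otherwise it is a list of `(name, type)` pairs. Below is a minimal, hypothetical sketch of how each shape could drive output verification and column reordering. `check_output_columns` is not a Spark function, and the type names are plain strings standing in for `pyarrow.DataType` objects.

```python
from typing import Dict, List, Sequence, Tuple, Union

# Hypothetical stand-ins: type names are plain strings, not pyarrow.DataType objects.
Expected = Union[Dict[str, str], List[Tuple[str, str]]]


def check_output_columns(actual: Sequence[Tuple[str, str]], expected: Expected) -> List[int]:
    """Return, for each expected column, the index of the matching output column.

    Raises ValueError on a name/count mismatch and TypeError on a type mismatch.
    """
    if isinstance(expected, dict):
        # By name: every expected column must be present and nothing else may be.
        positions = {name: i for i, (name, _) in enumerate(actual)}
        missing = [n for n in expected if n not in positions]
        unexpected = [n for n in positions if n not in expected]
        if missing or unexpected:
            raise ValueError(f"missing={missing}, unexpected={unexpected}")
        pairs = [(positions[n], t) for n, t in expected.items()]
    else:
        # By position: only the column count and types matter; names are ignored.
        if len(actual) != len(expected):
            raise ValueError(f"expected {len(expected)} columns, got {len(actual)}")
        pairs = [(i, t) for i, (_, t) in enumerate(expected)]
    for i, expected_type in pairs:
        if actual[i][1] != expected_type:
            raise TypeError(f"column {actual[i][0]!r}: expected {expected_type}, got {actual[i][1]}")
    # The returned indices give the reorder needed to match the declared schema.
    return [i for i, _ in pairs]


actual = [("b", "int64"), ("a", "string")]
print(check_output_columns(actual, {"a": "string", "b": "int64"}))  # [1, 0]
print(check_output_columns(actual, [("x", "int64"), ("y", "string")]))  # [0, 1]
```

The by-name branch is where reordering matters. The positional branch keeps the output order as-is.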






Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]

2026-04-14 Thread via GitHub


Yicong-Huang commented on code in PR #55222:
URL: https://github.com/apache/spark/pull/55222#discussion_r3077661890


##########
python/pyspark/worker.py:
##########
@@ -2575,6 +2510,35 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf):
             for i in range(num_udfs)
         ]
 
+    def extract_key_value_indexes(grouped_arg_offsets):
+        """
+        Helper function to extract the key and value indexes from arg_offsets for the grouped and
+        cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.
+
+        Parameters
+        ----------
+        grouped_arg_offsets:  list
+            List containing the key and value indexes of columns of the
+            DataFrames to be passed to the udf. It consists of n repeating groups where n is the
+            number of DataFrames.  Each group has the following format:
+                group[0]: length of group
+                group[1]: length of key indexes
+                group[2.. group[1] +2]: key attributes
+                group[group[1] +3 group[0]]: value attributes
+        """
+        parsed = []
+        idx = 0
+        while idx < len(grouped_arg_offsets):
+            offsets_len = grouped_arg_offsets[idx]
+            idx += 1
+            offsets = grouped_arg_offsets[idx : idx + offsets_len]
+            split_index = offsets[0] + 1
+            offset_keys = offsets[1:split_index]
+            offset_values = offsets[split_index:]

Review Comment:
   we can refactor it in the future. 
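To make the flat layout concrete, here is a sketch of the inverse operation. It packs per-DataFrame key/value offsets into the flat list that the quoted loop reads. It is derived only from how that loop consumes the list: it reads a length, then splits the next `offsets_len` entries at `offsets[0] + 1`. It is not Spark's Scala `resolveArgOffsets`, and `encode_grouped_arg_offsets` is a hypothetical name. By this reading, value offsets occupy `group[group[1] + 2 : group[0] + 1]`, which differs slightly from the docstring's last line.

```python
from typing import List, Sequence, Tuple


def encode_grouped_arg_offsets(groups: Sequence[Tuple[Sequence[int], Sequence[int]]]) -> List[int]:
    """Flatten per-DataFrame (key_offsets, value_offsets) pairs into one offsets list.

    Each group is written as [body_length, num_keys, *key_offsets, *value_offsets],
    where body_length counts every entry after itself in that group.
    """
    flat: List[int] = []
    for key_offsets, value_offsets in groups:
        body = [len(key_offsets), *key_offsets, *value_offsets]
        flat.append(len(body))  # the length prefix read as offsets_len
        flat.extend(body)
    return flat


# Grouped map: one DataFrame, key column 0, value columns 1 and 2.
print(encode_grouped_arg_offsets([([0], [1, 2])]))  # [4, 1, 0, 1, 2]
# Cogrouped: a second DataFrame with key column 3 and value column 4.
print(encode_grouped_arg_offsets([([0], [1, 2]), ([3], [4])]))  # [4, 1, 0, 1, 2, 3, 1, 3, 4]
```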






Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]

2026-04-09 Thread via GitHub


Yicong-Huang commented on code in PR #55222:
URL: https://github.com/apache/spark/pull/55222#discussion_r3061592491


##########
python/pyspark/worker.py:
##########
@@ -2575,6 +2510,35 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf):
             for i in range(num_udfs)
         ]
 
+    def extract_key_value_indexes(grouped_arg_offsets):
+        """
+        Helper function to extract the key and value indexes from arg_offsets for the grouped and
+        cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.
+
+        Parameters
+        ----------
+        grouped_arg_offsets:  list
+            List containing the key and value indexes of columns of the
+            DataFrames to be passed to the udf. It consists of n repeating groups where n is the
+            number of DataFrames.  Each group has the following format:
+                group[0]: length of group
+                group[1]: length of key indexes
+                group[2.. group[1] +2]: key attributes

Review Comment:
   Yeah, that's due to the order of the methods; I had to move this definition up. I'd prefer to keep it as is, and I can refactor this method in a future PR.






Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]

2026-04-09 Thread via GitHub


gaogaotiantian commented on code in PR #55222:
URL: https://github.com/apache/spark/pull/55222#discussion_r3061546329


##########
python/pyspark/worker.py:
##########
@@ -2575,6 +2510,35 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf):
             for i in range(num_udfs)
         ]
 
+    def extract_key_value_indexes(grouped_arg_offsets):
+        """
+        Helper function to extract the key and value indexes from arg_offsets for the grouped and
+        cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.
+
+        Parameters
+        ----------
+        grouped_arg_offsets:  list
+            List containing the key and value indexes of columns of the
+            DataFrames to be passed to the udf. It consists of n repeating groups where n is the
+            number of DataFrames.  Each group has the following format:
+                group[0]: length of group
+                group[1]: length of key indexes
+                group[2.. group[1] +2]: key attributes

Review Comment:
   We can make this real Python, right? `group[2: group[1] + 2]`? Also, let's describe the return type too.



##########
python/pyspark/worker.py:
##########
@@ -2575,6 +2510,35 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf):
             for i in range(num_udfs)
         ]
 
+    def extract_key_value_indexes(grouped_arg_offsets):
+        """
+        Helper function to extract the key and value indexes from arg_offsets for the grouped and
+        cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.
+
+        Parameters
+        ----------
+        grouped_arg_offsets:  list
+            List containing the key and value indexes of columns of the
+            DataFrames to be passed to the udf. It consists of n repeating groups where n is the
+            number of DataFrames.  Each group has the following format:
+                group[0]: length of group
+                group[1]: length of key indexes
+                group[2.. group[1] +2]: key attributes
+                group[group[1] +3 group[0]]: value attributes
+        """
+        parsed = []
+        idx = 0
+        while idx < len(grouped_arg_offsets):
+            offsets_len = grouped_arg_offsets[idx]
+            idx += 1
+            offsets = grouped_arg_offsets[idx : idx + offsets_len]
+            split_index = offsets[0] + 1
+            offset_keys = offsets[1:split_index]
+            offset_values = offsets[split_index:]

Review Comment:
   I don't think temporary variables like `split_index` make the code easier to read. Since we already have the comment above, we could probably just use the same slices as the comment, right? Make them consistent.
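Following the suggestion to drop `split_index` and make the code match the docstring, here is a sketch of the same loop. It uses real Python slices that mirror a documented layout and spells out the return type. `parse_grouped_arg_offsets` is a hypothetical name. The return shape, a list of `(key_offsets, value_offsets)` tuples, is an assumption, because the quoted hunk ends before the helper's `return`.

```python
from typing import List, Sequence, Tuple


def parse_grouped_arg_offsets(
    grouped_arg_offsets: Sequence[int],
) -> List[Tuple[List[int], List[int]]]:
    """Split a flat offsets list into one (key_offsets, value_offsets) pair per DataFrame.

    Each group is laid out as (k = group[1]):
        group[0]                      number of entries after group[0] in this group
        group[1]                      number of key offsets, k
        group[2 : 2 + k]              key offsets
        group[2 + k : 1 + group[0]]   value offsets
    """
    parsed = []
    idx = 0
    while idx < len(grouped_arg_offsets):
        # The group spans its length prefix plus group[0] further entries.
        group = grouped_arg_offsets[idx : idx + 1 + grouped_arg_offsets[idx]]
        k = group[1]
        # These slices are exactly the ones written in the docstring above.
        parsed.append((list(group[2 : 2 + k]), list(group[2 + k : 1 + group[0]])))
        idx += 1 + group[0]
    return parsed


print(parse_grouped_arg_offsets([4, 1, 0, 1, 2]))  # [([0], [1, 2])]
print(parse_grouped_arg_offsets([4, 1, 0, 1, 2, 3, 1, 3, 4]))  # [([0], [1, 2]), ([3], [4])]
```

The slicing is equivalent to the quoted `offsets[1:split_index]` / `offsets[split_index:]` version, with `offsets` being `group[1:]`.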



##########
python/pyspark/worker.py:
##########
@@ -2575,6 +2510,35 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf):
             for i in range(num_udfs)
         ]
 
+    def extract_key_value_indexes(grouped_arg_offsets):
+        """
+        Helper function to extract the key and value indexes from arg_offsets for the grouped and
+        cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.
+
+        Parameters
+        ----------
+        grouped_arg_offsets:  list
+            List containing the key and value indexes of columns of the
+            DataFrames to be passed to the udf. It consists of n repeating groups where n is the
+            number of DataFrames.  Each group has the following format:
+                group[0]: length of group
+                group[1]: length of key indexes
+                group[2.. group[1] +2]: key attributes

Review Comment:
   Just realized that this is directly copied. You can either keep it as is or make some changes while moving it. I'm fine either way.



##########
python/pyspark/worker.py:
##########
@@ -2833,6 +2797,144 @@ def grouped_func(
         # profiling is not supported for UDF
         return grouped_func, None, ser, ser
 
+    if eval_type == PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF:
+        import pyarrow as pa
+
+        assert num_udfs == 1, "One GROUPED_MAP_ARROW UDF expected here."
+        grouped_udf, arg_offsets, return_type, num_udf_args = udfs[0]
+        parsed_offsets = extract_key_value_indexes(arg_offsets)
+
+        arrow_return_type = to_arrow_type(
+            return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
+        )
+        if runner_conf.assign_cols_by_name:
+            expected_cols_and_types = {
+                col.name: to_arrow_type(col.dataType, timezone="UTC") for col in return_type.fields
+            }
+        else:
+            expected_cols_and_types = [
+                (col.name, to_arrow_type(col.dataType, timezone="UTC"))
+                for col in return_type.fields
+            ]
+

Review Comment:
   Let's also add an assertion about the length of `parsed_offsets` (`0` probably).
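A grouped map UDF receives a single DataFrame, so the parsed offsets should hold exactly one group. The reply in this thread says the length should be 1. Below is a minimal sketch of that assertion, followed by key/value column selection. It assumes `parsed_offsets` is a list of `(key_offsets, value_offsets)` pairs and that columns can be indexed positionally. `select_grouped_map_columns` is a hypothetical helper, not Spark code.

```python
from typing import Any, List, Sequence, Tuple

# Assumed shape: one (key_offsets, value_offsets) pair per input DataFrame.
Offsets = Sequence[Tuple[Sequence[int], Sequence[int]]]


def select_grouped_map_columns(
    columns: Sequence[Any], parsed_offsets: Offsets
) -> Tuple[List[Any], List[Any]]:
    # Grouped map has one input DataFrame, hence exactly one group of offsets.
    assert len(parsed_offsets) == 1, f"expected 1 group of offsets, got {len(parsed_offsets)}"
    key_offsets, value_offsets = parsed_offsets[0]
    keys = [columns[i] for i in key_offsets]
    values = [columns[i] for i in value_offsets]
    return keys, values


print(select_grouped_map_columns(["k", "a", "b"], [([0], [1, 2])]))  # (['k'], ['a', 'b'])
```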

Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]

2026-04-09 Thread via GitHub


Yicong-Huang commented on PR #55222:
URL: https://github.com/apache/spark/pull/55222#issuecomment-4218724750

   cc @gaogaotiantian @zhengruifeng 





Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]

2026-04-07 Thread via GitHub


Yicong-Huang commented on PR #55222:
URL: https://github.com/apache/spark/pull/55222#issuecomment-4200683082

   retest this please





[PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]

2026-04-06 Thread via GitHub


Yicong-Huang opened a new pull request, #55222:
URL: https://github.com/apache/spark/pull/55222

   ### What changes were proposed in this pull request?
   
   Refactor `SQL_GROUPED_MAP_ARROW_UDF` and `SQL_GROUPED_MAP_ARROW_ITER_UDF` to be self-contained in `read_udfs()`, following the same pattern established by SPARK-56123 (grouped agg arrow) and SPARK-56189 (window agg arrow).
   
   Key changes:
   - **Remove wrapper functions** `wrap_grouped_map_arrow_udf` and `wrap_grouped_map_arrow_iter_udf` from the module-level scope
   - **Remove `ArrowStreamGroupUDFSerializer` usage**: these eval types now use `ArrowStreamGroupSerializer` (pure I/O) like other grouped arrow eval types
   - **Move all processing logic into `read_udfs()`**: flatten struct, extract key/value columns, call UDF, verify output, reorder columns, and wrap struct are all handled in the self-contained `grouped_func`
   - **Move `extract_key_value_indexes`** earlier in `read_udfs()` so it is available to both arrow and pandas grouped eval types
   - **Pass `(func, args_offsets, return_type, num_udf_args)` from `read_single_udf`** instead of a pre-wrapped function, consistent with other refactored eval types
   
   ### Why are the changes needed?
   
   Part of SPARK-55388 (Refactor PythonEvalType processing logic). Making each eval type self-contained in `read_udfs()` improves readability and makes it easier to reason about the data flow for each eval type independently.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing tests. No behavior change.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.

