Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]
zhengruifeng commented on PR #55222: URL: https://github.com/apache/spark/pull/55222#issuecomment-4293311047 merged to master as the intermediate step in the refactoring -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]
zhengruifeng closed pull request #55222: [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF URL: https://github.com/apache/spark/pull/55222
Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]
Yicong-Huang commented on PR #55222: URL: https://github.com/apache/spark/pull/55222#issuecomment-4265243068 @zhengruifeng any comments?
Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]
gaogaotiantian commented on PR #55222: URL: https://github.com/apache/spark/pull/55222#issuecomment-4263644550 I'm okay with this as a temporary state. We have to structure it in the future somehow. The overall process is longer than I expected. If it's 70 lines, it's too long. We need to reduce duplicated code. We still need to put the shared code together, not "list everything we need to do for a single type of UDF".
Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]
Yicong-Huang commented on code in PR #55222:
URL: https://github.com/apache/spark/pull/55222#discussion_r3077722317
##
python/pyspark/worker.py:
##
@@ -2833,6 +2797,144 @@ def grouped_func(
         # profiling is not supported for UDF
         return grouped_func, None, ser, ser
+    if eval_type == PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF:
+        import pyarrow as pa
+
+        assert num_udfs == 1, "One GROUPED_MAP_ARROW UDF expected here."
+        grouped_udf, arg_offsets, return_type, num_udf_args = udfs[0]
+        parsed_offsets = extract_key_value_indexes(arg_offsets)
+
+        arrow_return_type = to_arrow_type(
+            return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
+        )
+        if runner_conf.assign_cols_by_name:
+            expected_cols_and_types = {
+                col.name: to_arrow_type(col.dataType, timezone="UTC") for col in return_type.fields
+            }
+        else:
+            expected_cols_and_types = [
+                (col.name, to_arrow_type(col.dataType, timezone="UTC"))
+                for col in return_type.fields
+            ]
+
Review Comment:
Assertion added! The length of `parsed_offsets` should be 1.
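
As a hedged illustration of the kind of assertion discussed here (not the exact line added in the PR): a non-cogrouped eval type has a single input DataFrame, so the parsed offsets should hold exactly one `[keys, values]` pair. The helper name `check_single_group` and the sample data are hypothetical.

```python
def check_single_group(parsed_offsets):
    """Hypothetical helper: require exactly one [keys, values] pair."""
    assert len(parsed_offsets) == 1, (
        f"Expected offsets for exactly one DataFrame, got {len(parsed_offsets)}."
    )
    key_offsets, value_offsets = parsed_offsets[0]
    return key_offsets, value_offsets


# Sample shape of the parsed offsets for one DataFrame (made-up data):
# one key column at index 0, two value columns at indexes 1 and 2.
key_offsets, value_offsets = check_single_group([[[0], [1, 2]]])
print(key_offsets, value_offsets)
```

Failing early with a message keeps a malformed `arg_offsets` from surfacing later as a confusing unpacking error.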
Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]
Yicong-Huang commented on code in PR #55222:
URL: https://github.com/apache/spark/pull/55222#discussion_r3077661890
##
python/pyspark/worker.py:
##
@@ -2575,6 +2510,35 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf):
         for i in range(num_udfs)
     ]
+    def extract_key_value_indexes(grouped_arg_offsets):
+        """
+        Helper function to extract the key and value indexes from arg_offsets for the grouped and
+        cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.
+
+        Parameters
+        ----------
+        grouped_arg_offsets: list
+            List containing the key and value indexes of columns of the
+            DataFrames to be passed to the udf. It consists of n repeating groups where n is the
+            number of DataFrames. Each group has the following format:
+                group[0]: length of group
+                group[1]: length of key indexes
+                group[2.. group[1] +2]: key attributes
+                group[group[1] +3 group[0]]: value attributes
+        """
+        parsed = []
+        idx = 0
+        while idx < len(grouped_arg_offsets):
+            offsets_len = grouped_arg_offsets[idx]
+            idx += 1
+            offsets = grouped_arg_offsets[idx : idx + offsets_len]
+            split_index = offsets[0] + 1
+            offset_keys = offsets[1:split_index]
+            offset_values = offsets[split_index:]
Review Comment:
we can refactor it in the future.
Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]
Yicong-Huang commented on code in PR #55222:
URL: https://github.com/apache/spark/pull/55222#discussion_r3061592491
##
python/pyspark/worker.py:
##
@@ -2575,6 +2510,35 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf):
         for i in range(num_udfs)
     ]
+    def extract_key_value_indexes(grouped_arg_offsets):
+        """
+        Helper function to extract the key and value indexes from arg_offsets for the grouped and
+        cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.
+
+        Parameters
+        ----------
+        grouped_arg_offsets: list
+            List containing the key and value indexes of columns of the
+            DataFrames to be passed to the udf. It consists of n repeating groups where n is the
+            number of DataFrames. Each group has the following format:
+                group[0]: length of group
+                group[1]: length of key indexes
+                group[2.. group[1] +2]: key attributes
Review Comment:
Yeah, that's due to the order of the methods; I had to move this definition up. I prefer to keep it as it is, and I can refactor this method in a future PR.
Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]
gaogaotiantian commented on code in PR #55222:
URL: https://github.com/apache/spark/pull/55222#discussion_r3061546329
##
python/pyspark/worker.py:
##
@@ -2575,6 +2510,35 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf):
         for i in range(num_udfs)
     ]
+    def extract_key_value_indexes(grouped_arg_offsets):
+        """
+        Helper function to extract the key and value indexes from arg_offsets for the grouped and
+        cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.
+
+        Parameters
+        ----------
+        grouped_arg_offsets: list
+            List containing the key and value indexes of columns of the
+            DataFrames to be passed to the udf. It consists of n repeating groups where n is the
+            number of DataFrames. Each group has the following format:
+                group[0]: length of group
+                group[1]: length of key indexes
+                group[2.. group[1] +2]: key attributes
Review Comment:
We can make this real python right? `group[2: group[1] + 2]`? Also let's
talk about return type too.
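
One possible reading of the group layout in real Python slice notation, as a sketch. The sample `group` list is made up, and the value slice `group[group[1] + 2 : group[0] + 1]` is an interpretation of the docstring rather than text from the PR.

```python
# One group laid out as the docstring describes (sample data, not from the PR):
#   group[0] = 4 -> four entries follow the length field
#   group[1] = 1 -> one of them is a key index
group = [4, 1, 0, 1, 2]

# Key indexes start right after the two header fields.
key_indexes = group[2 : group[1] + 2]
# Value indexes run from the end of the keys to the end of the group.
value_indexes = group[group[1] + 2 : group[0] + 1]

print(key_indexes)    # [0]
print(value_indexes)  # [1, 2]
```

Written as slices, the end bounds are exclusive, which removes the ambiguity of the `2..` notation.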
##
python/pyspark/worker.py:
##
@@ -2575,6 +2510,35 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf):
         for i in range(num_udfs)
     ]
+    def extract_key_value_indexes(grouped_arg_offsets):
+        """
+        Helper function to extract the key and value indexes from arg_offsets for the grouped and
+        cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.
+
+        Parameters
+        ----------
+        grouped_arg_offsets: list
+            List containing the key and value indexes of columns of the
+            DataFrames to be passed to the udf. It consists of n repeating groups where n is the
+            number of DataFrames. Each group has the following format:
+                group[0]: length of group
+                group[1]: length of key indexes
+                group[2.. group[1] +2]: key attributes
+                group[group[1] +3 group[0]]: value attributes
+        """
+        parsed = []
+        idx = 0
+        while idx < len(grouped_arg_offsets):
+            offsets_len = grouped_arg_offsets[idx]
+            idx += 1
+            offsets = grouped_arg_offsets[idx : idx + offsets_len]
+            split_index = offsets[0] + 1
+            offset_keys = offsets[1:split_index]
+            offset_values = offsets[split_index:]
Review Comment:
I think having temporary variables like `split_index` does not make the code
easier to read. If we already have the comment above, we could probably just do
the same as comments right? Make them consistent.
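
A sketch of what that could look like, under stated assumptions: the quoted hunk stops at `offset_values`, so the `append`, the `idx` advance, and the `return` below are assumed, and the name `extract_key_value_indexes_inline` is hypothetical. The slices use `offsets[0]` directly instead of a `split_index` temporary.

```python
def extract_key_value_indexes_inline(grouped_arg_offsets):
    """Parse [len, n_keys, keys..., values...] groups into [keys, values] pairs."""
    parsed = []
    idx = 0
    while idx < len(grouped_arg_offsets):
        offsets_len = grouped_arg_offsets[idx]
        idx += 1
        offsets = grouped_arg_offsets[idx : idx + offsets_len]
        # offsets[0] is the number of key indexes; keys follow it, then values.
        offset_keys = offsets[1 : offsets[0] + 1]
        offset_values = offsets[offsets[0] + 1 :]
        # Assumed tail of the function (not visible in the quoted hunk).
        parsed.append([offset_keys, offset_values])
        idx += offsets_len
    return parsed


# One DataFrame: one key column (0), two value columns (1, 2).
print(extract_key_value_indexes_inline([4, 1, 0, 1, 2]))
# Two DataFrames, as in a cogrouped call (sample data).
print(extract_key_value_indexes_inline([3, 1, 0, 1, 4, 2, 0, 1, 2]))
```

Whether the inline slices read better than a named `split_index` is a style call; the parsing result is the same either way.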
##
python/pyspark/worker.py:
##
@@ -2575,6 +2510,35 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf):
         for i in range(num_udfs)
     ]
+    def extract_key_value_indexes(grouped_arg_offsets):
+        """
+        Helper function to extract the key and value indexes from arg_offsets for the grouped and
+        cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.
+
+        Parameters
+        ----------
+        grouped_arg_offsets: list
+            List containing the key and value indexes of columns of the
+            DataFrames to be passed to the udf. It consists of n repeating groups where n is the
+            number of DataFrames. Each group has the following format:
+                group[0]: length of group
+                group[1]: length of key indexes
+                group[2.. group[1] +2]: key attributes
Review Comment:
Just realized that this is directly copied. You can either keep it as it is
or make some changes while moving it. I'm fine either way.
##
python/pyspark/worker.py:
##
@@ -2833,6 +2797,144 @@ def grouped_func(
         # profiling is not supported for UDF
         return grouped_func, None, ser, ser
+    if eval_type == PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF:
+        import pyarrow as pa
+
+        assert num_udfs == 1, "One GROUPED_MAP_ARROW UDF expected here."
+        grouped_udf, arg_offsets, return_type, num_udf_args = udfs[0]
+        parsed_offsets = extract_key_value_indexes(arg_offsets)
+
+        arrow_return_type = to_arrow_type(
+            return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
+        )
+        if runner_conf.assign_cols_by_name:
+            expected_cols_and_types = {
+                col.name: to_arrow_type(col.dataType, timezone="UTC") for col in return_type.fields
+            }
+        else:
+            expected_cols_and_types = [
+                (col.name, to_arrow_type(col.dataType, timezone="UTC"))
+                for col in return_type.fields
+            ]
+
Review Comment:
Let's also add an assertion about the length of `parsed_offsets` (`0`
probably
Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]
Yicong-Huang commented on PR #55222: URL: https://github.com/apache/spark/pull/55222#issuecomment-4218724750 cc @gaogaotiantian @zhengruifeng
Re: [PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]
Yicong-Huang commented on PR #55222: URL: https://github.com/apache/spark/pull/55222#issuecomment-4200683082 retest this please
[PR] [SPARK-55608][PYTHON] Refactor SQL_GROUPED_MAP_ARROW_UDF and SQL_GROUPED_MAP_ARROW_ITER_UDF [spark]
Yicong-Huang opened a new pull request, #55222:
URL: https://github.com/apache/spark/pull/55222

### What changes were proposed in this pull request?

Refactor `SQL_GROUPED_MAP_ARROW_UDF` and `SQL_GROUPED_MAP_ARROW_ITER_UDF` to be self-contained in `read_udfs()`, following the same pattern established by SPARK-56123 (grouped agg arrow) and SPARK-56189 (window agg arrow).

Key changes:
- **Remove wrapper functions** `wrap_grouped_map_arrow_udf` and `wrap_grouped_map_arrow_iter_udf` from the module-level scope
- **Remove `ArrowStreamGroupUDFSerializer` usage** - these eval types now use `ArrowStreamGroupSerializer` (pure I/O) like other grouped arrow eval types
- **Move all processing logic into `read_udfs()`** - flatten struct, extract key/value columns, call UDF, verify output, reorder columns, and wrap struct are all handled in the self-contained `grouped_func`
- **Move `extract_key_value_indexes`** earlier in `read_udfs()` so it is available to both arrow and pandas grouped eval types
- **Pass `(func, args_offsets, return_type, num_udf_args)` from `read_single_udf`** instead of a pre-wrapped function, consistent with other refactored eval types

### Why are the changes needed?

Part of SPARK-55388 (Refactor PythonEvalType processing logic). Making each eval type self-contained in `read_udfs()` improves readability and makes it easier to reason about the data flow for each eval type independently.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests. No behavior change.

### Was this patch authored or co-authored using generative AI tooling?

No.
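
To make the data flow listed in the PR description concrete, here is a rough plain-Python sketch of the middle steps (extract key/value columns, call the UDF, verify the output, reorder columns). It is not the code in `worker.py`: dicts of lists stand in for Arrow data, the struct flatten/wrap steps are omitted, and every name (`Table`, `select_columns`, `run_grouped_map`, `total`) is hypothetical.

```python
from typing import Callable, Dict, List, Sequence

# Column name -> values; a plain-Python stand-in for an Arrow table (assumption).
Table = Dict[str, List]


def select_columns(table: Table, indexes: Sequence[int]) -> Table:
    """Pick columns by position, keeping their names."""
    names = list(table)
    return {names[i]: table[names[i]] for i in indexes}


def run_grouped_map(
    table: Table,
    key_offsets: Sequence[int],
    value_offsets: Sequence[int],
    udf: Callable[[tuple, Table], Table],
    expected_cols: Sequence[str],
    assign_cols_by_name: bool,
) -> Table:
    # Extract key and value columns using the parsed offsets.
    keys = select_columns(table, key_offsets)
    values = select_columns(table, value_offsets)
    # The grouping key is constant within a group, so read it from the first row.
    key = tuple(col[0] for col in keys.values())

    # Call the user function on one group.
    result = udf(key, values)

    # Verify the output, then put columns in the expected order.
    if assign_cols_by_name:
        missing = [c for c in expected_cols if c not in result]
        if missing:
            raise ValueError(f"Missing columns: {missing}")
        return {c: result[c] for c in expected_cols}
    if len(result) != len(expected_cols):
        raise ValueError(f"Expected {len(expected_cols)} columns, got {len(result)}")
    # Positional assignment: keep the UDF's column order, apply expected names.
    return dict(zip(expected_cols, result.values()))


def total(key: tuple, values: Table) -> Table:
    # Sample UDF that returns its columns in a different order than the schema.
    return {"v_sum": [sum(values["v"])], "id": [key[0]]}


group = {"id": [1, 1], "v": [1.0, 2.0]}
by_name = run_grouped_map(group, [0], [0, 1], total, ["id", "v_sum"], True)
print(by_name)  # {'id': [1], 'v_sum': [3.0]}
```

With `assign_cols_by_name=False` the same UDF output would be matched by position instead, so a UDF that returns columns out of order yields mislabeled data; that is the distinction the `expected_cols_and_types` dict versus list in the diff appears to encode.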
