sven-weber-db commented on code in PR #55716:
URL: https://github.com/apache/spark/pull/55716#discussion_r3281566988
##########
python/benchmarks/bench_eval_type.py:
##########
@@ -127,6 +127,47 @@ def write_preamble(cls, buf: io.BytesIO) -> None:
cls.write_bool(False, buf) # needs_broadcast_decryption_server
write_int(0, buf) # num_broadcast_variables
+ @classmethod
+ def write_init_message(
+ cls,
+ eval_type: int,
+ write_udf: Callable[[io.BufferedIOBase], None],
+ target_buffer: io.BytesIO,
+ runner_conf: dict[str, str] | None = None,
+ eval_conf: dict[str, str] | None = None,
+ ) -> None:
+ """Write the initial message with header, length + its data."""
+
+ # Write everything to a seperate buffer so we can
+ # determine the length of the initial message.
+ buf = io.BytesIO()
+ cls.write_preamble(buf)
+ write_int(eval_type, buf)
+ if runner_conf:
+ write_int(len(runner_conf), buf)
+ for k, v in runner_conf.items():
+ cls.write_utf8(k, buf)
+ cls.write_utf8(v, buf)
+ else:
+ write_int(0, buf) # RunnerConf (0 key-value pairs)
+ if eval_conf:
+ write_int(len(eval_conf), buf)
+ for k, v in eval_conf.items():
+ cls.write_utf8(k, buf)
+ cls.write_utf8(v, buf)
+ else:
+ write_int(0, buf) # EvalConf (0 key-value pairs)
+ write_udf(buf)
+
+ # Write the actual data
+ # header...
+ write_int(-8, target_buffer) # SpecialLengths.START_OF_INIT_MESSAGE
Review Comment:
Yes, I think we can actually import `SpecialLengths` from
`pyspark.serializers`. I did not do that, to keep the code consistent with the
rest of `bench_eval_type.py`. However, you are right, this is
currently a gap - let me fix the existing references.
##########
python/benchmarks/bench_eval_type.py:
##########
@@ -127,6 +127,47 @@ def write_preamble(cls, buf: io.BytesIO) -> None:
cls.write_bool(False, buf) # needs_broadcast_decryption_server
write_int(0, buf) # num_broadcast_variables
+ @classmethod
+ def write_init_message(
+ cls,
+ eval_type: int,
+ write_udf: Callable[[io.BufferedIOBase], None],
+ target_buffer: io.BytesIO,
+ runner_conf: dict[str, str] | None = None,
+ eval_conf: dict[str, str] | None = None,
+ ) -> None:
+ """Write the initial message with header, length + its data."""
+
+ # Write everything to a seperate buffer so we can
+ # determine the length of the initial message.
+ buf = io.BytesIO()
+ cls.write_preamble(buf)
+ write_int(eval_type, buf)
+ if runner_conf:
+ write_int(len(runner_conf), buf)
+ for k, v in runner_conf.items():
+ cls.write_utf8(k, buf)
+ cls.write_utf8(v, buf)
+ else:
+ write_int(0, buf) # RunnerConf (0 key-value pairs)
+ if eval_conf:
+ write_int(len(eval_conf), buf)
+ for k, v in eval_conf.items():
+ cls.write_utf8(k, buf)
+ cls.write_utf8(v, buf)
+ else:
+ write_int(0, buf) # EvalConf (0 key-value pairs)
+ write_udf(buf)
+
+ # Write the actual data
+ # header...
+ write_int(-8, target_buffer) # SpecialLengths.START_OF_INIT_MESSAGE
Review Comment:
Yes, I think we can actually import `SpecialLengths` from
`pyspark.serializers`. I did not do that, to keep the code consistent with the
rest of `bench_eval_type.py`. However, you are right, this is
currently a gap - let me fix the existing references as well as the newly
introduced one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]