sven-weber-db commented on code in PR #55716:
URL: https://github.com/apache/spark/pull/55716#discussion_r3257901719
##########
python/pyspark/worker.py:
##########
@@ -3652,14 +3675,22 @@ def process():
send_accumulator_updates(outfile)
# check end of stream
- if read_int(infile) == SpecialLengths.END_OF_STREAM:
+ if message_receiver.is_stream_finished():
write_int(SpecialLengths.END_OF_STREAM, outfile)
else:
# write a different value to tell JVM to not reuse this worker
write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
sys.exit(-1)
+@with_faulthandler
+def main(infile, outfile):
+ # Instantiate socket message readers for executing the UDF
+ socket_reader = SparkSocketMessageReceiver(infile)
Review Comment:
Fair point. I re-ran the PySpark benchmark and added the results to the PR
description. I found no performance regression from the changes in this PR.
I also updated the benchmarking code to include the new length field in the
init message. Do you think this is enough data to enable this change by
default? Happy to discuss this further!
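For context on the diff hunk quoted above, here is a minimal sketch of the pattern it moves toward. The inline `read_int(infile) == SpecialLengths.END_OF_STREAM` check is moved behind a receiver object, which also reads a length-prefixed init message. Everything here is an illustrative assumption: `SketchMessageReceiver` and `read_init_message` are hypothetical names (this is not the PR's `SparkSocketMessageReceiver`), the "4-byte length, then payload" framing is assumed, and the constant is redefined locally so the sketch runs without PySpark.

```python
import io
import struct

# Mirrors pyspark.serializers.SpecialLengths.END_OF_STREAM; redefined here
# so the sketch runs without PySpark installed.
END_OF_STREAM = -4


def read_int(stream):
    """Read one big-endian signed 32-bit int (modeled on pyspark's read_int)."""
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError("stream ended while reading an int")
    return struct.unpack("!i", data)[0]


class SketchMessageReceiver:
    """Hypothetical receiver, not the PR's SparkSocketMessageReceiver."""

    def __init__(self, infile):
        self._infile = infile

    def read_init_message(self):
        # Assumed framing: a 4-byte length field, then that many payload bytes.
        length = read_int(self._infile)
        payload = self._infile.read(length)
        if len(payload) != length:
            raise EOFError("truncated init message")
        return payload

    def is_stream_finished(self):
        # Same check the old code did inline: the next int must be END_OF_STREAM.
        return read_int(self._infile) == END_OF_STREAM


if __name__ == "__main__":
    buf = io.BytesIO(struct.pack("!i", 5) + b"hello" + struct.pack("!i", END_OF_STREAM))
    receiver = SketchMessageReceiver(buf)
    print(receiver.read_init_message())  # b'hello'
    print(receiver.is_stream_finished())  # True
```

Keeping the end-of-stream check inside the receiver means `process()` no longer needs to know the wire encoding, so a change to the framing (such as the added length field) stays local to one class.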
--
This is an automated message from the Apache Git Service.