Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
gaogaotiantian commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4209217311 Okay, so what the author needs to do is to create a new JIRA ticket for the regression test PR and change the title of that PR to point to the correct JIRA ticket, then this PR is good to go? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
dongjoon-hyun commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4209271003 To @anblanco, of course, we appreciate one PR per branch (branch-4.1/4.0/3.5), and you are doing that correctly. Your PRs are blocked only because of the merge order. The Apache Spark community will merge your PRs in the order `4.1`, `4.0`, and then `3.5`. Currently, the `branch-3.5` and `branch-4.0` PRs have approvals, but we should not merge them before this PR, so they are blocked. There is no technical issue there; it is simply the backporting convention, so don't worry about it.
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
gaogaotiantian commented on code in PR #55201: URL: https://github.com/apache/spark/pull/55201#discussion_r3054034302
## python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py: ##
@@ -100,4 +100,7 @@ def process(df_id, batch_id): # type: ignore[no-untyped-def]
     sock.settimeout(None)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
Review Comment: The `with` statement requires a context manager implementation: you need to define what should happen in the enter and exit phases, which is what master does. Here, we want to keep the fix clean and clear, and closing `sock_file` after `main` is all we really need. Python provides a native context manager for `socket.makefile`, but we get our `sock_file` from our own implementation, `local_connect_and_auth(conn_info, auth_secret)`, so we need to close it explicitly.
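For comparison, a minimal sketch of the two forms discussed in this thread. `FakeSockFile` and `main` are hypothetical stand-ins (the only assumption is that the object exposes `close()`); `contextlib.closing` is one standard-library way to get a `with` form without writing a custom context manager, while the PR itself keeps the explicit `try ... finally`. In both forms the file is closed even when `main` raises.

```python
import contextlib


class FakeSockFile:
    """Hypothetical stand-in for the sock_file returned by local_connect_and_auth().

    The only property assumed here is that it exposes close().
    """

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def main(infile, outfile):
    # Hypothetical worker body that fails part-way through.
    raise RuntimeError("worker failed")


# Form used in the PR: explicit try/finally around main().
f1 = FakeSockFile()
try:
    try:
        main(f1, f1)
    finally:
        f1.close()
except RuntimeError:
    pass

# Alternative `with` form: contextlib.closing() supplies __enter__/__exit__
# and calls close() on exit, including when main() raises.
f2 = FakeSockFile()
try:
    with contextlib.closing(f2):
        main(f2, f2)
except RuntimeError:
    pass

print(f1.closed, f2.closed)  # True True
```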
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
dongjoon-hyun commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4209550286 Merged to branch-4.1 for Apache Spark 4.1.2.
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
dongjoon-hyun closed pull request #55201: [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path URL: https://github.com/apache/spark/pull/55201
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
dongjoon-hyun commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4209191708 Yes, this should be the main PR, and the master branch PR should not have the same JIRA ID, as I already suggested here:
- https://github.com/apache/spark/pull/55201#issuecomment-4209149095
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
dongjoon-hyun commented on code in PR #55201: URL: https://github.com/apache/spark/pull/55201#discussion_r3054075498
## python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py: ##
@@ -100,4 +100,7 @@ def process(df_id, batch_id): # type: ignore[no-untyped-def]
     sock.settimeout(None)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
Review Comment: Got it.
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
anblanco commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4209236466 @dongjoon-hyun, I want to make sure I understand correctly. Today, SPARK-53759 has no backported changes in 4.1/4.0/3.5. In `master` it was fixed by a much larger refactor, whereas these PRs backport a smaller version of that fix to older branches. I can go ahead and rename the title of #55223, but if you look at the changes, each of these backports is tailored to its own branch; the changes don't cherry-pick cleanly across the backport branches, hence the three separate PRs. Can we consider holding #55223 until I have a new JIRA ticket, while allowing these backports to go through? I'm trying to catch the 3.5 LTS window if possible, as I think this is relevant for Windows developers' local development environments.
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
dongjoon-hyun commented on code in PR #55201: URL: https://github.com/apache/spark/pull/55201#discussion_r3053917670
## python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py: ##
@@ -100,4 +100,7 @@ def process(df_id, batch_id): # type: ignore[no-untyped-def]
     sock.settimeout(None)
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        sock_file.close()
Review Comment: Just a question. Why don't we use a `with` statement here instead of `try ... finally`?
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
gaogaotiantian commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4209176490
> To @gaogaotiantian , we should guide not to reuse the same JIRA ID to the contributor. It's not only misleading but also incorrect in the community.
> @anblanco is mistakenly using the same JIRA ID for those PRs.

Yeah, I'm not suggesting that we should use the same JIRA ID for these PRs. I agree that another JIRA ticket should be created for the coverage tests. I'm replying to your comment about blocking this PR because the main PR has not landed: this PR should be the main PR.
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
dongjoon-hyun commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4209198074 For the record, this PR should not be merged in its current state, to avoid a mistaken merge. It would mess up the commit logs.
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
dongjoon-hyun commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4209161680
> @dongjoon-hyun #55223 is not the main PR.
>
> So this is a fix targeting 3.5, 4.0, 4.1 because the master branch has already been fixed by another refactoring. Backporting the full refactoring is a bit too much so we decided to just do a simple fix for previous branches.
>
> This is the main PR for the fix, along with another 2 PRs for 4.0 and 3.5.
>
> #55223 is a separate PR to add regression tests to master branch. We want to keep the code clean for the backport.

To @gaogaotiantian, we should guide the contributor not to reuse the same JIRA ID. It's not only misleading but also incorrect by community convention.
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
dongjoon-hyun commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4209149095 BTW, @anblanco, if you want to keep this JIRA ID for the bug fixes, you may need to file a new JIRA issue for the `master` branch (which has no bug). That means you need to change the title of the following PR. Adding test coverage is completely different from a bug fix.
- https://github.com/apache/spark/pull/55223
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
gaogaotiantian commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4209151537 @dongjoon-hyun #55223 is not the main PR.

This is a fix targeting 3.5, 4.0, and 4.1, because the master branch has already been fixed by another refactoring. Backporting the full refactoring is a bit too much, so we decided to do a simple fix for the previous branches.

This is the main PR for the fix, along with two other PRs for 4.0 and 3.5.

#55223 is a separate PR that adds regression tests to the master branch. We want to keep the backport code clean.
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
anblanco commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4195734880
> If @HyukjinKwon agrees, what I like to do is to backport only the fix, not the regression test. Then let's create another PR for the regression test and discuss how it should be done.

Sure thing! I've split the backport into a few separate PRs:
- Fix PRs (backport only, no tests):
  - branch-4.1 (this PR): #55201
  - branch-4.0: #55224
  - branch-3.5: #55225
- Regression tests (master only): #55223, which adds `SimpleWorkerTests` subclassing `WorkerTests` with `daemon=false`, plus explicit DataFrame, UDF, and DataSource read tests.

All three fix PRs were red/green verified locally: unfixed builds fail with `EOFException`, and fixed builds pass.
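The test layout described above (a `SimpleWorkerTests` subclass re-running `WorkerTests` with the daemon disabled) follows a common `unittest` pattern: the tests live in a base class and a subclass overrides one class attribute. A minimal sketch under that assumption; `run_udf_job` and the `use_daemon` attribute are hypothetical stand-ins, not PySpark's actual test code.

```python
import unittest


def run_udf_job(values, use_daemon):
    """Hypothetical stand-in for running a Python UDF through a worker.

    In a real suite the flag would select the worker mode; here it only picks
    which (fake) code path computes the result.
    """
    if use_daemon:
        return [v * 2 for v in values]  # "daemon" path
    return list(map(lambda v: v * 2, values))  # "simple worker" path


class WorkerTests(unittest.TestCase):
    use_daemon = True  # base suite exercises the daemon path

    def test_udf_roundtrip(self):
        self.assertEqual(run_udf_job([1, 2, 3], self.use_daemon), [2, 4, 6])


class SimpleWorkerTests(WorkerTests):
    # Inherits every test from WorkerTests and re-runs it in the other mode.
    use_daemon = False
```

Run it with `python -m unittest <module>`; every test defined on `WorkerTests` then runs twice, once per mode, so new tests added to the base class automatically cover the simple worker too.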
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
gaogaotiantian commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4195459736 If @HyukjinKwon agrees, what I'd like to do is backport only the fix, not the regression test. Then let's create another PR for the regression test and discuss how it should be done.
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
HyukjinKwon commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4195303961
> Anyway, I think the fix itself is good to backport, let's first decide whether we want to backport the regression tests to branch 3.5 - 4.1 first @HyukjinKwon , then we can decide the format.

SGTM
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
gaogaotiantian commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4194911340 My suggestion would be: keep all the worker-related changes in this PR and remove the regression test. Then open a new PR for the regression test in master (only the simple worker for UDFs; the Python data source is already tested against the simple worker). If we do want to keep the regression test for all the previous branches, just keep the simplest worker one. If we decide to keep the test in this PR (or even if we move it to another PR), let's remove the comment, because what it tests is pretty generic: basically just the simple worker. We will probably add more tests to the class in the future, and we don't want to bind it to a specific Spark issue. Actually, I was wondering if we can just reuse `WorkerTests` and set a config on it, just like we did for the Python data source simple workers. Anyway, I think the fix itself is good to backport. @HyukjinKwon, let's first decide whether we want to backport the regression tests to branches 3.5 - 4.1, and then we can decide the format.
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
anblanco commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4194678272
> There's probably some minor misunderstanding for the master fix. I don't believe the explicit `flush()` _in addition to_ `close()` is the key for the issue. We need an explicit `close()` to avoid the socket being closed during garbage collection. `flush()` might work as well but `close()` is semantically what we really need. I think it would be nice to confirm that a `close()` in `finally` will still fix the issue on Windows.
>
> Also, do we need a try ... except block for `close()` / `flush()`?
>
> Unfortunately, fixing it in `worker.py` is probably not enough. The other workers are for python data source which is supported since spark 4.0. They probably suffer the same issues (@anblanco if you could confirm that on Windows, it would be great! `test_python_datasource` is the test file for it).
>
> If I'm correct, we need to backport the fix to all those worker files too. I don't have a strong opinion about cherry-pick vs clean fix - but we need to consider the work to patch all workers, not just a single file.
>
> I'm not sure if we are writing new regression tests for the maintenance branch, but at least we should add regression tests on master. BTW the regression tests do not need to contain comments about the "fix". Most of the times, the test itself explains what it tests against. If it's not clear, we should comment about what regression case it tests - aka the problem, not the solution.

Thank you for the detailed feedback, @gaogaotiantian!
* `close()` vs `flush()`: Switched to `close()` to match `master`'s approach, and dropped the `try...except` wrapper to match the `master` branch's convention. I also confirmed that, because `.close()` internally calls `.flush()`, this fixes the crash on Windows as well.
* Other workers affected: Confirmed on Windows with PySpark 4.0 + Python 3.12.10 + `daemon=false`: a Python data source test hits the same `EOFException`. Updated the PR to apply `close()` to all 12 additional worker files on branch-4.1.
* Test comments: Updated to describe the problem, not the solution.

If you'd like, I can squash my commits for a cleaner git history.
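The point that `close()` already flushes can be checked with the standard `io` module alone. In this sketch `RecordingRaw` is a hypothetical raw stream standing in for the socket; it shows that buffered bytes reach it only on `flush()`/`close()`, and that an explicit `flush()` before `close()` is harmless but does not send the data twice.

```python
import io


class RecordingRaw(io.RawIOBase):
    """Hypothetical raw stream that records every byte reaching it (stands in for the socket)."""

    def __init__(self):
        super().__init__()
        self.received = bytearray()

    def writable(self):
        return True

    def write(self, b):
        self.received += bytes(b)
        return len(b)


# close() alone: the data sits in the buffer until close() flushes it.
raw1 = RecordingRaw()
w1 = io.BufferedWriter(raw1, buffer_size=65536)
w1.write(b"abc")
before_close = bytes(raw1.received)  # nothing has reached the raw stream yet
w1.close()                           # flushes the buffer, then closes raw1
after_close = bytes(raw1.received)

# flush() followed by close(): same bytes, delivered exactly once.
raw2 = RecordingRaw()
w2 = io.BufferedWriter(raw2, buffer_size=65536)
w2.write(b"abc")
w2.flush()
w2.close()

print(before_close, after_close, bytes(raw2.received))  # b'' b'abc' b'abc'
```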
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
gaogaotiantian commented on PR #55201: URL: https://github.com/apache/spark/pull/55201#issuecomment-4194262210 There's probably some minor misunderstanding about the master fix. I don't believe the explicit `flush()` *in addition to* `close()` is the key to the issue. We need an explicit `close()` to avoid the socket being closed during garbage collection. `flush()` might work as well, but `close()` is semantically what we really need. It would be nice to confirm that a `close()` in `finally` still fixes the issue on Windows.

Also, do we need a `try ... except` block around `close()` / `flush()`?

Unfortunately, fixing it in `worker.py` is probably not enough. The other workers are for the Python data source, which has been supported since Spark 4.0. They probably suffer from the same issue (@anblanco, if you could confirm that on Windows, it would be great! `test_python_datasource` is the test file for it).

If I'm correct, we need to backport the fix to all those worker files too. I don't have a strong opinion about cherry-pick vs. a clean fix, but we need to consider the work to patch all workers, not just a single file.

I'm not sure whether we write new regression tests for maintenance branches, but at least we should add regression tests on master. BTW, the regression tests do not need to contain comments about the "fix". Most of the time, the test itself explains what it tests. If it's not clear, we should comment on what regression case it tests, i.e. the problem, not the solution.
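A minimal sketch of why a deterministic `close()` in `finally` matters, using only the standard library. `socket.socketpair()` stands in for the JVM connection, `makefile("rwb")` produces an `io.BufferedRWPair` like the worker's `sock_file`, and `fake_main` is a hypothetical worker body that writes a result and then raises before flushing. Because `close()` runs in `finally`, the buffered bytes are sent immediately instead of depending on when, and in what order, garbage collection finalizes the objects.

```python
import socket


def fake_main(infile, outfile):
    # Hypothetical worker body: writes a result, then fails before any flush().
    outfile.write(b"partial-result")  # held in the BufferedRWPair write buffer
    raise RuntimeError("worker failed")


def run_worker(worker_sock):
    # makefile("rwb") wraps the socket in an io.BufferedRWPair.
    sock_file = worker_sock.makefile("rwb", 65536)
    try:
        fake_main(sock_file, sock_file)
    finally:
        # Deterministic close: flushes the write buffer now, not at GC time.
        sock_file.close()


worker_sock, jvm_sock = socket.socketpair()  # jvm_sock plays the JVM side
try:
    run_worker(worker_sock)
except RuntimeError:
    pass
worker_sock.close()  # release the socket itself so the peer sees EOF

received = b""
while True:
    chunk = jvm_sock.recv(4096)
    if not chunk:
        break
    received += chunk
jvm_sock.close()

print(received)  # b'partial-result'
```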
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
anblanco commented on code in PR #55201: URL: https://github.com/apache/spark/pull/55201#discussion_r3040931134
## python/pyspark/worker.py: ##
@@ -3420,4 +3420,16 @@ def process():
     # TODO: Remove the following two lines and use `Process.pid()` when we drop JDK 8.
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        # SPARK-53759: Flush before close to ensure all buffered data reaches
+        # the socket. On Python 3.12+, changed GC finalization ordering
+        # (https://github.com/python/cpython/issues/97922) can cause the
+        # underlying socket to close before BufferedRWPair flushes its write
+        # buffer, resulting in data loss and EOFException on the JVM side.
+        # This mirrors the explicit flush in daemon.py's worker() finally block.
+        try:
+            sock_file.flush()
Review Comment: So I evaluated the feasibility of properly backporting PR #54458 (SPARK-55665). At a high level, that unification was written as a refactor on top of `master`; I don't think the original PR considered backporting to older branches. Specifically, #54458 does fix SPARK-53759 on master via its context manager's explicit `close()` in a `finally` block, but cherry-picking it to the stable branches ranges from tedious to impractical: branch-4.1 requires resolving 10 import conflicts across 14 files, branch-4.0 needs the context manager rewritten for the older `java_port` connection pattern, and branch-3.5 is missing 10 of the 14 worker files, which makes it effectively a hand-written 4-file commit. By contrast, this PR (#55201) is a more precise try/finally fix that applies identically and cleanly to all three release branches, and it can be superseded by the worker unification changes in the 4.2.x branches.
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
anblanco commented on code in PR #55201: URL: https://github.com/apache/spark/pull/55201#discussion_r3040407302
## python/pyspark/worker.py: ##
@@ -3420,4 +3420,16 @@ def process():
     # TODO: Remove the following two lines and use `Process.pid()` when we drop JDK 8.
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        # SPARK-53759: Flush before close to ensure all buffered data reaches
+        # the socket. On Python 3.12+, changed GC finalization ordering
+        # (https://github.com/python/cpython/issues/97922) can cause the
+        # underlying socket to close before BufferedRWPair flushes its write
+        # buffer, resulting in data loss and EOFException on the JVM side.
+        # This mirrors the explicit flush in daemon.py's worker() finally block.
+        try:
+            sock_file.flush()
Review Comment: #54458 is the full change, which was a much larger refactor. I figured that the try/finally would cherry-pick much more cleanly into the older backport branches, but I can also assess whether cherry-picking #54458 is feasible.
Re: [PR] [SPARK-53759][PYTHON][4.1] Fix missing flush in simple-worker path [spark]
HyukjinKwon commented on code in PR #55201: URL: https://github.com/apache/spark/pull/55201#discussion_r3039092386
## python/pyspark/worker.py: ##
@@ -3420,4 +3420,16 @@ def process():
     # TODO: Remove the following two lines and use `Process.pid()` when we drop JDK 8.
     write_int(os.getpid(), sock_file)
     sock_file.flush()
-    main(sock_file, sock_file)
+    try:
+        main(sock_file, sock_file)
+    finally:
+        # SPARK-53759: Flush before close to ensure all buffered data reaches
+        # the socket. On Python 3.12+, changed GC finalization ordering
+        # (https://github.com/python/cpython/issues/97922) can cause the
+        # underlying socket to close before BufferedRWPair flushes its write
+        # buffer, resulting in data loss and EOFException on the JVM side.
+        # This mirrors the explicit flush in daemon.py's worker() finally block.
+        try:
+            sock_file.flush()
Review Comment: Is this the only code path to fix? If not, I think we'd better backport the original fix to reduce the diff between branches.
