This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 480b14f4b45 [SPARK-41593][FOLLOW-UP][ML] Torch distributor log streaming server: Avoid duplicated log to stdout redirection
480b14f4b45 is described below

commit 480b14f4b45a4e01e7e4bdea82475d4c77a6b89f
Author: Weichen Xu <weichen...@databricks.com>
AuthorDate: Thu Jun 1 16:26:49 2023 +0800

    [SPARK-41593][FOLLOW-UP][ML] Torch distributor log streaming server: Avoid duplicated log to stdout redirection
    
    ### What changes were proposed in this pull request?
    
    Torch distributor log streaming server: avoid duplicated log-to-stdout redirection.
    
    In some cases (typically Spark local mode), the remote tasks run on the same node as the Spark driver. When that happens, both the torch process created by the Spark task and the driver-side torch distributor log streaming server redirect logs to STDOUT, so STDOUT prints every log line twice. This PR fixes that case.
    
    ### Why are the changes needed?
    
    In some cases (typically Spark local mode), the remote tasks run on the same node as the Spark driver. When that happens, both the torch process created by the Spark task and the driver-side torch distributor log streaming server redirect logs to STDOUT, so STDOUT prints every log line twice. This PR fixes that case.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests.
    
    Closes #41404 from WeichenXu123/torch-distributor-avoid-dup-log.
    
    Authored-by: Weichen Xu <weichen...@databricks.com>
    Signed-off-by: Weichen Xu <weichen...@databricks.com>
---
 python/pyspark/ml/torch/distributor.py | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/ml/torch/distributor.py b/python/pyspark/ml/torch/distributor.py
index 711f76db09b..2af92f8399f 100644
--- a/python/pyspark/ml/torch/distributor.py
+++ b/python/pyspark/ml/torch/distributor.py
@@ -462,7 +462,22 @@ class TorchDistributor(Distributor):
                 decoded = line.decode()
                 tail.append(decoded)
                 if redirect_to_stdout:
-                    sys.stdout.write(decoded)
+                    if (
+                        log_streaming_client
+                        and not log_streaming_client.failed
+                        and (
+                            log_streaming_client.sock.getsockname()[0]
+                            == log_streaming_client.sock.getpeername()[0]
+                        )
+                    ):
+                        # If log_streaming_client and log_stream_server are in the same
+                        # node (typical case is spark local mode),
+                        # server side will redirect the log to STDOUT,
+                        # to avoid STDOUT outputs duplication, skip redirecting
+                        # logs to STDOUT in client side.
+                        pass
+                    else:
+                        sys.stdout.write(decoded)
                 if log_streaming_client:
                     log_streaming_client.send(decoded.rstrip())
             task.wait()
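
For context, the core of the patch is the same-host check: if the client socket's local address equals its peer address, the log streaming client and server are on the same node, so the client skips its own stdout redirection. The sketch below isolates that check with a loopback connection; it is an illustration only, not the PR's code, and the helper name `is_same_host` is hypothetical.

```python
import socket

def is_same_host(sock: socket.socket) -> bool:
    # When the local and peer IP addresses match, the client and
    # server share a node (the typical Spark local-mode case), so the
    # server's stdout redirection already covers these log lines and
    # the client-side redirection would duplicate them.
    return sock.getsockname()[0] == sock.getpeername()[0]

# Demo: a loopback connection, where both endpoints are 127.0.0.1.
server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen(1)
client = socket.socket()
client.connect(server.getsockname())
conn, _ = server.accept()
print(is_same_host(client))  # True on a loopback connection
conn.close()
client.close()
server.close()
```

In the patched `TorchDistributor`, this condition gates `sys.stdout.write(decoded)`: the line is still sent to the log streaming server either way, but is written locally only when the two ends are on different hosts.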


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
