[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18320

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123722811

--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
 inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
+# Waits indefinitely for a socket connection by default.
+selectTimeout <- NULL
+
+# Exit code that children send to the parent to indicate they exited.
+exitCode <- 1
+
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = selectTimeout)
+
+  # Note that the children should be terminated in the parent. If each child terminates
+  # itself, it appears that the resource is not released properly, which causes an unexpected
+  # termination of this daemon due to, for example, running out of file descriptors
+  # (see SPARK-21093). Therefore, the current implementation tries to retrieve children
+  # that have exited (but are not terminated) and then sends a kill signal to terminate them
+  # properly in the parent.
+  #
+  # There are two paths through which the parent attempts to send a signal to terminate
+  # the children:
+  #
+  #   1. Every second, if no socket connection is available and there are child workers
+  #      running.
+  #   2. Right after a socket connection becomes available.
+  #
+  # In other words, the parent attempts to send the signal to the children every second if
+  # any worker is running, or right before launching other worker children from the
+  # following new socket connection.
+
+  # Only the process IDs of children that sent data to the parent are returned below. The
+  # children send a custom exit code to the parent after exiting, and the parent tries
+  # to terminate them only if they sent the exit code.
+  children <- parallel:::selectChildren(timeout = 0)
+
+  if (is.integer(children)) {
+    lapply(children, function(child) {
+      # This data should be raw bytes if any data was sent from this child.
+      # Otherwise, this returns the PID.
+      data <- parallel:::readChild(child)
+      if (is.raw(data)) {
+        # This checks if the data from this child is the exit code that indicates an exited child.
+        if (unserialize(data) == exitCode) {
+          # If so, we terminate this child.
+          tools::pskill(child, tools::SIGUSR1)
+        }
+      }
+    })
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff ---

Definitely.
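The parent-side reaping pattern in the hunk above (a child announces its exit, and the parent rather than the child performs the termination) can be sketched outside of R. Below is a hedged Python analog, not SparkR code; the name `EXIT_CODE` is mine, and a POSIX system is assumed:

```python
import os
import select
import signal

EXIT_CODE = b"1"  # stand-in for daemon.R's `exitCode`

r, w = os.pipe()
pid = os.fork()
if pid == 0:
    # Child: announce the exit code to the parent, then wait to be terminated
    # by the parent instead of exiting on its own.
    os.write(w, EXIT_CODE)
    signal.pause()
    os._exit(0)  # not reached; the parent's signal terminates the child
else:
    os.close(w)
    # Parent: check the child's pipe, analogous to polling with
    # parallel:::selectChildren(timeout = 0); a short real timeout keeps
    # this sketch free of races.
    ready, _, _ = select.select([r], [], [], 5.0)
    if ready and os.read(r, len(EXIT_CODE)) == EXIT_CODE:
        # Terminate the child in the parent, like tools::pskill(child, tools::SIGUSR1).
        os.kill(pid, signal.SIGUSR1)
    _, status = os.waitpid(pid, 0)
```

The key property mirrors the comment in the diff: the child never tears itself down; the parent reaps it, so file descriptors and process slots are reclaimed in one place.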
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123719067

--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff ---

nit: `If it is NULL, there are no such children.` -> `If it is NULL, there are no children at all.`?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123718675

--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff ---

Oh, cool. LGTM.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123717458

--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff ---

I ran several tests for this: https://github.com/apache/spark/pull/18320#discussion_r122860310 and https://github.com/apache/spark/pull/18320#discussion_r123675088

```
vi tmp.R
```

copy and paste:

```R
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  Sys.sleep(3L)
  parallel:::mcexit(0L)
}
print("TRUE - the timeout was reached")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))
```

and then:

```
Rscript tmp.R
```

If data is not available, this should wait as long as `timeout`, to my knowledge, rather than returning `NULL`.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123717120

--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff ---

I have this question because of your comment: `Only the process IDs of children that sent data to the parent are returned below.` So I am not sure whether, if there are no children that have sent data to the parent, the returned value is `NULL` or something else.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123716354

--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff ---

Does it return `NULL` if there are no children, or also if there are no children that have data available?
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123715868

--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff ---

It becomes `NULL` only if there are no children; it returns `TRUE` here if there are children but none sent data. So, we turn back to waiting indefinitely only if there are no children at all. See https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/children.html:

> TRUE if the timeout was reached, FALSE if an error occurred (e.g., if the master process was interrupted) or an integer vector of process IDs with children that have data available, or NULL if there are no children.

Would this answer your question (or did I maybe misunderstand)?
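The four documented return values of `parallel:::selectChildren` quoted above map onto four distinct daemon behaviours. A hedged sketch in Python (the function name and the returned strings are mine, purely illustrative; `None` models R's `NULL`):

```python
def interpret_select_children(result):
    """Classify a selectChildren-style result for the daemon loop."""
    if result is None:
        # NULL: no children at all, so the daemon can block indefinitely.
        return "no children"
    if result is True:
        # TRUE: the timeout was reached; children exist but none sent data.
        return "timeout reached"
    if result is False:
        # FALSE: an error occurred, e.g. the master process was interrupted.
        return "error"
    # Otherwise: an integer vector of PIDs of children with data available.
    return "reap candidates: " + ", ".join(str(pid) for pid in result)
```

Only the last case (an integer vector) triggers the `readChild`/`pskill` path in the diff; the `NULL` case is the one that resets `selectTimeout` to `NULL`.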
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123714768

--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,55 @@
+  } else if (is.null(children)) {
+    # If it is NULL, there are no such children. Waits indefinitely for a socket connection.
+    selectTimeout <- NULL
--- End diff ---

One path described above is:

> 1. Every second if any socket connection is not available and if there are child workers running.

So after forking succeeds, we want to check once per second. But after one second, if no children have sent data, we turn back immediately to waiting indefinitely. Doesn't that mean we don't actually run the logic every second, but just for the first second?
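The per-second question above can be checked with a small simulation. This hedged Python sketch (my own modelling, not SparkR code; `None` stands for R's `NULL`) shows the intended behaviour: the one-second poll is re-armed on every iteration while any children exist (including the `TRUE` "timeout reached" case), and the loop only falls back to an indefinite wait once no children remain:

```python
def select_timeouts(poll_results):
    """For each simulated selectChildren result, record the next select timeout."""
    select_timeout = None  # block indefinitely until the first worker is forked
    history = []
    for children in poll_results:
        if children is None:
            # No children remain: wait indefinitely for a socket connection.
            select_timeout = None
        else:
            # Children running (True) or reaped (PID list): poll again in one second.
            select_timeout = 1
        history.append(select_timeout)
    return history
```

For a run where children are still alive, then one is reaped, then none remain, the timeout sequence is 1, 1, then indefinite, which is the every-second behaviour the comment in the diff describes.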
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123701851

--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,51 @@
+  # Only the process IDs of exited children are returned and the termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+    # If it is PIDs, there are workers that exited but were not terminated. Attempts to
+    # terminate them by sending SIGUSR1.
+    lapply(children, function(child) {
--- End diff ---

I also tested

```R
kill <- function(children) {
  lapply(children, function(child) {
    data <- parallel:::readChild(child)
    if (is.raw(data)) {
      if (unserialize(data) == -1) {
        print(paste("child PID:", child, "and parent will kill given:", unserialize(data)))
        tools::pskill(child, tools::SIGUSR1)
      } else {
        print(paste("child PID:", child, "and sent a data:", unserialize(data)))
      }
    } else {
      print(paste("child PID:", child, "and sent a PID:", data))
    }
  })
}

for (i in 0:1000) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    # Sys.sleep(3L)
    # Send no data
    parallel:::mcexit(0L)
  } else {
    p <- parallel:::mcfork()
    if (inherits(p, "masterProcess")) {
      ## Sys.sleep(3L)
      # Send custom data
      parallel:::sendMaster("arbitrary")
      parallel:::sendMaster(123)
      parallel:::mcexit(0L)
    } else {
      p <- parallel:::mcfork()
      if (inherits(p, "masterProcess")) {
        ### Sys.sleep(3L)
        # Send explicit signal.
        parallel:::mcexit(0L, send = -1)
      } else {
        p <- parallel:::mcfork()
        if (inherits(p, "masterProcess")) {
          ### Sys.sleep(10L)
          # Check if `readChild` waits
          parallel:::mcexit(0L, send = -1)
        }
      }
    }
  }

  c <- parallel:::selectChildren(timeout = 0)
  if (!is.null(c)) {
    print(paste("Killing", c))
    invisible(kill(parallel:::selectChildren(timeout = 0)))
  }
}
```
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123700234

--- Diff: R/pkg/inst/worker/daemon.R ---
@@ -30,8 +30,51 @@
+  # Only the process IDs of exited children are returned and the termination is attempted below.
+  children <- parallel:::selectChildren(timeout = 0)
+  if (is.integer(children)) {
+    # If it is PIDs, there are workers that exited but were not terminated. Attempts to
+    # terminate them by sending SIGUSR1.
+    lapply(children, function(child) {
--- End diff ---

With the change above, it printed:

```
[1] "Wait for 4 seconds to test the last child ..."
[1] "child PID: 86866 and parent will kill given: -1"
[1] "child PID: 86865 and parent will kill given: -1"
[1] "child PID: 86864 and sent a data: arbitrary"
[1] "child PID: 86863 and sent a PID: 86863"
[1] "Wait for 7 seconds more to test the last child ..."
[1] "child PID: 86864 and sent a data: 123"
```

It looks correct.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123688218 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,51 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connecion by default. +selectTimeout <- NULL + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, that causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that are exited (but not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths that it attempts to send a signal to terminate the children in the parent. + # + # 1. Every second if any socket connection is not available and if there are child workers + # running. + # 2. Right after a socket connection is available. + # + # In other words, the parent attempts to send the signal to the children every second if + # any worker is running or right before launching other worker children from the following + # new socket connection. + + # Only the process IDs of exited children are returned and the termination is attempted below. + children <- parallel:::selectChildren(timeout = 0) + if (is.integer(children)) { +# If it is PIDs, there are workers exited but not terminated. Attempts to terminate them +# by setting SIGUSR1. +lapply(children, function(child) { --- End diff -- Definitely.
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123681647 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connecion by default. +selectTimeout <- NULL + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) --- End diff -- right, but in the odd case that the JVM died before making the socket connection, this R process will wait forever?
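One way to guard against that case would be to bound the very first wait and give up if the JVM never connects. This is only a sketch of the idea, not what the PR does: `maxWaitSecs` is a made-up knob, and `inputCon` refers to the connection the daemon already opened above.

```r
# Sketch under the assumption that we are inside daemon.R, where `inputCon`
# already exists. `maxWaitSecs` is hypothetical, not a real Spark setting.
maxWaitSecs <- 60
waited <- 0
ready <- FALSE
while (!ready && waited < maxWaitSecs) {
  # Wake up once per second instead of blocking indefinitely.
  ready <- socketSelect(list(inputCon), timeout = 1)
  waited <- waited + 1
}
if (!ready) {
  # The JVM never connected; exit rather than leave an orphaned daemon behind.
  quit(save = "no", status = 1)
}
```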
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123686953 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,54 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connecion by default. +selectTimeout <- NULL + +# Exit code that children send to the parent to indicate they exited. +exitCode <- 1 + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, that causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that are exited (but not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths that it attempts to send a signal to terminate the children in the parent. + # + # 1. Every second if any socket connection is not available and if there are child workers + # running. + # 2. Right after a socket connection is available. + # + # In other words, the parent attempts to send the signal to the children every second if + # any worker is running or right before launching other worker children from the following + # new socket connection. + + # Only the process IDs of exited children are returned and the termination is attempted below. + children <- parallel:::selectChildren(timeout = 0) + if (is.integer(children)) { +# If it is PIDs, there are workers exited but not terminated. Attempts to terminate them +# by setting SIGUSR1. 
--- End diff -- nit: I think the comments on L59 and L63 can be updated to match the new behavior more closely - that we are checking that the proper exit code is sent
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123686706 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,54 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connecion by default. +selectTimeout <- NULL + +# Exit code that children send to the parent to indicate they exited. +exitCode <- 1 + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, that causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that are exited (but not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths that it attempts to send a signal to terminate the children in the parent. + # + # 1. Every second if any socket connection is not available and if there are child workers + # running. + # 2. Right after a socket connection is available. + # + # In other words, the parent attempts to send the signal to the children every second if + # any worker is running or right before launching other worker children from the following + # new socket connection. + + # Only the process IDs of exited children are returned and the termination is attempted below. + children <- parallel:::selectChildren(timeout = 0) + if (is.integer(children)) { +# If it is PIDs, there are workers exited but not terminated. Attempts to terminate them +# by setting SIGUSR1. +lapply(children, function(child) { + # This data should be raw bytes if any data was sent from this child. 
+ # Otherwise, this returns the PID. + data <- parallel:::readChild(child) + if (is.raw(data)) { +# This checks if the data from this child is -1 that indecides an exited child. --- End diff -- `-1` to exit code
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123687096 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,54 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connecion by default. +selectTimeout <- NULL + +# Exit code that children send to the parent to indicate they exited. +exitCode <- 1 + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, that causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that are exited (but not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths that it attempts to send a signal to terminate the children in the parent. + # + # 1. Every second if any socket connection is not available and if there are child workers + # running. + # 2. Right after a socket connection is available. + # + # In other words, the parent attempts to send the signal to the children every second if + # any worker is running or right before launching other worker children from the following + # new socket connection. + + # Only the process IDs of exited children are returned and the termination is attempted below. + children <- parallel:::selectChildren(timeout = 0) + if (is.integer(children)) { +# If it is PIDs, there are workers exited but not terminated. Attempts to terminate them +# by setting SIGUSR1. +lapply(children, function(child) { + # This data should be raw bytes if any data was sent from this child. 
+ # Otherwise, this returns the PID. + data <- parallel:::readChild(child) + if (is.raw(data)) { +# This checks if the data from this child is -1 that indecides an exited child. --- End diff -- `indecides` -> `indicates`
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123682838 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,51 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connecion by default. +selectTimeout <- NULL + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, that causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that are exited (but not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths that it attempts to send a signal to terminate the children in the parent. + # + # 1. Every second if any socket connection is not available and if there are child workers + # running. + # 2. Right after a socket connection is available. + # + # In other words, the parent attempts to send the signal to the children every second if + # any worker is running or right before launching other worker children from the following + # new socket connection. + + # Only the process IDs of exited children are returned and the termination is attempted below. + children <- parallel:::selectChildren(timeout = 0) + if (is.integer(children)) { +# If it is PIDs, there are workers exited but not terminated. Attempts to terminate them +# by setting SIGUSR1. 
+lapply(children, function(child) { --- End diff -- Cool, could you also test without these sleeps (trying to see if there's any race condition)?
```
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  # Sys.sleep(3L)
  # Send no data
  parallel:::mcexit(0L)
} else {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    ## Sys.sleep(3L)
    # Send custom data
    parallel:::sendMaster("arbitrary")
    parallel:::sendMaster(123)
    parallel:::mcexit(0L)
  } else {
    p <- parallel:::mcfork()
    if (inherits(p, "masterProcess")) {
      ### Sys.sleep(3L)
      # Send explicit signal.
      parallel:::mcexit(0L, send = -1)
    } else {
      p <- parallel:::mcfork()
      if (inherits(p, "masterProcess")) {
        ### Sys.sleep(10L)
        # Check if `readChild` waits
        parallel:::mcexit(0L, send = -1)
      }
    }
  }
}
```
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123675152 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,51 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connecion by default. +selectTimeout <- NULL + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, that causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that are exited (but not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths that it attempts to send a signal to terminate the children in the parent. + # + # 1. Every second if any socket connection is not available and if there are child workers + # running. + # 2. Right after a socket connection is available. + # + # In other words, the parent attempts to send the signal to the children every second if + # any worker is running or right before launching other worker children from the following + # new socket connection. + + # Only the process IDs of exited children are returned and the termination is attempted below. + children <- parallel:::selectChildren(timeout = 0) + if (is.integer(children)) { +# If it is PIDs, there are workers exited but not terminated. Attempts to terminate them +# by setting SIGUSR1. +lapply(children, function(child) { --- End diff -- Also, I referred to the code in https://github.com/mllg/batchtools/blob/master/R/clusterFunctionsMulticore.R#L8-L18.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123675088 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,51 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connecion by default. +selectTimeout <- NULL + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, that causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that are exited (but not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths that it attempts to send a signal to terminate the children in the parent. + # + # 1. Every second if any socket connection is not available and if there are child workers + # running. + # 2. Right after a socket connection is available. + # + # In other words, the parent attempts to send the signal to the children every second if + # any worker is running or right before launching other worker children from the following + # new socket connection. + + # Only the process IDs of exited children are returned and the termination is attempted below. + children <- parallel:::selectChildren(timeout = 0) + if (is.integer(children)) { +# If it is PIDs, there are workers exited but not terminated. Attempts to terminate them +# by setting SIGUSR1. 
+lapply(children, function(child) { --- End diff -- I tested this logic as below:
```
vi tmp.R
```
copied and pasted
```r
kill <- function(children) {
  lapply(children, function(child) {
    data <- parallel:::readChild(child)
    if (is.raw(data)) {
      if (unserialize(data) == -1) {
        print(paste("child PID:", child, "and parent will kill given:", unserialize(data)))
        tools::pskill(child, tools::SIGUSR1)
      } else {
        print(paste("child PID:", child, "and sent a data:", unserialize(data)))
      }
    } else {
      print(paste("child PID:", child, "and sent a PID:", data))
    }
  })
}

p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  Sys.sleep(3L)
  # Send no data
  parallel:::mcexit(0L)
} else {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    Sys.sleep(3L)
    # Send custom data
    parallel:::sendMaster("arbitrary")
    parallel:::sendMaster(123)
    parallel:::mcexit(0L)
  } else {
    p <- parallel:::mcfork()
    if (inherits(p, "masterProcess")) {
      Sys.sleep(3L)
      # Send explicit signal.
      parallel:::mcexit(0L, send = -1)
    } else {
      p <- parallel:::mcfork()
      if (inherits(p, "masterProcess")) {
        Sys.sleep(10L)
        # Check if `readChild` waits
        parallel:::mcexit(0L, send = -1)
      }
    }
  }
}

print("Wait for 4 seconds to test the last child ...")
Sys.sleep(4L)
invisible(kill(parallel:::selectChildren(timeout = 0)))
print("Wait for 7 seconds more to test the last child ...")
Sys.sleep(7L)
invisible(kill(parallel:::selectChildren(timeout = 0)))
```
```
[1] "Wait for 4 seconds to test the last child ..."
[1] "child PID: 7405 and parent will kill given: -1"
[1] "child PID: 7404 and sent a data: arbitrary"
[1] "child PID: 7403 and sent a PID: 7403"
[1] "Wait for 7 seconds more to test the last child ..."
[1] "child PID: 7406 and parent will kill given: -1"
[1] "child PID: 7404 and sent a data: 123"
```
and ran
```
Rscript tmp.R
```
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123633470 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connecion by default. +selectTimeout <- NULL + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, that causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that are exited (but not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths that it attempts to send a signal to terminate the children in the parent. + # + # 1. Every second if any socket connection is not available and if there are child workers + # running. + # 2. Right after a socket connection is available. + # + # In other words, the parent attempts to send the signal to the children every second if + # any worker is running or right before launching other worker children from the following + # new socket connection. + + # Only the process IDs of exited children are returned and the termination is attempted below. + children <- parallel:::selectChildren(timeout = 0) + if (is.integer(children)) { +# If it is PIDs, there are workers exited but not terminated. Attempts to terminate them --- End diff -- Sure, let me take a look and be back soon.
Github user shivaram commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123564501 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connecion by default. +selectTimeout <- NULL + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) --- End diff -- It's less that we wait indefinitely than that we wait for input from the JVM socket -- I think this is the right behavior by design
Github user shivaram commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123569536 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connecion by default. +selectTimeout <- NULL + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, that causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that are exited (but not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths that it attempts to send a signal to terminate the children in the parent. + # + # 1. Every second if any socket connection is not available and if there are child workers + # running. + # 2. Right after a socket connection is available. + # + # In other words, the parent attempts to send the signal to the children every second if + # any worker is running or right before launching other worker children from the following + # new socket connection. + + # Only the process IDs of exited children are returned and the termination is attempted below. + children <- parallel:::selectChildren(timeout = 0) + if (is.integer(children)) { +# If it is PIDs, there are workers exited but not terminated. Attempts to terminate them --- End diff -- Hmm @felixcheung this is a good point - The thing is I think what @HyukjinKwon says might be valid given the current code path -- i.e.
the child processes do not write anything back to the master process until exit. However, that seems pretty fragile going forward? What if we explicitly used `sendMaster` to send back an exit signal to the master?
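Roughly, that suggestion could look like the following sketch (names such as `exitCode` and `reapExited` are illustrative, not the merged code): the child announces its exit through the pipe, and the parent only kills children that made that announcement.

```r
exitCode <- 1  # sentinel value the children agree to send right before exiting

# In the child, once the worker is done (sketch):
#   parallel:::mcexit(0L, send = exitCode)

# In the parent loop: reap only children that announced they exited.
reapExited <- function() {
  children <- parallel:::selectChildren(timeout = 0)
  if (is.integer(children)) {
    lapply(children, function(pid) {
      data <- parallel:::readChild(pid)
      # Raw bytes mean the child really sent something back; compare it
      # against the agreed sentinel before terminating it from the parent.
      if (is.raw(data) && unserialize(data) == exitCode) {
        tools::pskill(pid, tools::SIGUSR1)
      }
    })
  }
}
```

This keeps the termination in the parent (the point of the fix) while making the "this child is done" signal explicit rather than inferred from a silent pipe close.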
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123443750 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -47,9 +79,11 @@ while (TRUE) { close(inputCon) --- End diff -- Added.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123440068 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connecion by default. +selectTimeout <- NULL + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, that causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that are exited (but not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths that it attempts to send a signal to terminate the children in the parent. + # + # 1. Every second if any socket connection is not available and if there are child workers + # running. + # 2. Right after a socket connection is available. + # + # In other words, the parent attempts to send the signal to the children every second if + # any worker is running or right before launching other worker children from the following + # new socket connection. + + # Only the process IDs of exited children are returned and the termination is attempted below. + children <- parallel:::selectChildren(timeout = 0) + if (is.integer(children)) { +# If it is PIDs, there are workers exited but not terminated. Attempts to terminate them --- End diff -- I think [this line](https://github.com/s-u/multicore/blob/e9d9bf21e6cf08e24cfe54d762379b4fa923765b/src/fork.c#L440) checks if any data from children is available via file descriptors (pipes I believe). 
The children do not send any data back to the parent (via the `parallel:::sendMaster` API, etc.). So, `FD_ISSET` will only be true on the close of the pipe (as this is an event on the pipe), which I guess happens when `exit()` is called properly.
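That behavior can be checked interactively (a Unix-only sketch, since `mcfork` is not available on Windows): a child that calls `sendMaster` shows up in `readChild` as raw bytes, while a child that exits without sending anything is only observable through its pipe closing.

```r
# Sketch: observe what the parent sees through the child's pipe.
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  # Child: send one payload back, then exit cleanly.
  parallel:::sendMaster("done")
  parallel:::mcexit(0L)
} else {
  Sys.sleep(1L)  # give the child time to write and exit
  data <- parallel:::readChild(p$pid)
  # Raw bytes here are the serialized "done" payload sent via sendMaster;
  # a plain integer PID would instead mean the pipe was closed without data.
  if (is.raw(data)) {
    print(unserialize(data))
  }
}
```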
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123417186 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -47,9 +79,11 @@ while (TRUE) { close(inputCon) --- End diff -- please add comment here to say "# reach here because this is a child process"
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123416154 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connection by default. +selectTimeout <- NULL + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) --- End diff -- hmm, this is pre-existing behavior, but generally I think waiting indefinitely for anything is rather dangerous. It would probably be good to follow up separately.
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123416771 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connection by default. +selectTimeout <- NULL + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, which causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that have exited (but are not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths by which the parent attempts to send a signal to terminate the children. + # + # 1. Every second, if no socket connection is available and there are child workers + # running. + # 2. Right after a socket connection is available. + # + # In other words, the parent attempts to send the signal to the children every second if + # any worker is running, or right before launching other worker children from the following + # new socket connection. + + # Only the process IDs of exited children are returned and the termination is attempted below. + children <- parallel:::selectChildren(timeout = 0) + if (is.integer(children)) { +# If it is PIDs, there are workers that have exited but are not terminated. Attempts to terminate them --- End diff -- If I understand correctly, children (from `parallel:::selectChildren()`) as an integer only indicates the list of forked child processes. But it does not indicate that the child process has "exited"?
When we are in a loop checking every second (i.e. selectTimeout == 1), it sounds to me like socketSelect could return in one of the following ways:
- socket available for reading (ready == TRUE)
- socket not available for reading, because it's dead or exiting (ready == FALSE)
- socket not available for reading, but not ready to connect because, say, things are running slow or the host is busy, and it has effectively just timed out after selectTimeout (ready == FALSE)
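The loop shape under discussion can be sketched as follows. This is a Python analogue, not the actual daemon.R: `socketpair` stands in for the launcher socket (`inputCon`), and `waitpid(WNOHANG)` stands in for `selectChildren(timeout = 0)` plus the kill. The parent polls the socket with a timeout and, on every wake-up — whether the socket was ready or not — reaps workers that have already exited:

```python
import os
import select
import socket

parent_sock, launcher = socket.socketpair()  # stands in for inputCon

def reap_exited(pids):
    """Non-blocking reap: return the pids that have already exited."""
    done = []
    for pid in pids:
        finished, _ = os.waitpid(pid, os.WNOHANG)  # WNOHANG: never blocks
        if finished == pid:
            done.append(pid)
    return done

pid = os.fork()
if pid == 0:
    os._exit(0)  # worker finishes its task immediately

children = [pid]
# Path 1 in the comment above: nothing arrives on the socket within the
# 1-second timeout (ready == []), but the exited worker still gets cleaned up.
ready, _, _ = select.select([parent_sock], [], [], 1)
exited = reap_exited(children)
children = [p for p in children if p not in exited]
print(ready, children)  # [] []
```

The third outcome listed above (host merely slow) is handled naturally here: a still-running child is simply not returned by the non-blocking reap, so nothing is signaled prematurely.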
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r123419616 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,40 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +# Waits indefinitely for a socket connection by default. +selectTimeout <- NULL + while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = selectTimeout) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, which causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that have exited (but are not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths by which the parent attempts to send a signal to terminate the children. + # + # 1. Every second, if no socket connection is available and there are child workers + # running. + # 2. Right after a socket connection is available. + # + # In other words, the parent attempts to send the signal to the children every second if + # any worker is running, or right before launching other worker children from the following + # new socket connection. + + # Only the process IDs of exited children are returned and the termination is attempted below. + children <- parallel:::selectChildren(timeout = 0) + if (is.integer(children)) { +# If it is PIDs, there are workers that have exited but are not terminated. Attempts to terminate them --- End diff -- right, I see your reference here https://github.com/apache/spark/pull/18320#discussion_r122639738 but I'm not 100% getting it when looking at the source code https://github.com/s-u/multicore/blob/e9d9bf21e6cf08e24cfe54d762379b4fa923765b/src/fork.c#L361
GitHub user HyukjinKwon reopened a pull request: https://github.com/apache/spark/pull/18320 [SPARK-21093][R] Terminate R's worker processes in the parent of R's daemon to prevent a leak ## What changes were proposed in this pull request? `mcfork` in R appears to open a pipe ahead of time, but the existing logic does not properly close it when executed hot. This leads to failures of further forking due to the limit on the number of open files. This hot execution happens particularly with `gapply`/`gapplyCollect`. For unknown reasons, this happens more easily on CentOS and could be reproduced on Mac too. All the details are described in https://issues.apache.org/jira/browse/SPARK-21093 This PR proposes simply to terminate R's worker processes in the parent of R's daemon to prevent a leak. ## How was this patch tested? I ran the code below on both CentOS and Mac with that configuration disabled/enabled.

```r
df <- createDataFrame(list(list(1L, 1, "1", 0.1)), c("a", "b", "c", "d"))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
... # 30 times
```

Also, now it passes R tests on CentOS as below:

```
SparkSQL functions: Spark package found in SPARK_HOME: .../spark .. .. .. .. ..
```

You can merge this pull request into a Git repository by running: $ git pull https://github.com/HyukjinKwon/spark SPARK-21093 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18320.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18320

commit 6e57ed2931afc5aec8c4b4bef72c157abcb68c46 Author: hyukjinkwon Date: 2017-06-16T02:37:53Z Terminates forked processes in the parent process
commit 4eadafe3f009b1c70956c08c99302c1da34db6d4 Author: hyukjinkwon Date: 2017-06-17T09:21:37Z Fix typo (renaming missed)
commit 18b3ee9a66df40658074511558f0cd36fc102df7 Author: hyukjinkwon Date: 2017-06-17T10:54:57Z Rename x to c in lapply
commit 72ab1f2e8cafa1d5249a09279825444e3ca38b39 Author: hyukjinkwon Date: 2017-06-19T12:21:08Z Update comments to describe the behaviour change
commit 6cba54c243123d25f479363d3dfd7eb92bb25599 Author: hyukjinkwon Date: 2017-06-20T01:02:26Z Do not check every second if there is no worker running
commit 4954008884ff02a9eae9ea50586e86e8923fc593 Author: hyukjinkwon Date: 2017-06-20T09:04:03Z Address comment
commit f3f57e46868e66b8f50268910c1eff494638059d Author: hyukjinkwon Date: 2017-06-20T09:30:56Z Fix comments
commit 04bb37a6d8d4387365f6d46cb8e2c6fbe912351d Author: hyukjinkwon Date: 2017-06-20T09:32:29Z Fix comments
commit d6f0ff275abd3b7641210427eea955c9f0ea8d86 Author: hyukjinkwon Date: 2017-06-20T09:39:21Z Add more comments
commit 8b48274dc565dc5c6722e983c55494b0067bda72 Author: Hyukjin Kwon Date: 2017-06-20T10:00:05Z Fix a typo
Github user HyukjinKwon closed the pull request at: https://github.com/apache/spark/pull/18320
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122879650 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,42 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +selectTimeout <- function() { + if (is.null(parallel:::selectChildren(timeout = 0))) { +# Wait for a socket connection indefinitely if there are no workers running. +NULL --- End diff -- Sure, let me check this out soon.
Github user shivaram commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122875289 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,42 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +selectTimeout <- function() { + if (is.null(parallel:::selectChildren(timeout = 0))) { +# Wait for a socket connection indefinitely if there are no workers running. +NULL --- End diff -- Nice - this is in line with what I was thinking. One thing: can we maintain some state in our own code to avoid calling `select` twice? What I mean is: if `finishedChildren` was `NULL` (based on [1]), then we set a flag saying `timeout <- NULL` -- next time we fork some children we set this back to `1`. Will that work, or am I missing something? [1] https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/children.html
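The bookkeeping suggested here — keeping a flag in our own code instead of calling `select` twice — could look like the following sketch. The names are hypothetical and this is Python rather than R, purely for illustration: the timeout is `None` (block indefinitely) whenever no workers are running, and flips back to 1 second once a worker is forked.

```python
class SelectTimeoutState:
    """Tracks live workers so the daemon knows whether to block or poll."""

    def __init__(self):
        self.running = 0

    def on_fork(self):
        self.running += 1

    def on_terminated(self, count):
        self.running -= count

    def timeout(self):
        # None: block on the launcher socket indefinitely (no one to reap).
        # 1: wake up every second to reap exited workers.
        return None if self.running == 0 else 1


state = SelectTimeoutState()
print(state.timeout())   # None: no children, block indefinitely
state.on_fork()
print(state.timeout())   # 1: a worker is running, poll every second
state.on_terminated(1)
print(state.timeout())   # None again
```

The trade-off versus calling `selectChildren(timeout = 0)` a second time is that this counter must be kept in sync with every fork and every reap, which is why the discussion below weighs both approaches.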
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122861395 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,42 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +selectTimeout <- function() { + if (is.null(parallel:::selectChildren(timeout = 0))) { --- End diff -- Multiple children tests can be done as below: copy and paste this instead

```r
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  Sys.sleep(3L)
  parallel:::mcexit(0L)
} else {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    Sys.sleep(3L)
    parallel:::mcexit(0L)
  }
}

print("TRUE - the timeout was reached")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))

Sys.sleep(4L)
print("PID - child is exited (but not terminated)")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))

invisible(lapply(parallel:::selectChildren(timeout = 0), function(x) {
  tools::pskill(x, tools::SIGUSR1)
}))

Sys.sleep(1L)
print("NULL - if there are no children")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))
```
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122860937 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,42 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +selectTimeout <- function() { + if (is.null(parallel:::selectChildren(timeout = 0))) { --- End diff -- And... just in case, these can be checked via both commands below:

```
watch -n 0.01 "ps -fe | grep /exec/R"
```

checks forked processes.

```
watch -n 0.01 "lsof -c R | wc -l"
```

checks open files (and file descriptors & pipes).
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122860370 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,42 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +selectTimeout <- function() { + if (is.null(parallel:::selectChildren(timeout = 0))) { +# Wait for a socket connection indefinitely if there are no workers running. +NULL --- End diff -- https://stat.ethz.ch/R-manual/R-devel/library/base/html/socketSelect.html > NULL means wait indefinitely.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122860310 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -30,8 +30,42 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) +selectTimeout <- function() { + if (is.null(parallel:::selectChildren(timeout = 0))) { --- End diff -- This can be tested as below:

```r
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  Sys.sleep(3L)
  parallel:::mcexit(0L)
}

print("TRUE - the timeout was reached")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))

Sys.sleep(4L)
print("PID - child is exited (but not terminated)")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))

invisible(lapply(parallel:::selectChildren(timeout = 0), function(x) {
  tools::pskill(x, tools::SIGUSR1)
}))

Sys.sleep(1L)
print("NULL - if there are no children")
print("parallel:::selectChildren(timeout = 0):")
print(parallel:::selectChildren(timeout = 0))
```

Per the documentation - https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/children.html > selectChildren returns `TRUE` if the timeout was reached, `FALSE` if an error occurred (e.g., if the master process was interrupted), or an integer vector of process IDs of children that have data available, or `NULL` if there are no children. I also manually built this and checked that it really waits indefinitely, with print output, by running

```r
df <- createDataFrame(list(list(1L, 1, "1", 0.1)), c("a", "b", "c", "d"))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
collect(gapply(df, "a", function(key, x) { x }, schema(df)))
... # 30 times
```
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122855545 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -31,7 +31,15 @@ inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = 1) --- End diff -- BTW, the old behaviour was simply to hang and wait there indefinitely. https://stat.ethz.ch/R-manual/R-devel/library/base/html/socketSelect.html > numeric or NULL. Time in seconds to wait for a socket to become available; NULL means wait indefinitely.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122848957 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -31,7 +31,15 @@ inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = 1) --- End diff -- Yea, I got your point and was taking a look. I think it is probably feasible via `parallel:::children`. I will give it a shot here and report back.
Github user shivaram commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122847863 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -31,7 +31,15 @@ inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = 1) --- End diff -- To make my point a bit more clear: if there are no workers running, we can block on this `select` and don't need the one-second timeout in that case?
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122843971 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -31,7 +31,30 @@ inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = 1) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, which causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that have exited (but are not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths by which it sends a signal to terminate the children in the parent. + # + # 1. Every second, if no socket connection is available. + # 2. Right after a socket connection is available. + # + # In other words, the parent sends the signal to children every second or right before + # launching other worker children from the following new socket connection. + # + # Only the process IDs of exited children are returned and the termination is attempted below. + finishedChildren <- parallel:::selectChildren(timeout = 0) --- End diff -- This does not block at all. I tested this - https://github.com/apache/spark/pull/18320#discussion_r122605437 for sure. Let me double check.
Github user shivaram commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122804718 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -31,7 +31,30 @@ inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = 1) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, which causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that have exited (but are not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths by which it sends a signal to terminate the children in the parent. + # + # 1. Every second, if no socket connection is available. + # 2. Right after a socket connection is available. + # + # In other words, the parent sends the signal to children every second or right before + # launching other worker children from the following new socket connection. + # + # Only the process IDs of exited children are returned and the termination is attempted below. + finishedChildren <- parallel:::selectChildren(timeout = 0) --- End diff -- Does a timeout of `0` not wait at all, or does it block until it gets all the results?
Github user shivaram commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122803981 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -31,7 +31,15 @@ inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = 1) --- End diff -- @HyukjinKwon Do we know what the old behavior was doing? I'm wondering if we would now spend more time in the loop below and be less responsive to the `inputCon`. One thing -- can we control the timeout based on whether there are any children running or not?
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122695789 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -31,7 +31,30 @@ inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = 1) + + # Note that the children should be terminated in the parent. If each child terminates + # itself, it appears that the resource is not released properly, which causes an unexpected + # termination of this daemon due to, for example, running out of file descriptors + # (see SPARK-21093). Therefore, the current implementation tries to retrieve children + # that have exited (but are not terminated) and then sends a kill signal to terminate them properly + # in the parent. + # + # There are two paths by which it sends a signal to terminate the children in the parent. + # + # 1. Every second, if no socket connection is available. + # 2. Right after a socket connection is available. + # + # In other words, the parent sends the signal to children every second or right before + # launching other worker children from the following new socket connection. + # + # Only the process IDs of exited children are returned and the termination is attempted below. + finishedChildren <- parallel:::selectChildren(timeout = 0) --- End diff -- This is 0 by default, but I added it to prevent conversion.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122687051 --- Diff: R/pkg/inst/worker/daemon.R --- @@ -31,7 +31,15 @@ inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) while (TRUE) { - ready <- socketSelect(list(inputCon)) + ready <- socketSelect(list(inputCon), timeout = 1) + + # Terminate R workers in the parent process. + finishedChildren <- parallel:::selectChildren() --- End diff -- @felixcheung, I tested with the change below:

```diff
 port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
+Sys.sleep(5L)
 inputCon <- socketConnection(
   port = port, blocking = TRUE, open = "rb", timeout = connectionTimeout)
 outputCon <- socketConnection(
```

It looks fine. Does this deal with your concern?
[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122639738

--- Diff: R/pkg/inst/worker/daemon.R ---
```diff
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
```
--- End diff --

Definitely, for the comments. Maybe I missed your point. Children only return their PID on exit, and `parallel:::selectChildren()` only returns the PIDs of children that finished their work and called `parallel:::mcexit(0L)` (the related test was done at https://github.com/apache/spark/pull/18320#discussion_r122605437), to my knowledge. So, even if connecting to the JVM is delayed in `worker.R` and `RRunner.scala`, it won't matter.
[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122630135

--- Diff: R/pkg/inst/worker/daemon.R ---
```diff
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
```
--- End diff --

Ah, got it. Could you add some comments in the code to document the behavior? Also, I wonder if a delay in connecting back to the JVM will now cause the code to abort prematurely?
[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122609234

--- Diff: R/pkg/inst/worker/daemon.R ---
```diff
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
```
--- End diff --

The related test was performed here - https://github.com/apache/spark/pull/18320#discussion_r122605437
[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122609177

--- Diff: R/pkg/inst/worker/daemon.R ---
```diff
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
```
--- End diff --

Ah, yes. It looks confusing. Let me add more comments around here. I intended to do the following.

A: When data arrives in `daemon.R`:
- `ready` is `TRUE`; the children (`worker.R`) that have finished their work are terminated.
- Other `worker.R` children are launched.

B: When no data arrives in `daemon.R`:
- `ready` is `FALSE`; the children (`worker.R`) that have finished their work are terminated, every second.

If we do this within `if (ready) {`, the child processes will remain until case A happens. Forked children do some work, and if the parent checks for finished children right after forking, `parallel:::selectChildren()` will probably not return the finished children's PIDs.
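A rough sketch of how the two cases above combine in the daemon loop (not the actual patch; `reapFinishedChildren` is a hypothetical helper name, and `inputCon` is assumed to be the daemon's socket connection):

```R
# Hypothetical helper: reap children that have already called mcexit.
reapFinishedChildren <- function() {
  # With timeout = 0 this call does not block: it returns an integer vector
  # of PIDs for children that have sent data (i.e., exited), TRUE if children
  # exist but none are ready, or FALSE if there are no children at all.
  finished <- parallel:::selectChildren(timeout = 0)
  if (is.integer(finished)) {
    lapply(finished, function(pid) tools::pskill(pid, tools::SIGUSR1))
  }
}

while (TRUE) {
  # Case B: the 1-second timeout lets the parent reap finished children
  # periodically even while no new task arrives (ready stays FALSE).
  ready <- socketSelect(list(inputCon), timeout = 1)
  reapFinishedChildren()
  if (ready) {
    # Case A: finished children were reaped just above, right before
    # forking a new worker for the incoming connection here.
  }
}
```

The key point the sketch illustrates is that the reaping runs on both paths, so stale children never outlive the next one-second tick.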
[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122608540

--- Diff: R/pkg/inst/worker/daemon.R ---
```diff
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
```
--- End diff --

why not do this under `if (ready) {` similar to before?
[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122606765

--- Diff: R/pkg/inst/worker/daemon.R ---
```diff
@@ -47,8 +55,6 @@ while (TRUE) {
       close(inputCon)
       Sys.setenv(SPARKR_WORKER_PORT = port)
       try(source(script))
-      # Set SIGUSR1 so that child can exit
-      tools::pskill(Sys.getpid(), tools::SIGUSR1)
       parallel:::mcexit(0L)
```
--- End diff --

BTW, to my knowledge, this should work alone, at least in C (I didn't test). Even if it were a bug in R that gets fixed in the future, I guess the current logic would still work as-is.

```R
tools::pskill(arbitraryPid, tools::SIGUSR1)
```

https://stat.ethz.ch/R-manual/R-devel/library/tools/html/pskill.html

> it silently ignores invalid values of its arguments, including zero or negative pids.
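Following the documentation quoted above, the silent-ignore behavior can be checked directly in an R session (a sketch only; the pid values are arbitrary):

```R
# Per the pskill documentation, invalid pids (including zero or negative
# values) are silently ignored, so signaling a child pid that has already
# gone away should be harmless for the daemon.
tools::pskill(0L, tools::SIGUSR1)       # zero pid: silently ignored
tools::pskill(-12345L, tools::SIGUSR1)  # negative pid: silently ignored
```

This matters for the patch because the parent may race with a child that exits on its own; the documented behavior means a late kill signal cannot raise an error.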
[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122606530

--- Diff: R/pkg/inst/worker/daemon.R ---
```diff
@@ -47,8 +55,6 @@ while (TRUE) {
       close(inputCon)
       Sys.setenv(SPARKR_WORKER_PORT = port)
       try(source(script))
-      # Set SIGUSR1 so that child can exit
-      tools::pskill(Sys.getpid(), tools::SIGUSR1)
       parallel:::mcexit(0L)
```
--- End diff --

`mcexit` alone does not terminate the child.

In terminal A, start R:

```bash
R
```

In terminal B:

```bash
watch -n 0.01 "ps -fe | grep /bin/exec/R"
```

Back in terminal A:

```R
for (i in 0:100) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    parallel:::mcexit(0L)
  }
}
```
[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122605437

--- Diff: R/pkg/inst/worker/daemon.R ---
```diff
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
```
--- End diff --

This can be tested as below:

```bash
vi tmp.R
```

Copy and paste:

```R
p <- parallel:::mcfork()
if (inherits(p, "masterProcess")) {
  Sys.sleep(3L)
  parallel:::mcexit(0L)
}
print(parallel:::selectChildren())
Sys.sleep(4L)
print(parallel:::selectChildren())
```

This prints:

```
[1] TRUE
[1] 2505
```

For the return values, please refer to https://stat.ethz.ch/R-manual/R-devel/library/parallel/html/children.html.

To test for the resource leak, this can be done as below:

```R
for (i in 0:10) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    parallel:::mcexit(0L)
  }
  exitChildren <- parallel:::selectChildren()
  if (is.integer(exitChildren)) {
    lapply(exitChildren, function(x) {
      tools::pskill(x, tools::SIGUSR1)
    })
  }
}
```

The original code below:

```R
for (i in 0:10) {
  p <- parallel:::mcfork()
  if (inherits(p, "masterProcess")) {
    tools::pskill(Sys.getpid(), tools::SIGUSR1)
    parallel:::mcexit(0L)
  }
}
```

will probably complain in `mcfork` due to the lack of resources (e.g., running out of file descriptors). The actual number can be checked via `watch -n 0.01 "lsof -c R | wc -l"` in another terminal.
[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122605188

--- Diff: R/pkg/inst/worker/daemon.R ---
```diff
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
```
--- End diff --

The logic here can be tested as below. In one terminal (terminal A), run:

```bash
nc -l
```

In another terminal (terminal B), run the code below in an R shell:

```R
inputCon <- socketConnection(port = "", open = "rb", blocking = TRUE)
print(socketSelect(list(inputCon), timeout = 1))
```

This prints `FALSE` at first, as no data is available. If any character is then typed in terminal A, running the call again in terminal B returns `TRUE`.
[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122564740

--- Diff: R/pkg/inst/worker/daemon.R ---
```diff
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
+
+  # Terminate R workers in the parent process.
+  finishedChildren <- parallel:::selectChildren()
```
--- End diff --

`timeout` for `selectChildren` appears to be 0 by default, so this does not hang here.
[GitHub] spark pull request #18320: [SPARK-21093][R] Terminate R's worker processes i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18320#discussion_r122564686

--- Diff: R/pkg/inst/worker/daemon.R ---
```diff
@@ -31,7 +31,15 @@ inputCon <- socketConnection(
     port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout)
 
 while (TRUE) {
-  ready <- socketSelect(list(inputCon))
+  ready <- socketSelect(list(inputCon), timeout = 1)
```
--- End diff --

So, I intended to terminate the children every second _or_ right before launching other workers.