[GitHub] spark issue #23209: [SPARK-26256][K8s] Fix labels for pod deletion
Github user skonto commented on the issue: https://github.com/apache/spark/pull/23209 @srowen I resolved the conflicts, feel free to merge.
[GitHub] spark pull request #23209: [SPARK-26256][K8s] Fix labels for pod deletion
GitHub user skonto opened a pull request: https://github.com/apache/spark/pull/23209

[SPARK-26256][K8s] Fix labels for pod deletion

## What changes were proposed in this pull request?

Adds the proper labels when deleting executor pods.

## How was this patch tested?

Manually, with tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/skonto/spark fix-deletion-labels

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/23209.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #23209

commit c40f4df5a8ae4250927a0ce3855f6014dcb4e66e
Author: Stavros Kontopoulos
Date: 2018-12-03T15:24:42Z

    fix labels for pod deletion
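For context, the bug discussed further down in this digest is that deleting by `spark-exec-id` alone can match executors of other applications running in the same namespace. A minimal sketch of a label-scoped delete along those lines, using the label constants that appear in the diffs quoted below (an illustration of the idea, not the exact patch):

```scala
// Sketch only: scope the delete to this application's executors, so that
// removing "executor 2" of one job cannot remove "executor 2" of another.
// Assumes the fabric8 kubernetesClient and the backend's existing helpers
// (Utils.tryLogNonFatalError, applicationId()) and label constants
// (SPARK_APP_ID_LABEL, SPARK_ROLE_LABEL, SPARK_EXECUTOR_ID_LABEL).
Utils.tryLogNonFatalError {
  kubernetesClient
    .pods()
    .withLabel(SPARK_APP_ID_LABEL, applicationId())
    .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
    .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
    .delete()
}
```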
[GitHub] spark issue #23136: [SPARK-25515][K8s] Adds a config option to keep executor...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/23136 @liyinan926 updated the PR. Here is the [jira](https://issues.apache.org/jira/browse/SPARK-26256) for the bug; I will create a PR for it shortly.
[GitHub] spark pull request #23136: [SPARK-25515][K8s] Adds a config option to keep e...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/23136#discussion_r237643173

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala ---
@@ -86,11 +88,14 @@ private[spark] class ExecutorPodsAllocator(
           s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
           " previous allocation attempt tried to create it. The executor may have been" +
           " deleted but the application missed the deletion event.")
-        Utils.tryLogNonFatalError {
-          kubernetesClient
-            .pods()
-            .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
-            .delete()
+
+        if (shouldDeleteExecutors) {
+          Utils.tryLogNonFatalError {
+            kubernetesClient
+              .pods()
+              .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
--- End diff --

@liyinan926 ok. In general it is not a big issue but for tracking purposes it would help.
[GitHub] spark pull request #23136: [SPARK-25515][K8s] Adds a config option to keep e...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/23136#discussion_r237642388

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala ---
@@ -30,7 +30,7 @@ import org.apache.spark.scheduler.ExecutorExited
 import org.apache.spark.util.Utils

 private[spark] class ExecutorPodsLifecycleManager(
-    conf: SparkConf,
+    val conf: SparkConf,
--- End diff --

@onursatici I think it's a small bug fix that can be included here; I can update the PR description if that helps. I thought of creating a new one, but exposing that value was easy for testing purposes. I don't expect that to be a problem unless someone tries to modify SparkConf at the backend when they shouldn't. I don't think it is a big concern.
[GitHub] spark issue #23136: [SPARK-25515][K8s] Adds a config option to keep executor...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/23136 Tests failed, but not due to this PR.
[GitHub] spark issue #23136: [SPARK-25515][K8s] Adds a config option to keep executor...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/23136 @mccheah gentle ping.
[GitHub] spark issue #23136: [SPARK-25515][K8s] Adds a config option to keep executor...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/23136 Fixed that failing test.
[GitHub] spark pull request #23136: [SPARK-25515][K8s] Adds a config option to keep e...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/23136#discussion_r236488306

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala ---
@@ -86,11 +88,14 @@ private[spark] class ExecutorPodsAllocator(
           s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
           " previous allocation attempt tried to create it. The executor may have been" +
           " deleted but the application missed the deletion event.")
-        Utils.tryLogNonFatalError {
-          kubernetesClient
-            .pods()
-            .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
-            .delete()
+
+        if (shouldDeleteExecutors) {
+          Utils.tryLogNonFatalError {
+            kubernetesClient
+              .pods()
+              .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
--- End diff --

I just ran multiple jobs in parallel:
```
NAME                            READY   STATUS      RESTARTS   AGE
spark-pi-1543281035622-driver   1/1     Running     0          2m
spark-pi-1543281035622-exec-1   1/1     Running     0          1m
spark-pi-1543281098418-driver   0/1     Completed   0          1m
spark-pi-1543281107591-driver   0/1     Completed   0          57s
```
The long-running job was not terminated...
[GitHub] spark pull request #23136: [SPARK-25515][K8s] Adds a config option to keep e...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/23136#discussion_r236485835

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala ---
@@ -86,11 +88,14 @@ private[spark] class ExecutorPodsAllocator(
           s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
           " previous allocation attempt tried to create it. The executor may have been" +
           " deleted but the application missed the deletion event.")
-        Utils.tryLogNonFatalError {
-          kubernetesClient
-            .pods()
-            .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
-            .delete()
+
+        if (shouldDeleteExecutors) {
+          Utils.tryLogNonFatalError {
+            kubernetesClient
+              .pods()
+              .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
--- End diff --

Correct, here is a sample output:
```
Labels:  spark-app-selector=spark-application-1543242814683
         spark-exec-id=2
         spark-role=executor
```
Weird that I didn't see one job interfering with the other before.
[GitHub] spark pull request #23136: [SPARK-25515][K8s] Adds a config option to keep e...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/23136#discussion_r236457049

--- Diff: docs/running-on-kubernetes.md ---
@@ -926,6 +926,13 @@ specific to Spark on Kubernetes.
   spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml`
+
+  spark.kubernetes.deleteExecutors
--- End diff --

No problem, I can change to that; it seems more readable if we remove driver pods as well.
[GitHub] spark pull request #23136: [SPARK-25515][K8s] Adds a config option to keep e...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/23136#discussion_r236456591

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala ---
@@ -86,11 +88,14 @@ private[spark] class ExecutorPodsAllocator(
           s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
           " previous allocation attempt tried to create it. The executor may have been" +
           " deleted but the application missed the deletion event.")
-        Utils.tryLogNonFatalError {
-          kubernetesClient
-            .pods()
-            .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
-            .delete()
+
+        if (shouldDeleteExecutors) {
+          Utils.tryLogNonFatalError {
+            kubernetesClient
+              .pods()
+              .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
--- End diff --

Ok let's make that configurable too.
[GitHub] spark pull request #23136: [SPARK-25515][K8s] Adds a config option to keep e...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/23136#discussion_r236432458

--- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala ---
@@ -100,6 +101,18 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
   }

+  test("When an executor reaches error states immediately and the user has set the flag" +
--- End diff --

Yeah that was a draft name, I can change it.
[GitHub] spark issue #23136: [SPARK-25515][K8s] Adds a config option to keep executor...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/23136 @erikerlandson @mccheah pls review.
[GitHub] spark pull request #23136: [SPARK-25515][K8s] Adds a config option to keep e...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/23136#discussion_r236089683

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
@@ -82,11 +85,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
       pollEvents.stop()
     }

-    Utils.tryLogNonFatalError {
-      kubernetesClient.pods()
-        .withLabel(SPARK_APP_ID_LABEL, applicationId())
-        .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
-        .delete()
+    if (shouldDeleteExecutors) {
+      Utils.tryLogNonFatalError {
+        kubernetesClient.pods()
+          .withLabel(SPARK_APP_ID_LABEL, applicationId())
+          .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+          .delete()
+      }
--- End diff --

This is the safest scenario, as the backend stops all executors before pod deletion.
[GitHub] spark pull request #23136: [SPARK-25515][K8s] Adds a config option to keep e...
GitHub user skonto opened a pull request: https://github.com/apache/spark/pull/23136

[SPARK-25515][K8s] Adds a config option to keep executor pods for debugging

## What changes were proposed in this pull request?

Keeps K8s executor resources present in case of failure or normal termination. Introduces a new boolean config option, `spark.kubernetes.deleteExecutors`, with the default value set to true. The idea is to update the Spark K8s backend structures but leave the resources around. The assumption is that since entries are not removed from the `removedExecutorsCache`, we are immune to updates that refer to the executor resources previously removed. The only delete operation not touched is the one in the `doKillExecutors` method; the reason is that right now we don't support [blacklisting](https://issues.apache.org/jira/browse/SPARK-23485) and dynamic allocation with Spark on K8s. In both cases we might want to handle these scenarios in the future, although it is more complicated.

## How was this patch tested?

Manually, by running a Spark job and verifying that pods are not deleted.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/skonto/spark keep_pods

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/23136.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #23136

commit 5cdca3167e2f8acb84a23f9c64e5c3bc524f04ac
Author: Stavros Kontopoulos
Date: 2018-11-25T20:18:42Z

    add config option to keep executor pods
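For reference, a boolean option like this is normally declared through Spark's internal `ConfigBuilder`. A minimal sketch, assuming the entry name and doc text (the config key `spark.kubernetes.deleteExecutors` and the default of true come from the PR description above; the rest is illustrative):

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical entry name; the PR may name and document this differently.
val KUBERNETES_DELETE_EXECUTORS = ConfigBuilder("spark.kubernetes.deleteExecutors")
  .doc("If false, executor pods are left around after failure or normal " +
    "termination so they can be inspected for debugging.")
  .booleanConf
  .createWithDefault(true)
```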
[GitHub] spark issue #18784: [SPARK-21559][Mesos] remove mesos fine-grained mode
Github user skonto commented on the issue: https://github.com/apache/spark/pull/18784 @imaxxs @rxin I think it's a good time to remove this; I will update the PR if you are all ok.
[GitHub] spark pull request #22993: [SPARK-24421][BUILD][CORE] Accessing sun.misc.Cle...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22993#discussion_r233216872

--- Diff: core/src/main/scala/org/apache/spark/storage/StorageUtils.scala ---
@@ -193,6 +194,36 @@ private[spark] class StorageStatus(

 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils extends Logging {
+
+  // In Java 8, the type of DirectBuffer.cleaner() was sun.misc.Cleaner, and it was possible
+  // to access the method sun.misc.Cleaner.clean() to invoke it. The type changed to
+  // jdk.internal.ref.Cleaner in later JDKs, and the .clean() method is not accessible even with
+  // reflection. However sun.misc.Unsafe added a invokeCleaner() method in JDK 9+ and this is
+  // still accessible with reflection.
+  private val bufferCleaner: DirectBuffer => Unit =
+    // Split java.version on non-digit chars:
+    if (System.getProperty("java.version").split("\\D+").head.toInt < 9) {
+      // scalastyle:off classforname
+      val cleanerMethod = Class.forName("sun.misc.Cleaner").getMethod("clean")
+      // scalastyle:on classforname
+      (buffer: DirectBuffer) => {
+        // Careful to avoid the return type of .cleaner(), which changes with JDK
+        val cleaner: AnyRef = buffer.cleaner()
+        if (cleaner != null) {
+          cleanerMethod.invoke(cleaner)
+        }
+      }
+    } else {
+      // scalastyle:off classforname
+      val cleanerMethod =
+        Class.forName("sun.misc.Unsafe").getMethod("invokeCleaner", classOf[ByteBuffer])
--- End diff --

ok, in general I can't think of something failing at the moment; I just wanted to provide some feedback that this mixes strategies.
[GitHub] spark pull request #22993: [SPARK-24421][BUILD][CORE] Accessing sun.misc.Cle...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22993#discussion_r233166698

--- Diff: core/src/main/scala/org/apache/spark/storage/StorageUtils.scala ---
@@ -193,6 +194,36 @@ private[spark] class StorageStatus(

 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils extends Logging {
+
+  // In Java 8, the type of DirectBuffer.cleaner() was sun.misc.Cleaner, and it was possible
+  // to access the method sun.misc.Cleaner.clean() to invoke it. The type changed to
+  // jdk.internal.ref.Cleaner in later JDKs, and the .clean() method is not accessible even with
+  // reflection. However sun.misc.Unsafe added a invokeCleaner() method in JDK 9+ and this is
+  // still accessible with reflection.
+  private val bufferCleaner: DirectBuffer => Unit =
+    // Split java.version on non-digit chars:
+    if (System.getProperty("java.version").split("\\D+").head.toInt < 9) {
--- End diff --

It is a source of inspiration, though, for how to parse the string of that property, even if you don't rely on this dependency.
[GitHub] spark pull request #22993: [SPARK-24421][BUILD][CORE] Accessing sun.misc.Cle...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22993#discussion_r233002403

--- Diff: core/src/main/scala/org/apache/spark/storage/StorageUtils.scala ---
@@ -193,6 +194,36 @@ private[spark] class StorageStatus(

 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils extends Logging {
+
+  // In Java 8, the type of DirectBuffer.cleaner() was sun.misc.Cleaner, and it was possible
+  // to access the method sun.misc.Cleaner.clean() to invoke it. The type changed to
+  // jdk.internal.ref.Cleaner in later JDKs, and the .clean() method is not accessible even with
+  // reflection. However sun.misc.Unsafe added a invokeCleaner() method in JDK 9+ and this is
+  // still accessible with reflection.
+  private val bufferCleaner: DirectBuffer => Unit =
+    // Split java.version on non-digit chars:
+    if (System.getProperty("java.version").split("\\D+").head.toInt < 9) {
+      // scalastyle:off classforname
+      val cleanerMethod = Class.forName("sun.misc.Cleaner").getMethod("clean")
+      // scalastyle:on classforname
+      (buffer: DirectBuffer) => {
+        // Careful to avoid the return type of .cleaner(), which changes with JDK
+        val cleaner: AnyRef = buffer.cleaner()
+        if (cleaner != null) {
+          cleanerMethod.invoke(cleaner)
+        }
+      }
+    } else {
+      // scalastyle:off classforname
+      val cleanerMethod =
+        Class.forName("sun.misc.Unsafe").getMethod("invokeCleaner", classOf[ByteBuffer])
--- End diff --

Probably it is safer not to mix strategies (https://www.javaworld.com/article/2077344/core-java/find-a-way-out-of-the-classloader-maze.html). The Utils forName method first checks the thread context class loader, and from what I see this method is called in numerous places. Spark creates custom classloaders AFAIK, so the question is from which classloader chain you want this class loading to happen. I think both should work.
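A minimal sketch of the lookup order described above, i.e. preferring the thread context class loader before falling back. This is an assumption about what Spark's `Utils` helper does, based only on the description in the comment; the real implementation may differ:

```scala
// Try the thread context class loader first; fall back to the loader that
// loaded this class. This is what matters when custom class loaders are
// installed, as Spark does.
def classForName(name: String): Class[_] = {
  val loader = Option(Thread.currentThread().getContextClassLoader)
    .getOrElse(getClass.getClassLoader)
  Class.forName(name, true, loader)
}
```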
[GitHub] spark pull request #22993: [SPARK-24421][BUILD][CORE] Accessing sun.misc.Cle...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22993#discussion_r232980913

--- Diff: core/src/main/scala/org/apache/spark/storage/StorageUtils.scala ---
@@ -193,6 +194,36 @@ private[spark] class StorageStatus(

 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils extends Logging {
+
+  // In Java 8, the type of DirectBuffer.cleaner() was sun.misc.Cleaner, and it was possible
+  // to access the method sun.misc.Cleaner.clean() to invoke it. The type changed to
+  // jdk.internal.ref.Cleaner in later JDKs, and the .clean() method is not accessible even with
+  // reflection. However sun.misc.Unsafe added a invokeCleaner() method in JDK 9+ and this is
+  // still accessible with reflection.
+  private val bufferCleaner: DirectBuffer => Unit =
+    // Split java.version on non-digit chars:
+    if (System.getProperty("java.version").split("\\D+").head.toInt < 9) {
--- End diff --

I think so. How about then:
```
import org.apache.commons.lang3.{JavaVersion, SystemUtils}

SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)
```
We include that package anyway, and that method is based on that property: https://commons.apache.org/proper/commons-lang/apidocs/org/apache/commons/lang3/JavaVersion.html
[GitHub] spark pull request #22993: [SPARK-24421][BUILD][CORE] Accessing sun.misc.Cle...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22993#discussion_r232852693

--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java ---
@@ -67,6 +67,59 @@
     unaligned = _unaligned;
   }

+  // Access fields and constructors once and store them, for performance:
+
+  private static final Constructor<?> DBB_CONSTRUCTOR;
+  private static final Field DBB_CLEANER_FIELD;
+  static {
+    try {
+      Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
+      Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
+      constructor.setAccessible(true);
+      Field cleanerField = cls.getDeclaredField("cleaner");
+      cleanerField.setAccessible(true);
+      DBB_CONSTRUCTOR = constructor;
+      DBB_CLEANER_FIELD = cleanerField;
+    } catch (ClassNotFoundException | NoSuchMethodException | NoSuchFieldException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private static final Method CLEANER_CREATE_METHOD;
+  static {
+    // The implementation of Cleaner changed from JDK 8 to 9
+    int majorVersion = Integer.parseInt(System.getProperty("java.version").split("\\.")[0]);
--- End diff --

How about using java.specification.version and parsing it as a double? Then check if it is greater than 2.
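Taken literally, this suggestion works because `java.specification.version` is "1.8" on JDK 8 and "9", "10", "11", ... on later JDKs. A sketch:

```scala
// "1.8".toDouble == 1.8 (< 2), while "9".toDouble == 9.0 (> 2), so
// comparing against 2 distinguishes JDK 9+ from JDK 8 and earlier.
val specVersion = System.getProperty("java.specification.version").toDouble
val isJava9Plus = specVersion > 2
```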
[GitHub] spark pull request #22993: [SPARK-24421][BUILD][CORE] Accessing sun.misc.Cle...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22993#discussion_r232847703

--- Diff: core/src/main/scala/org/apache/spark/storage/StorageUtils.scala ---
@@ -193,6 +194,36 @@ private[spark] class StorageStatus(

 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils extends Logging {
+
+  // In Java 8, the type of DirectBuffer.cleaner() was sun.misc.Cleaner, and it was possible
+  // to access the method sun.misc.Cleaner.clean() to invoke it. The type changed to
+  // jdk.internal.ref.Cleaner in later JDKs, and the .clean() method is not accessible even with
+  // reflection. However sun.misc.Unsafe added a invokeCleaner() method in JDK 9+ and this is
+  // still accessible with reflection.
+  private val bufferCleaner: DirectBuffer => Unit =
+    // Split java.version on non-digit chars:
+    if (System.getProperty("java.version").split("\\D+").head.toInt < 9) {
--- End diff --

Is this Java detection method used elsewhere? Maybe other parts of the codebase could benefit from this.
[GitHub] spark pull request #22993: [SPARK-24421][BUILD][CORE] Accessing sun.misc.Cle...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22993#discussion_r232847435

--- Diff: core/src/main/scala/org/apache/spark/storage/StorageUtils.scala ---
@@ -193,6 +194,36 @@ private[spark] class StorageStatus(

 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils extends Logging {
+
+  // In Java 8, the type of DirectBuffer.cleaner() was sun.misc.Cleaner, and it was possible
+  // to access the method sun.misc.Cleaner.clean() to invoke it. The type changed to
+  // jdk.internal.ref.Cleaner in later JDKs, and the .clean() method is not accessible even with
+  // reflection. However sun.misc.Unsafe added a invokeCleaner() method in JDK 9+ and this is
+  // still accessible with reflection.
+  private val bufferCleaner: DirectBuffer => Unit =
+    // Split java.version on non-digit chars:
+    if (System.getProperty("java.version").split("\\D+").head.toInt < 9) {
+      // scalastyle:off classforname
+      val cleanerMethod = Class.forName("sun.misc.Cleaner").getMethod("clean")
+      // scalastyle:on classforname
+      (buffer: DirectBuffer) => {
+        // Careful to avoid the return type of .cleaner(), which changes with JDK
+        val cleaner: AnyRef = buffer.cleaner()
+        if (cleaner != null) {
+          cleanerMethod.invoke(cleaner)
+        }
+      }
+    } else {
+      // scalastyle:off classforname
+      val cleanerMethod =
+        Class.forName("sun.misc.Unsafe").getMethod("invokeCleaner", classOf[ByteBuffer])
--- End diff --

Should we use the Utils forName method here?
[GitHub] spark pull request #22993: [SPARK-24421][BUILD][CORE] Accessing sun.misc.Cle...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22993#discussion_r232846926

--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java ---
@@ -67,6 +67,60 @@
     unaligned = _unaligned;
   }

+  // Access fields and constructors once and store them, for performance:
+
+  private static final Constructor<?> DBB_CONSTRUCTOR;
+  private static final Field DBB_CLEANER_FIELD;
+  static {
+    try {
+      Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
+      Constructor<?> constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
+      constructor.setAccessible(true);
+      Field cleanerField = cls.getDeclaredField("cleaner");
+      cleanerField.setAccessible(true);
+      DBB_CONSTRUCTOR = constructor;
+      DBB_CLEANER_FIELD = cleanerField;
+    } catch (ClassNotFoundException | NoSuchMethodException | NoSuchFieldException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private static final Method CLEANER_CREATE_METHOD;
+  static {
+    // The implementation of Cleaner changed from JDK 8 to 9
--- End diff --

Is this the same in Java 11? Why do we need to reference Java 9? It's dead, right, and it will have no LTS AFAIK. Shouldn't we only support LTS?
[GitHub] spark pull request #22915: [SPARK-25825][K8S][WIP] Enable token renewal for ...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22915#discussion_r232113698

--- Diff: docs/security.md ---
@@ -798,6 +782,50 @@ achieved by setting `spark.kubernetes.hadoop.configMapName` to a pre-existing Co
     local:///opt/spark/examples/jars/spark-examples_.jar \

+
+## Long-Running Applications
+
+Long-running applications may run into issues if their run time exceeds the maximum delegation
+token lifetime configured in services it needs to access.
+
+Spark supports automatically creating new tokens for these applications when running in YARN, Mesos, and Kubernetes modes.
+If one wishes to launch the renewal thread in the Driver, Kerberos credentials need to be provided to the Spark application
+via the `spark-submit` command, using the `--principal` and `--keytab` parameters.
+
+The provided keytab will be copied over to the machine running the Application Master via the Hadoop
+Distributed Cache. For this reason, it's strongly recommended that both YARN and HDFS be secured
+with encryption, at least.
+
+The Kerberos login will be periodically renewed using the provided credentials, and new delegation
+tokens for supported will be created.
+
+### Long-Running Kerberos in Kubernetes
+
+This section addresses the additional feature added uniquely to Kubernetes. If you are running an external token service
--- End diff --

The API should be defined first, but it would be good to have a reference implementation of this service targeting K8s that could be run out of the box. End users want the whole thing most of the time. This should be part of Spark, since this way we make sure we keep things updated all the time. I like the server implementation in the fork, and btw this functionality should be backported to the spark operator project as well. Actually, an operator on K8s is for managing these kinds of things, and I am wondering if it should have been part of the Spark project, like the mesos dispatcher is. On the other hand, the unfortunate thing is that ideally resource managers should be separate projects, but the separation never happened. I am not sure how a CLI tool would help here.
[GitHub] spark issue #22931: [SPARK-25930][K8s] Fix scala string detection in k8s tes...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22931 @srowen any update on this?
[GitHub] spark issue #22931: [SPARK-25930][K8s] Fix scala string detection in k8s tes...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22931 @srowen I guess yes.
[GitHub] spark issue #22931: [SPARK-25930][K8s] Fix scala string detection in k8s tes...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22931 @vanzin modified it. Much better.
[GitHub] spark issue #22931: [SPARK-25930][K8s] Fix scala string detection in k8s tes...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22931 @vanzin ok let me try that.
[GitHub] spark issue #22931: [SPARK-25930][K8s] Fix scala string detection in k8s tes...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22931 @srowen pls review. We missed that; it's kind of tricky. Making mvn silent is not that easy from what I checked; now I think this is robust enough.
[GitHub] spark pull request #22931: [SPARK-25930][K8s] Fix scala string detection in ...
GitHub user skonto opened a pull request: https://github.com/apache/spark/pull/22931

[SPARK-25930][K8s] Fix scala string detection in k8s tests

## What changes were proposed in this pull request?

- The issue is described in SPARK-25930. Since we rely on the mvn output, always pick the last line, which contains the wanted value.

## How was this patch tested?

Manually: `rm -rf ~/.m2` and then run the tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/skonto/spark fix_scala_detection

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/22931.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22931

commit 913b4c405ed618f13dac7f76dbbab220b3e0697c
Author: Stavros Kontopoulos
Date: 2018-11-02T19:46:10Z

    fix scala string detection in k8s tests
[GitHub] spark pull request #22838: [SPARK-25835][K8s] Create kubernetes-tests profil...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22838#discussion_r228494668

--- Diff: resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh ---
@@ -103,4 +104,4 @@
 then
   properties=( ${properties[@]} -Dtest.exclude.tags=$EXCLUDE_TAGS )
 fi

-$TEST_ROOT_DIR/build/mvn integration-test -f $TEST_ROOT_DIR/pom.xml -pl resource-managers/kubernetes/integration-tests -am -Pkubernetes ${properties[@]}
+$TEST_ROOT_DIR/build/mvn integration-test -f $TEST_ROOT_DIR/pom.xml -pl resource-managers/kubernetes/integration-tests -am -Pscala-$SCALA_VERSION -Pkubernetes-integration-tests ${properties[@]}
--- End diff --

Yes, it is needed, since kubernetes is not a module of the tests profile. I updated the PR.
[GitHub] spark issue #22838: [SPARK-25835][K8s] Create kubernetes-tests profile and u...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22838 The failing tests are unrelated; some flaky Hive test.
[GitHub] spark issue #22838: [SPARK-25835][K8s] Create kubernetes-tests profile and u...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22838 @srowen I think now we are aligned.
[GitHub] spark issue #22829: [SPARK-25836][BUILD][K8S] For now disable kubernetes-int...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22829 ok, it seems this should be closed. I will copy it to the other PR.
[GitHub] spark pull request #22838: [SPARK-25835][K8s] Create kubernetes-tests profil...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22838#discussion_r228318094

--- Diff: pom.xml ---
@@ -2654,6 +2654,16 @@
       kubernetes
+
+        resource-managers/kubernetes/core
+
+
+      kubernetes-tests
+
+        false
--- End diff --

I modified the profile to `kubernetes-integration-tests`.
[GitHub] spark pull request #22838: [SPARK-25835][K8s] Create kubernetes-tests profil...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22838#discussion_r228317925

--- Diff: pom.xml ---
@@ -2654,6 +2654,16 @@
       kubernetes
+
+        resource-managers/kubernetes/core
+
+
+      kubernetes-tests
+
+        false
--- End diff --

@srowen I just removed it... Yes, I know; I just wanted to make it explicit, anyway. Let's merge both ;)
[GitHub] spark pull request #22838: [SPARK-25835][K8s] Create kubernetes-tests profil...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22838#discussion_r228316721

--- Diff: pom.xml ---
@@ -2654,6 +2654,16 @@
       kubernetes
+
+        resource-managers/kubernetes/core
+
+
+      kubernetes-tests
+
+        false
+
--- End diff --

@srowen @dongjoon-hyun I will use `kubernetes-integration-tests` as the profile, from the other PR.
[GitHub] spark pull request #22838: [SPARK-25835][K8s] fix issues with k8s tests
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22838#discussion_r228314627

--- Diff: pom.xml ---
@@ -2654,6 +2654,16 @@
       kubernetes
+
+        resource-managers/kubernetes/core
+
+
+      kubernetes-tests
+
+        false
+
--- End diff --

@dongjoon-hyun we had a discussion on the list, and I thought I would have a look at it. Anyway, I can remove that part; is that ok?
[GitHub] spark issue #22838: [SPARK-25835][K8s] fix issues with k8s tests
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22838 @srowen pls review.
[GitHub] spark pull request #22838: [SPARK-25835][K8s] fix issues with k8s tests
GitHub user skonto opened a pull request: https://github.com/apache/spark/pull/22838

[SPARK-25835][K8s] fix issues with k8s tests

## What changes were proposed in this pull request?

- Fixes the scala version propagation issue.
- Disables the tests under the k8s profile; from now on we will run them manually. Adds a test-specific profile, otherwise the tests will not run if we just remove the module from the kubernetes profile.

## How was this patch tested?

Manually, by running the tests with different versions of scala.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/skonto/spark propagate-scala2.12

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/22838.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22838

commit 02894561ba0b3e89d282958ae34087edef857c6f
Author: Stavros Kontopoulos
Date: 2018-10-25T19:15:45Z

    fix issues with k8s tests
[GitHub] spark pull request #22820: [SPARK-25828][K8S][BUILD] Bumping Kubernetes-Clie...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22820#discussion_r228005475

--- Diff: resource-managers/kubernetes/core/pom.xml ---
@@ -29,7 +29,7 @@
   Spark Project Kubernetes
   kubernetes
-  3.0.0
+  4.0.0
--- End diff --

Yeah, just wondering; I don't know. What matters is what each version supports. Still, no version supports 1.11: https://github.com/fabric8io/kubernetes-client/issues/1235
[GitHub] spark pull request #22820: [SPARK-25828][K8S] Bumping Kubernetes-Client vers...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22820#discussion_r228003809

--- Diff: resource-managers/kubernetes/core/pom.xml ---
@@ -29,7 +29,7 @@
   Spark Project Kubernetes
   kubernetes
-  3.0.0
+  4.0.0
--- End diff --

4.1.0?
[GitHub] spark issue #22805: [WIP][SPARK-25809][K8S][TEST] New K8S integration testin...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22805 @rvesse I use no VM: `sudo -E ./minikube start --vm-driver=none`
[GitHub] spark issue #22805: [WIP][SPARK-25809][K8S][TEST] New K8S integration testin...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22805 @rvesse also the following memory limit works for me: `minikube config set memory 4096`, so maybe the larger value in GB is too much.
[GitHub] spark issue #22805: [WIP][SPARK-25809][K8S][TEST] New K8S integration testin...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22805 @rvesse this capability was in the old integration tests repo [here](https://github.com/apache-spark-on-k8s/spark-integration/blob/master/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/CloudTestBackend.scala). In our [fork](https://github.com/lightbend/spark/tree/spark-k8s-2.4-snapshot/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend) we have merged that part back; can we add that too? I find it really useful for testing real platforms like OpenShift etc.
[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r226643416

--- Diff: docs/running-on-kubernetes.md ---
@@ -799,4 +815,168 @@ specific to Spark on Kubernetes.
   This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3.

+  spark.kubernetes.driver.podTemplateFile
+  (none)
+  Specify the local file that contains the driver [pod template](#pod-template). For example
+  spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml`
+
+  spark.kubernetes.executor.podTemplateFile
+  (none)
+  Specify the local file that contains the executor [pod template](#pod-template). For example
+  spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml`
+
+  ## Pod template properties
+
+  See the below table for the full list of pod specifications that will be overwritten by spark.
+
+  ### Pod Metadata
+
+  Pod metadata key | Modified value | Description
+  --- | --- | ---
+  name | Value of spark.kubernetes.driver.pod.name | The driver pod name will be overwritten with either the configured or default value of spark.kubernetes.driver.pod.name. The executor pod names will be unaffected.
+  namespace | Value of spark.kubernetes.namespace | Spark makes strong assumptions about the driver and executor namespaces. Both driver and executor namespaces will be replaced by either the configured or default spark conf value.
+  labels | Adds the labels from spark.kubernetes.{driver,executor}.label.* | Spark will add additional labels specified by the spark configuration.
+  annotations | Adds the annotations from spark.kubernetes.{driver,executor}.annotation.* | Spark will add additional annotations specified by the spark configuration.
+
+  ### Pod Spec
+
+  Pod spec key | Modified value | Description
+  --- | --- | ---
+  imagePullSecrets | Adds image pull secrets from spark.kubernetes.container.image.pullSecrets | Additional pull secrets will be added from the spark configuration to both executor pods.
+  nodeSelector | Adds node selectors from spark.kubernetes.node.selector.* | Additional node selectors will be added from the spark configuration to both executor pods.
+  restartPolicy | "never" | Spark assumes that both drivers and executors never restart.
+  serviceAccount | Value of spark.kubernetes.authenticate.driver.serviceAccountName | Spark will override serviceAccount with the value of the spark configuration for only driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected.
+  serviceAccountName | Value of spark.kubernetes.authenticate.driver.serviceAccountName | Spark will override serviceAccountName with the value of the spark configuration for only driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected.
+  volumes | Adds volumes from spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.path | Spark will add volumes as specified by the spark conf, as well as additional volumes necessary for passing spark conf and pod template files.
+
+  ### Container spec
+
+  The following affect the driver and executor containers. All other containers in the pod spec will be unaffected.
+
+  Container spec key | Modified value | Description
+  --- | --- | ---
+  env | Adds env variables from spark.kubernetes.driverEnv.[EnvironmentVariableName] | Spark will add driver env variables from spark.kubernetes.driverEnv.[EnvironmentVariableName], and executor env variables from spark.executorEnv.[EnvironmentVariableName].
+  image | Value of spark.kubernetes.{driver,executor}.container.image | The image will be defined by the spark configurations.
+  imagePullPolicy | Value of spark.kubernetes.container.image.pullPolicy | Spark will override the pull policy for both driver and executors.
+  name | See description. | The container name will be assigned by spark ("spark-kubernetes-driver" for the driver container, and "executor" for each executor container) if not defined by the pod template. If the container is defined by the template, the template's name will be used.
[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r226642833

--- Diff: docs/running-on-kubernetes.md ---
@@ -799,4 +815,168 @@ specific to Spark on Kubernetes.
[This comment quotes the same pod-template documentation diff as above, up to the Pod Spec table:]
+  ### Pod Spec
+
+  Pod spec key | Modified value | Description
+  --- | --- | ---
+  imagePullSecrets | Adds image pull secrets from spark.kubernetes.container.image.pullSecrets | Additional pull secrets will be added from the spark configuration to both executor pods.
+  nodeSelector | Adds node selectors from spark.kubernetes.node.selector.* | Additional node selectors will be added from the spark configuration to both executor pods.
+  restartPolicy | "never" | Spark assumes that both drivers and executors never restart.
+  serviceAccount | Value of spark.kubernetes.authenticate.driver.serviceAccountName | Spark will override serviceAccount with the value of the spark configuration for only driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected.
+  serviceAccountName | Value of spark.kubernetes.authenticate.driver.serviceAccountName | Spark will override serviceAccountName with the value of the spark configuration for only driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected.
+  volumes | Adds volumes from spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.path | Spark will add volumes as specified by the spark conf, as well as additional volumes necessary for passing
--- End diff --

+1. As I mentioned before, we need to know the implications of whatever property we expose.
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 Thanx @vanzin!
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 @vanzin didn't mean to press or anything :)
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 @vanzin ready to merge.
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 @vanzin thanx for the review, I think it looks ok now.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r225339134

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +391,38 @@ private[spark] class AppStatusListener(
     }
     job.status = event.jobResult match {
-      case JobSucceeded => JobExecutionStatus.SUCCEEDED
-      case JobFailed(_) => JobExecutionStatus.FAILED
+      case JobSucceeded =>
+        appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+        JobExecutionStatus.SUCCEEDED
+      case JobFailed(_) =>
+        appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+        JobExecutionStatus.FAILED
     }
     job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+    for {
+      source <- appStatusSource
+      submissionTime <- job.submissionTime
+      completionTime <- job.completionTime
+    } yield {
+      val localSubmissionTime =
+        LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault)
+      val localCompletionTime =
+        LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault)
+      val duration = Duration.between(localSubmissionTime, localCompletionTime)
--- End diff --

Yeah, I just did the Java 8 thing; guess it's too much, will fix.
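A minimal sketch of the plainer alternative the review is pointing at, assuming both timestamps are `java.util.Date` as in the listener:

```scala
import java.util.Date

// Date wraps epoch milliseconds, so the job duration is just the
// difference; no LocalDateTime/ZoneId/Duration machinery is needed.
def jobDurationMillis(submissionTime: Date, completionTime: Date): Long =
  completionTime.getTime - submissionTime.getTime
```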
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r225339052

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +391,38 @@ private[spark] class AppStatusListener(
     }
     job.status = event.jobResult match {
-      case JobSucceeded => JobExecutionStatus.SUCCEEDED
-      case JobFailed(_) => JobExecutionStatus.FAILED
+      case JobSucceeded =>
+        appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+        JobExecutionStatus.SUCCEEDED
+      case JobFailed(_) =>
+        appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+        JobExecutionStatus.FAILED
     }
     job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+    for {
+      source <- appStatusSource
+      submissionTime <- job.submissionTime
+      completionTime <- job.completionTime
+    } yield {
+      val localSubmissionTime =
+        LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault)
+      val localCompletionTime =
+        LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault)
+      val duration = Duration.between(localSubmissionTime, localCompletionTime)
+      source.JOB_DURATION.value.set(duration.toMillis)
+    }
+
+    // update global app status counters
+    appStatusSource.foreach(_.COMPLETED_STAGES.inc(job.completedStages.size))
--- End diff --

ok, will fix.
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r225339164

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.concurrent.atomic.AtomicLong
+
+import AppStatusSource.getCounter
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.metrics.source.Source
+
+private[spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
+  override def getValue: Long = value.get()
+}
+
+private[spark] class AppStatusSource extends Source {
+
+  override implicit val metricRegistry = new MetricRegistry()
+
+  override val sourceName = "appStatus"
+
+  val jobDuration = new JobDuration(new AtomicLong(0L))
+
+  // Duration of each job in milliseconds
+  val JOB_DURATION = metricRegistry
+    .register(MetricRegistry.name("jobDuration"), jobDuration)
+
+  val FAILED_STAGES = getCounter("stages", "failedStages")
+
+  val SKIPPED_STAGES = getCounter("stages", "skippedStages")
+
+  val COMPLETED_STAGES = getCounter("stages", "completedStages")
+
+  val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs")
+
+  val FAILED_JOBS = getCounter("jobs", "failedJobs")
+
+  val COMPLETED_TASKS = getCounter("tasks", "completedTasks")
+
+  val FAILED_TASKS = getCounter("tasks", "failedTasks")
+
+  val KILLED_TASKS = getCounter("tasks", "killedTasks")
+
+  val SKIPPED_TASKS = getCounter("tasks", "skippedTasks")
+
+  val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors")
+
+  val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors")
+}
+
+private[spark] object AppStatusSource {
+
+  def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = {
+    metricRegistry.counter(MetricRegistry.name(prefix, name))
+  }
+
+  def createSource(conf: SparkConf): Option[AppStatusSource] = {
+    Option(conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED))
+      .filter(identity)
+      .map(_ => new AppStatusSource())
--- End diff --

ok :)
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 @vanzin could I get a merge pls?
[GitHub] spark pull request #22639: [SPARK-25647][k8s] Add spark streaming compatibil...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22639#discussion_r224451681

--- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StreamingCompatibilitySuite.scala ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.net._
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util
+
+private[spark] trait StreamingCompatibilitySuite {
+
+  k8sSuite: KubernetesSuite =>
+
+  import StreamingCompatibilitySuite._
+
+  test("Run spark streaming in client mode.", k8sTestTag) {
+    val (host, port, serverSocket) = startSocketServer()
--- End diff --

We could use a custom source as an alternative for feeding the stream.
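A sketch of what such a custom source could look like with Spark Streaming's `Receiver` API; the class name and feed data are hypothetical:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that feeds the stream from an in-memory loop,
// avoiding the host/port plumbing of a socket server in the test.
class GeneratedWordsReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  override def onStart(): Unit = {
    new Thread("test-feeder") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("spark streaming k8s test")
          Thread.sleep(100)
        }
      }
    }.start()
  }

  // Nothing to clean up: the feeder thread exits once isStopped() is true.
  override def onStop(): Unit = ()
}

// Usage sketch: ssc.receiverStream(new GeneratedWordsReceiver)
```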
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r224071384 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import java.util.concurrent.atomic.AtomicLong + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.metrics.source.Source + +private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] { + override def getValue: Long = value.get() +} + +private[spark] class AppStatusSource extends Source { + + override val metricRegistry = new MetricRegistry() + + override val sourceName = "appStatus" + + val jobDuration = new JobDuration(new AtomicLong(0L)) + + // Duration of each job in milliseconds + val JOB_DURATION = metricRegistry +.register(MetricRegistry.name("jobDuration"), jobDuration) + + val FAILED_STAGES = metricRegistry.counter(MetricRegistry.name("failedStages")) --- End diff -- ok makes sense --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r224071170 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -280,6 +284,11 @@ private[spark] class AppStatusListener( private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted + if (blacklisted) { +appStatusSource.foreach(_.BLACKLISTED_EXECUTORS.inc(1)) --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 Thanks a lot @aditanase! I will update the PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r224070656 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import java.util.concurrent.atomic.AtomicLong + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.metrics.source.Source + +private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] { + override def getValue: Long = value.get() +} + +private[spark] class AppStatusSource extends Source { + + override val metricRegistry = new MetricRegistry() + + override val sourceName = "appStatus" + + val jobDuration = new JobDuration(new AtomicLong(0L)) + + // Duration of each job in milliseconds + val JOB_DURATION = metricRegistry +.register(MetricRegistry.name("jobDuration"), jobDuration) + + val FAILED_STAGES = metricRegistry.counter(MetricRegistry.name("failedStages")) + + val SKIPPED_STAGES = metricRegistry.counter(MetricRegistry.name("skippedStages")) + + val COMPLETED_STAGES = metricRegistry.counter(MetricRegistry.name("completedStages")) + + val SUCCEEDED_JOBS = metricRegistry.counter(MetricRegistry.name("succeededJobs")) + + val FAILED_JOBS = metricRegistry.counter(MetricRegistry.name("failedJobs")) + + val COMPLETED_TASKS = metricRegistry.counter(MetricRegistry.name("completedTasks")) + + val FAILED_TASKS = metricRegistry.counter(MetricRegistry.name("failedTasks")) + + val KILLED_TASKS = metricRegistry.counter(MetricRegistry.name("killedTasks")) + + val SKIPPED_TASKS = metricRegistry.counter(MetricRegistry.name("skippedTasks")) + + val BLACKLISTED_EXECUTORS = metricRegistry.counter(MetricRegistry.name("blackListedExecutors")) --- End diff -- +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 @vanzin AFAIK the operations just update metrics in the underlying Dropwizard metrics library. I don't think anything is shipped anywhere. How should I proceed? Is there anyone who is familiar with the internals? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
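To make the point above concrete, here is a small sketch: updating a Dropwizard metric only mutates in-process state, and values leave the JVM only if a reporter is attached to the registry. The metric name is illustrative.
```
import com.codahale.metrics.MetricRegistry

val registry = new MetricRegistry()
val succeededJobs = registry.counter("jobs.succeededJobs")

succeededJobs.inc(1)                  // in-memory update only, nothing is sent anywhere
assert(succeededJobs.getCount == 1)   // readable locally

// Only attaching a reporter (JMX, Graphite, CSV, ...) ships these values out.
```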
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 @srowen integration tests seem so flaky! Seen this before? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22392: [SPARK-23200] Reset Kubernetes-specific config on Checkp...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22392 @liyinan926 @ssaavedra one thing I just noticed: if you restart the driver and you did not initially set spark.executorEnv., then setting it when you restart the job has no effect. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 jenkins, test this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 The test failure is unrelated. jenkins, test this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22392: [SPARK-23200] Reset Kubernetes-specific config on Checkp...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22392 @ssaavedra I just checked this with streaming and it works fine. I used the following code, just in case you want to repeat the test: https://gist.github.com/skonto/12217e93e7e9272365eb978025d42a4c --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 @erikerlandson I added an option to disable them; they are now disabled by default. Is it possible to move forward with this? @srowen? Btw, no code is triggered by this unless it is explicitly enabled. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
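A sketch of what such an opt-in flag can look like, following the existing `spark.sql.streaming.metricsEnabled` pattern; the key name and doc string here are assumptions, not necessarily what was merged.
```
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical opt-in flag, disabled by default so that no metrics code
// runs unless the user explicitly enables it.
private[spark] val APP_STATUS_METRICS_ENABLED =
  ConfigBuilder("spark.app.status.metrics.enabled")
    .doc("Whether Dropwizard/Codahale metrics will be reported for the " +
      "status of the running application.")
    .booleanConf
    .createWithDefault(false)
```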
[GitHub] spark pull request #22392: [SPARK-23200] Reset Kubernetes-specific config on...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22392#discussion_r218056207 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala --- @@ -54,6 +54,10 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) "spark.driver.bindAddress", "spark.driver.port", "spark.master", + "spark.kubernetes.driver.pod.name", + "spark.kubernetes.driver.limit.cores", + "spark.kubernetes.executor.limit.cores", --- End diff -- I agree with getting it to work first. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22392: [SPARK-23200] Reset Kubernetes-specific config on...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22392#discussion_r218002553 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala --- @@ -54,6 +54,10 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) "spark.driver.bindAddress", "spark.driver.port", "spark.master", + "spark.kubernetes.driver.pod.name", + "spark.kubernetes.driver.limit.cores", + "spark.kubernetes.executor.limit.cores", --- End diff -- Are there any other properties that need to be restored beyond that? How about `spark.kubernetes.allocation.batch.size` , `spark.kubernetes.executor.lostCheck.maxAttempts` `spark.kubernetes.report.interval` `spark.kubernetes.executor.eventProcessingInterval`? Also there are several other properties for Python and R, listed in the [config](https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r217672239
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
@@ -503,9 +503,12 @@ private[spark] object AppStatusStore {
   /**
    * Create an in-memory store for a live application.
    */
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+      conf: SparkConf,
+      appStatusSource: Option[AppStatusSource] = None):
--- End diff --
Ok, will do, thanks.
---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22405: [SPARK-25295][K8S]Fix executor names collision
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22405 Thanks @liyinan926! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 @erikerlandson I don't see any tests for sources AFAIK; they exist for the metrics system itself. From the commit history this looks like an old feature. Regarding enabling or disabling them: not at the moment. But it is fairly easy to do via the Spark conf, as there is already such a flag for structured streaming, `spark.sql.streaming.metricsEnabled`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22405: [SPARK-25295][K8S]Fix executor names collision
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22405#discussion_r217215209 --- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala --- @@ -167,13 +167,23 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { executorSpecificConf.executorId, TEST_SPARK_APP_ID, Some(driverPod)) - k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap && + + // Set prefixes to a common string since KUBERNETES_EXECUTOR_POD_NAME_PREFIX + // has not be set for the tests and thus KubernetesConf will use a random + // string for the prefix, based on the app name, and this comparison here will fail. + val k8sConfCopy = k8sConf +.copy(appResourceNamePrefix = "") + .copy(sparkConf = conf) --- End diff -- fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22405: [SPARK-25295][K8S]Fix executor names collision
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22405#discussion_r217214888 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala --- @@ -254,3 +251,18 @@ private[spark] class KubernetesClientApplication extends SparkApplication { } } } + +private[spark] object KubernetesClientApplication { + + def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark") + + def getResourceNamePrefix(appName: String): String = { +val launchTime = System.currentTimeMillis() +s"$appName-$launchTime" + .toLowerCase + .replaceAll("\\s+", "-") + .replaceAll("\\.", "-") + .replaceAll("[^a-z0-9\\-]", "") + .replaceAll("-+", "-") + } --- End diff -- reduce dashes. for example: ``` scala> "Spark #$ d #.Pi" .toLowerCase .replaceAll("\\s+", "-") .replaceAll("\\.", "-") .replaceAll("[^a-z0-9\\-]", "") .replaceAll("-+", "-") res15: String = spark-d-pi ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22405: [SPARK-25295][K8S]Fix executor names collision
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22405#discussion_r217214065 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala --- @@ -254,3 +251,18 @@ private[spark] class KubernetesClientApplication extends SparkApplication { } } } + +private[spark] object KubernetesClientApplication { + + def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark") + + def getResourceNamePrefix(appName: String): String = { +val launchTime = System.currentTimeMillis() +s"$appName-$launchTime" + .toLowerCase + .replaceAll("\\s+", "-") + .toLowerCase.replaceAll("\\.", "-") + .replaceAll("[^a-z0-9\\-]", "") + .replaceAll("-+", "-") --- End diff -- reduce dashes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22405: [SPARK-25295][K8S]Fix executor names collision
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22405#discussion_r217213999 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala --- @@ -254,3 +251,18 @@ private[spark] class KubernetesClientApplication extends SparkApplication { } } } + +private[spark] object KubernetesClientApplication { + + def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark") + + def getResourceNamePrefix(appName: String): String = { +val launchTime = System.currentTimeMillis() +s"$appName-$launchTime" + .toLowerCase + .replaceAll("\\s+", "-") + .toLowerCase.replaceAll("\\.", "-") --- End diff -- keep replacement. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22405: [SPARK-25295][K8S]Fix executor names collision
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22405 jenkins, test this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22405: [SPARK-25295][K8S]Fix executor names collision
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22405 tests fixed... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22405: [SPARK-25295][K8S]Fix executor names collision
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22405 @liyinan926 the prefix is defined as:
```
val kubernetesResourceNamePrefix = {
  s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
}
```
So it looks better to replace `.` with `-`; will do that.
---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
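To make the motivation concrete, this is what the replacement does to an app name containing a dot (values illustrative):
```
scala> s"spark.pi-${1536786400000L}".toLowerCase.replaceAll("\\.", "-")
res0: String = spark-pi-1536786400000
```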
[GitHub] spark pull request #22405: [SPARK-25295][K8S]Fix executor names collision
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22405#discussion_r217189938 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala --- @@ -254,3 +251,17 @@ private[spark] class KubernetesClientApplication extends SparkApplication { } } } + +private[spark] object KubernetesClientApplication { + + def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark") + + def getResourceNamePrefix(appName: String): String = { +val launchTime = System.currentTimeMillis() +s"$appName-$launchTime" + .toLowerCase + .replaceAll(" +", " ") + .replaceAll("\\s", "-") + .replaceAll("[^A-Za-z0-9\\-]", "") --- End diff -- Correct will update. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22405: [SPARK-25295][K8S]Fix executor names collision
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22405#discussion_r217189066 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala --- @@ -254,3 +251,17 @@ private[spark] class KubernetesClientApplication extends SparkApplication { } } } + +private[spark] object KubernetesClientApplication { + + def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark") + + def getResourceNamePrefix(appName: String): String = { +val launchTime = System.currentTimeMillis() +s"$appName-$launchTime" + .toLowerCase + .replaceAll(" +", " ") + .replaceAll("\\s", "-") --- End diff -- yeah sure --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 ok will do. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 @srowen I was checking git history and I see that you merged this one: https://github.com/apache/spark/pull/22218 again related to metrics. Could you call someone who can do the merge? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22405: [SPARK-25295][K8S]Fix executor names collision
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22405#discussion_r217170079
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala ---
@@ -254,3 +251,17 @@
     }
   }
 }
+
+private[spark] object KubernetesClientApplication {
+
+  def getAppName(conf: SparkConf): String = conf.getOption("spark.app.name").getOrElse("spark")
+
+  def getResourceNamePrefix(appName: String): String = {
+    val launchTime = System.currentTimeMillis()
+    s"$appName-$launchTime"
+      .toLowerCase
+      .replaceAll(" +", " ")
+      .replaceAll("\\s", "-")
+      .replaceAll("[^A-Za-z0-9\\-]", "")
--- End diff --
Might be a bit strict, but if people want weird names they should know k8s does not accept them; we use the app name for their convenience when they list pods.
---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
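For reference, Kubernetes validates most resource names against DNS-1123 conventions; the pattern below is a rough sketch of the label rule (lowercase alphanumerics and dashes, starting and ending with an alphanumeric, at most 63 characters per label), not the exact validation k8s runs.
```
val dns1123Label = "[a-z0-9]([-a-z0-9]*[a-z0-9])?"

"spark-pi-12345".matches(dns1123Label)  // true
"Spark Pi".matches(dns1123Label)        // false: uppercase letters and a space
```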
[GitHub] spark issue #22405: [SPARK-25295][K8S]Fix executor names collision
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22405 @liyinan926 @mccheah pls review. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22405: [SPARK-25295][K8S]Fix executor names collision
GitHub user skonto opened a pull request: https://github.com/apache/spark/pull/22405 [SPARK-25295][K8S] Fix executor names collision
## What changes were proposed in this pull request?
Fixes the collision issue with Spark executor names in client mode. Also fixes the issue of the Spark app name containing spaces in cluster mode. If you run the Spark Pi test in client mode it passes. The tricky part is that the user may set the app name: https://github.com/apache/spark/blob/3030b82c89d3e45a2e361c469fbc667a1e43b854/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala#L30
If I do:
```
./bin/spark-submit --master k8s://http://127.0.0.1:8001 --deploy-mode cluster --name "spark pi ...
```
it will fail, as the app name is used for the prefix of the driver's pod name and it cannot contain spaces (according to k8s conventions).
## How was this patch tested?
Manually, by running a Spark job in client mode. To reproduce do:
```
kubectl create -f service.yaml
kubectl create -f pod.yaml
```
service.yaml:
```
kind: Service
apiVersion: v1
metadata:
  name: spark-test-app-1-svc
spec:
  clusterIP: None
  selector:
    spark-app-selector: spark-test-app-1
  ports:
  - protocol: TCP
    name: driver-port
    port: 7077
    targetPort: 7077
  - protocol: TCP
    name: block-manager
    port: 1
    targetPort: 1
```
pod.yaml:
```
apiVersion: v1
kind: Pod
metadata:
  name: spark-test-app-1
  labels:
    spark-app-selector: spark-test-app-1
spec:
  containers:
  - name: spark-test
    image: skonto/spark:k8s-client-fix
    imagePullPolicy: Always
    command:
    - 'sh'
    - '-c'
    - "/opt/spark/bin/spark-submit --verbose --master k8s://https://kubernetes.default.svc --deploy-mode client --class org.apache.spark.examples.SparkPi --conf spark.app.name=spark --conf spark.executor.instances=1 --conf spark.kubernetes.container.image=skonto/spark:k8s-client-fix --conf spark.kubernetes.container.image.pullPolicy=Always --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token --conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt --conf spark.executor.memory=500m --conf spark.executor.cores=1 --conf spark.executor.instances=1 --conf spark.driver.host=spark-test-app-1-svc.default.svc --conf spark.driver.port=7077 --conf spark.driver.blockManager.port=1 local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar 100"
```
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/skonto/spark fix-k8s-client-mode-executor-names
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22405.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22405
commit d6b4a9f4af8e7dd1552df4a9d6df51fe5b41f08b
Author: Stavros Kontopoulos
Date: 2018-09-12T19:47:57Z
fix executor names collision
---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22146: [SPARK-24434][K8S] pod template files
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22146 @onursatici I tried the PR and it works so far. We can keep the convention; let's hope users will be careful. Would it be possible to add some tests, like sidecar tests, to examine the container setup etc.? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 @srowen thanks, is there someone I could call? @vanzin? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22381: [SPARK-25394][CORE] Add an application status metrics so...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22381 @srowen @xuanyuanking ready for another round. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216496620
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +392,37 @@
     }
     job.status = event.jobResult match {
-      case JobSucceeded => JobExecutionStatus.SUCCEEDED
-      case JobFailed(_) => JobExecutionStatus.FAILED
+      case JobSucceeded =>
+        appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+        JobExecutionStatus.SUCCEEDED
+      case JobFailed(_) =>
+        appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+        JobExecutionStatus.FAILED
     }
     job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+    for {
+      submissionTime <- job.submissionTime
+      completionTime <- job.completionTime
+    } yield {
--- End diff --
@srowen regarding multiple options it's a valid technique: https://alvinalexander.com/scala/how-to-use-multiple-options-for-loop-comprehension
Anyway, I can keep it simple and just use `isDefined` and then get the values; I liked the former though, it is more idiomatic.
---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
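A minimal, self-contained illustration of the technique under discussion; the values are made up:
```
// Combining two Options with a for-comprehension: the yield body runs only
// when both are defined, otherwise the result is None.
val submissionTime: Option[Long] = Some(1000L)
val completionTime: Option[Long] = Some(4500L)

val durationMs: Option[Long] = for {
  s <- submissionTime
  c <- completionTime
} yield c - s

assert(durationMs == Some(3500L))
assert((for { s <- submissionTime; c <- Option.empty[Long] } yield c - s).isEmpty)
```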
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216476831
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +392,37 @@
     }
     job.status = event.jobResult match {
-      case JobSucceeded => JobExecutionStatus.SUCCEEDED
-      case JobFailed(_) => JobExecutionStatus.FAILED
+      case JobSucceeded =>
+        appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+        JobExecutionStatus.SUCCEEDED
+      case JobFailed(_) =>
+        appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+        JobExecutionStatus.FAILED
     }
     job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+    for {
+      submissionTime <- job.submissionTime
+      completionTime <- job.completionTime
+    } yield {
+      val localSubmissionTime =
+        LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault)
+      val localCompletionTime =
+        LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault)
+      val duration = Duration.between(localCompletionTime, localSubmissionTime)
+      appStatusSource.foreach{_.JOB_DURATION.update(duration.toMillis)}
--- End diff --
Actually I will transform it to a gauge; that makes more sense.
---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216472252
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -382,11 +392,37 @@
     }
     job.status = event.jobResult match {
-      case JobSucceeded => JobExecutionStatus.SUCCEEDED
-      case JobFailed(_) => JobExecutionStatus.FAILED
+      case JobSucceeded =>
+        appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)}
+        JobExecutionStatus.SUCCEEDED
+      case JobFailed(_) =>
+        appStatusSource.foreach{_.FAILED_JOBS.inc(1)}
+        JobExecutionStatus.FAILED
     }
     job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+    for {
+      submissionTime <- job.submissionTime
+      completionTime <- job.completionTime
+    } yield {
--- End diff --
It should be possible; I just tried the elegant Java way: https://stackoverflow.com/questions/4927856/how-to-calculate-time-difference-in-java :)
---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
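The linked answer boils down to `java.time.Duration.between`; a quick sketch with made-up timestamps:
```
import java.time.{Duration, Instant}

val start = Instant.ofEpochMilli(1000L)
val end   = Instant.ofEpochMilli(4500L)

// Order matters: Duration.between(a, b) computes b - a.
Duration.between(start, end).toMillis  // 3500
Duration.between(end, start).toMillis  // -3500: the reversed-arguments mistake flagged in another comment
```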
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216470277 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.source.Source + +private[spark] class AppStatusSource extends Source{ + + override val metricRegistry = new MetricRegistry() + + override val sourceName = "appStatus" + + val JOB_DURATION = metricRegistry.histogram(MetricRegistry.name("jobDuration")) + + val FAILED_STAGES = metricRegistry.counter(MetricRegistry.name("failedStages")) + + val SKIPPED_STAGES = metricRegistry.counter(MetricRegistry.name("skippedStages")) + + val COMPLETED_STAGES = metricRegistry.counter(MetricRegistry.name("completedStages")) + + val COMPLETED_JOBS = metricRegistry.counter(MetricRegistry.name("completedJobs")) --- End diff -- this is not used, need to remove. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216469838 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +392,37 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { +val localSubmissionTime = + LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault) +val localCompletionTime = + LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault) +val duration = Duration.between(localCompletionTime, localSubmissionTime) +appStatusSource.foreach{_.JOB_DURATION.update(duration.toMillis)} --- End diff -- Ideally this should be a gauge due to this: https://prometheus.io/docs/instrumenting/writing_exporters/#drop-less-useful-statistics --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
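The shape such a settable gauge can take, mirroring the `JobDuration` gauge the patch ends up registering; the wiring shown is a sketch:
```
import java.util.concurrent.atomic.AtomicLong
import com.codahale.metrics.{Gauge, MetricRegistry}

// A gauge backed by an AtomicLong: the listener sets the latest job duration
// and the metrics system reads it on demand, instead of accumulating
// histogram statistics that scrapers like Prometheus would recompute anyway.
class JobDuration(value: AtomicLong) extends Gauge[Long] {
  override def getValue: Long = value.get()
}

val latest = new AtomicLong(0L)
val registry = new MetricRegistry()
registry.register(MetricRegistry.name("jobDuration"), new JobDuration(latest))

latest.set(3500L)  // the listener updates the backing value; reporters read the gauge lazily
```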
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216465915 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -382,11 +392,37 @@ private[spark] class AppStatusListener( } job.status = event.jobResult match { -case JobSucceeded => JobExecutionStatus.SUCCEEDED -case JobFailed(_) => JobExecutionStatus.FAILED +case JobSucceeded => + appStatusSource.foreach{_.SUCCEEDED_JOBS.inc(1)} + JobExecutionStatus.SUCCEEDED +case JobFailed(_) => + appStatusSource.foreach{_.FAILED_JOBS.inc(1)} + JobExecutionStatus.FAILED } job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None + + for { +submissionTime <- job.submissionTime +completionTime <- job.completionTime + } yield { +val localSubmissionTime = + LocalDateTime.ofInstant(submissionTime.toInstant, ZoneId.systemDefault) +val localCompletionTime = + LocalDateTime.ofInstant(completionTime.toInstant, ZoneId.systemDefault) +val duration = Duration.between(localCompletionTime, localSubmissionTime) --- End diff -- need to reverse this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216431046
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -560,6 +561,7 @@ class SparkContext(config: SparkConf) extends Logging {
     setupAndStartListenerBus()
     postEnvironmentUpdate()
+    _env.metricsSystem.registerSource(appStatusSource)
--- End diff --
Yeah, I was wondering where it would be best to put that. Registering at the end makes sense; on the other hand, what if I want to have metrics as early as the app starts? Anyway, for now it's ok.
---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216430860 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -280,6 +284,12 @@ private[spark] class AppStatusListener( private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => exec.isBlacklisted = blacklisted + if (blacklisted) { +appStatusSource.foreach{_.BLACKLISTED_EXECUTORS.inc(1)} + } + else { --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22381: [SPARK-25394][CORE] Add an application status met...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22381#discussion_r216430819 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusSource.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.status + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.source.Source + +private[spark] class AppStatusSource extends Source{ --- End diff -- :+1: --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org