[GitHub] spark issue #19870: [SPARK-22665][SQL] Avoid repartitioning with empty list ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19870 **[Test build #84441 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84441/testReport)** for PR 19870 at commit [`41ad7b9`](https://github.com/apache/spark/commit/41ad7b9cd1976d84caded2cc4fc7f50d0585de03).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154807233
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
 |  the node running the Application Master via the Secure
 |  Distributed Cache, for renewing the login tickets and the
 |  delegation tokens periodically.
+|
+| Kubernetes only:
+|  --kubernetes-namespace NS   The namespace in the Kubernetes cluster within which the
--- End diff --
I don't think it's absolutely necessary to have a parameter for the namespace. `--kubernetes-namespace` is not significantly shorter than `spark.kubernetes.namespace`. @mccheah @foxish WDYT?
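If the dedicated flag were dropped, users would set the namespace through the ordinary configuration mechanism instead. A minimal sketch (the property key is the one named above; the namespace value is illustrative):

```scala
import org.apache.spark.SparkConf

// Equivalent to passing --conf spark.kubernetes.namespace=spark-jobs to spark-submit;
// no dedicated command-line flag is required.
val conf = new SparkConf()
  .set("spark.kubernetes.namespace", "spark-jobs")
```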
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19872 **[Test build #84446 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84446/testReport)** for PR 19872 at commit [`c1dc543`](https://github.com/apache/spark/commit/c1dc543f9ff6e77b310d1a38c3f3c2c4e8eeaf63).
* This patch **fails Python style tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class UDFColumn(Column):`
  * `case class AggregateInPandas(`
  * `case class AggregateInPandasExec(`
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19872 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84446/ Test FAILed.
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19872 Merged build finished. Test FAILed.
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19872 **[Test build #84446 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84446/testReport)** for PR 19872 at commit [`c1dc543`](https://github.com/apache/spark/commit/c1dc543f9ff6e77b310d1a38c3f3c2c4e8eeaf63).
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154806055
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+    submissionSparkConf: SparkConf,
+    kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+    val oauthTokenBase64 = submissionSparkConf
+      .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+      .map { token =>
+        BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+      }
+    val caCertDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+      "Driver CA cert file provided at %s does not exist or is not a file.")
+    val clientKeyDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+      "Driver client key file provided at %s does not exist or is not a file.")
+    val clientCertDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+      "Driver client cert file provided at %s does not exist or is not a file.")
+
+    val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations(
+      driverSparkConf,
+      oauthTokenBase64,
+      caCertDataBase64,
+      clientKeyDataBase64,
+      clientCertDataBase64)
+
+    val kubernetesCredentialsSecret = createCredentialsSecret(
+      oauthTokenBase64,
+      caCertDataBase64,
+      clientKeyDataBase64,
+      clientCertDataBase64)
+
+    val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret =>
+      new PodBuilder(driverSpec.driverPod)
+        .editOrNewSpec()
+          .addNewVolume()
+            .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+            .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+          .endVolume()
+        .endSpec()
+        .build()
+    }.getOrElse(
+      driverServiceAcco
[GitHub] spark pull request #19878: [SPARK-22682][SQL] HashExpression does not need t...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19878#discussion_r154805001
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala ---
@@ -270,17 +270,36 @@ abstract class HashExpression[E] extends Expression {
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     ev.isNull = "false"
-    val childrenHash = ctx.splitExpressions(children.map { child =>
+
+    val childrenHash = children.map { child =>
       val childGen = child.genCode(ctx)
       childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
         computeHash(childGen.value, child.dataType, ev.value, ctx)
       }
-    })
+    }
+
+    val hashResultType = ctx.javaType(dataType)
+    val codes = if (ctx.INPUT_ROW == null || ctx.currentVars != null) {
--- End diff --
That one has been merged, but this one is still different.
[GitHub] spark pull request #19880: [SPARK-22626][SQL][FOLLOWUP] improve documentatio...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19880
[GitHub] spark issue #19880: [SPARK-22626][SQL][FOLLOWUP] improve documentation and s...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19880 Thanks! Merged to master.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154802853
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.steps._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.util.SystemClock
+
+/**
+ * Constructs the complete list of driver configuration steps to run to deploy the Spark driver.
+ */
+private[spark] class DriverConfigurationStepsOrchestrator(
+    namespace: String,
+    kubernetesAppId: String,
+    launchTime: Long,
+    mainAppResource: Option[MainAppResource],
+    appName: String,
+    mainClass: String,
+    appArgs: Array[String],
+    submissionSparkConf: SparkConf) {
+
+  // The resource name prefix is derived from the application name, making it easy to connect the
+  // names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
+  // application the user submitted. However, we can't use the application name in the label, as
+  // label values are considerably restrictive, e.g. must be no longer than 63 characters in
+  // length. So we generate a separate identifier for the app ID itself, and bookkeeping that
+  // requires finding "all pods for this application" should use the kubernetesAppId.
+  private val kubernetesResourceNamePrefix =
--- End diff --
Yes, it needs to be unique. Changed to use `UUID.nameUUIDFromBytes` based on `launchTime`. @mccheah any concern about this?
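A minimal sketch of the described approach, deriving a deterministic identifier from `launchTime` with `UUID.nameUUIDFromBytes`; the byte encoding and the prefix format here are assumptions for illustration, not the merged code:

```scala
import java.nio.ByteBuffer
import java.util.UUID

val launchTime: Long = System.currentTimeMillis()
// A name-based (type 3) UUID over the launch-time bytes is deterministic for a given
// launch and short enough to embed in Kubernetes resource names and label values.
val launchTimeBytes = ByteBuffer.allocate(java.lang.Long.BYTES).putLong(launchTime).array()
val uuid = UUID.nameUUIDFromBytes(launchTimeBytes).toString.replaceAll("-", "")
val kubernetesResourceNamePrefix = s"spark-$uuid"
```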
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154802593
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19871 Merged build finished. Test PASSed.
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19871 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84439/ Test PASSed.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154801257
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
+    val clientCertDataBase64 = safeFileConfToBase64(
--- End diff --
/cc @mccheah.
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19871 **[Test build #84439 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84439/testReport)** for PR 19871 at commit [`8bc420a`](https://github.com/apache/spark/commit/8bc420ab6a085360f3996759819ad44dd40f9703).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154800789
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154800800
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154800815
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.io.File
+
+import org.apache.spark.util.Utils
+
+private[spark] object KubernetesFileUtils {
+
+  /**
+   * For the given collection of file URIs, resolves them as follows:
+   * - File URIs with scheme file:// are resolved to the given download path.
+   * - File URIs with scheme local:// resolve to just the path of the URI.
+   * - Otherwise, the URIs are returned as-is.
+   */
+  def resolveSubmittedUris(
+      fileUris: Iterable[String],
+      fileDownloadPath: String): Iterable[String] = {
+    fileUris.map { uri =>
+      val fileUri = Utils.resolveURI(uri)
+      val fileScheme = Option(fileUri.getScheme).getOrElse("file")
+      fileScheme match {
--- End diff --
Done.
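The quoted hunk is cut off at the `match`; below is a sketch of the branch bodies implied by the scaladoc above, using `java.net.URI` in place of the package-private `Utils.resolveURI` and with the file-name handling as an assumption:

```scala
import java.io.File
import java.net.URI

def resolveSubmittedUri(uri: String, fileDownloadPath: String): String = {
  val fileUri = new URI(uri)
  val fileScheme = Option(fileUri.getScheme).getOrElse("file")
  fileScheme match {
    case "file" =>
      // Shipped from the submission client: rewrite to where it will be downloaded.
      val fileName = new File(fileUri.getPath).getName
      s"$fileDownloadPath/$fileName"
    case "local" =>
      // Already present on the container image: keep just the path.
      fileUri.getPath
    case _ =>
      // Remote URIs (hdfs://, http://, ...) pass through unchanged.
      uri
  }
}
```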
[GitHub] spark issue #17671: [SPARK-20368][PYSPARK] Provide optional support for Sent...
Github user tedmiston commented on the issue: https://github.com/apache/spark/pull/17671 Anyone know what the status of this PR is? I'd really like to be able to run Sentry on my PySpark code in production.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154800754
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+    kubernetesAppId: String,
+    kubernetesResourceNamePrefix: String,
+    driverLabels: Map[String, String],
+    dockerImagePullPolicy: String,
+    appName: String,
+    mainClass: String,
+    appArgs: Array[String],
+    submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf
+    .get(DRIVER_DOCKER_IMAGE)
+    .getOrElse(throw new SparkException("Must specify the driver Docker image"))
+
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_MEMORY.key,
+    org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
--- End diff --
Merged both into `spark.driver.memoryOverhead` and used it in both yarn and k8s.
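A sketch of what reading the unified setting could look like; the fallback mirrors the `math.max(factor * memory, minimum)` pattern visible in the hunks quoted in this thread, and the 0.10 factor and 384 MiB floor are assumed defaults rather than values taken from this PR:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
// Driver memory in MiB, defaulting to 1g as spark.driver.memory does.
val driverMemoryMiB = conf.getSizeAsMb("spark.driver.memory", "1g")
// One overhead setting shared by YARN and Kubernetes, with a computed fallback.
val overheadMiB =
  if (conf.contains("spark.driver.memoryOverhead")) {
    conf.getSizeAsMb("spark.driver.memoryOverhead", "0")
  } else {
    math.max((0.10 * driverMemoryMiB).toLong, 384L)
  }
```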
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154800774
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
+    val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+      .map(env =>
--- End diff --
Done.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154800102
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
--- End diff --
Agreed. But it seems `spark.driver.cores` is used in a lot of places, so unifying all of them needs a separate PR. It is also worth pointing out that the value of `spark.driver.cores` is used to set the CPU request, which in Kubernetes can be fractional, e.g., `0.1` or `100m`, whereas `DRIVER_CORES` in YARN only accepts integer values.
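For illustration, the string-typed handling above is what lets a Kubernetes CPU request stay fractional; a hypothetical setting:

```scala
import org.apache.spark.SparkConf

// Kubernetes accepts fractional CPU requests ("0.1") or milli-CPU form ("100m"),
// so the value is read as a string instead of being parsed as an integer as in YARN.
val conf = new SparkConf().set("spark.driver.cores", "0.1")
val driverCpuCores = conf.getOption("spark.driver.cores").getOrElse("1")
```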
[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19770#discussion_r154787608
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -643,6 +633,44 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } finally {
       iterator.foreach(_.close())
     }
+
+    // Clean corrupt or empty files that may have accumulated.
+    if (AGGRESSIVE_CLEANUP) {
+      var untracked: Option[KVStoreIterator[LogInfo]] = None
+      try {
+        untracked = Some(listing.view(classOf[LogInfo])
--- End diff --
This logic seems to be similar to what I have in the pipeline for the new SHS project at https://github.com/vanzin/spark/pull/40. Except my change takes care of other things (like also cleaning up any loaded UI data). Could you take a look at that PR and see whether there's something it's not covering? I can incorporate any needed changes there.
[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19770#discussion_r154784102
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -616,23 +620,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       listing.write(newApp)
     }
-    toDelete.foreach { attempt =>
-      val logPath = new Path(logDir, attempt.logPath)
-      try {
-        listing.delete(classOf[LogInfo], logPath.toString())
-      } catch {
-        case _: NoSuchElementException =>
-          logDebug(s"Log info entry for $logPath not found.")
-      }
-      try {
-        fs.delete(logPath, true)
-      } catch {
-        case e: AccessControlException =>
-          logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
-        case t: IOException =>
-          logError(s"IOException in cleaning ${attempt.logPath}", t)
-      }
-    }
+    toDelete
+      .map(attempt => new Path(logDir, attempt.logPath))
--- End diff --
`.map { attempt =>`
[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19770#discussion_r154784139
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -616,23 +620,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
+    toDelete
+      .map(attempt => new Path(logDir, attempt.logPath))
+      .foreach(logPath => deleteLogInfo(logPath))
--- End diff --
`.foreach { logPath =>` (or `.foreach(deleteLogInfo)`).
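Putting the two style suggestions together, the call site would read roughly as below; `AttemptInfo` and `deleteLogInfo` are stand-ins for the PR's own types and helper:

```scala
import org.apache.hadoop.fs.Path

// Stand-in for the attempt metadata carried by the listing database.
case class AttemptInfo(logPath: String)

def cleanUp(toDelete: Seq[AttemptInfo], logDir: String, deleteLogInfo: Path => Unit): Unit = {
  toDelete
    .map { attempt => new Path(logDir, attempt.logPath) }
    .foreach(deleteLogInfo)
}
```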
[GitHub] spark issue #19741: [SPARK-14228][CORE][YARN] Lost executor of RPC disassoci...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19741 **[Test build #84445 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84445/testReport)** for PR 19741 at commit [`930ba79`](https://github.com/apache/spark/commit/930ba795fa19a8158174de153351738a64fbcb2c).
[GitHub] spark issue #19741: [SPARK-14228][CORE][YARN] Lost executor of RPC disassoci...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19741 retest this please
[GitHub] spark issue #19882: [SPARK-22672][SQL][TEST] Refactor ORC Tests
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19882 Hi, @cloud-fan, @gatorsmile, @HyukjinKwon, @viirya. This is a test-case restructuring following https://github.com/apache/spark/pull/19651.
[GitHub] spark issue #19224: [SPARK-20990][SQL] Read all JSON documents in files when...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19224 @maracujah it is waiting for review
[GitHub] spark issue #19883: [SPARK-22684][SQL] datetime functions should not generat...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19883 **[Test build #8 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/8/testReport)** for PR 19883 at commit [`78850c6`](https://github.com/apache/spark/commit/78850c6724f854ed317f22ee9845117fa9acae46).
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154783790
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
+  override def configureDriver(
+      driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
--- End diff --
Done.
[GitHub] spark issue #19520: [SPARK-22298][WEB-UI] url encode APP id before generatin...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19520 We should close this. I don't see any user benefit in supporting slashes in app ids.
[GitHub] spark issue #19717: [SPARK-22646] [Submission] Spark on Kubernetes - basic s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19717 Merged build finished. Test PASSed.
[GitHub] spark issue #19717: [SPARK-22646] [Submission] Spark on Kubernetes - basic s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19717 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84438/ Test PASSed.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154783519
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
+  private val driverExtraClasspath = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_CLASS_PATH)
--- End diff --
Done.
[GitHub] spark issue #19717: [SPARK-22646] [Submission] Spark on Kubernetes - basic s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19717 **[Test build #84438 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84438/testReport)** for PR 19717 at commit [`cfcf2a7`](https://github.com/apache/spark/commit/cfcf2a7622f2bd4a21ea2d5197ec652363645aca). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154782915 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.{Collections, UUID} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkApplication +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Encapsulates arguments to the submission client. + * + * @param mainAppResource the main application resource if any + * @param mainClass the main class of the application to run + * @param driverArgs arguments to the driver + */ +private[spark] case class ClientArguments( + mainAppResource: Option[MainAppResource], + mainClass: String, + driverArgs: Array[String]) + +private[spark] object ClientArguments { + + def fromCommandLineArgs(args: Array[String]): ClientArguments = { +var mainAppResource: Option[MainAppResource] = None +var mainClass: Option[String] = None +val driverArgs = mutable.ArrayBuffer.empty[String] + +args.sliding(2, 2).toList.foreach { + case Array("--primary-java-resource", primaryJavaResource: String) => +mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + case Array("--main-class", clazz: String) => +mainClass = Some(clazz) + case Array("--arg", arg: String) => +driverArgs += arg + case other => +val invalid = other.mkString(" ") +throw new RuntimeException(s"Unknown arguments: $invalid") +} + +require(mainClass.isDefined, "Main class must be specified via --main-class") + +ClientArguments( + mainAppResource, + mainClass.get, + driverArgs.toArray) + } +} + +/** + * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a + * watcher that monitors and logs the application status. Waits for the application to terminate if + * spark.kubernetes.submission.waitAppCompletion is true. 
+ * + * @param submissionSteps steps that collectively configure the driver + * @param submissionSparkConf the submission client Spark configuration + * @param kubernetesClient the client to talk to the Kubernetes API server + * @param waitForAppCompletion a flag indicating whether the client should wait for the application + * to complete + * @param appName the application name + * @param loggingPodStatusWatcher a watcher that monitors and logs the application status + */ +private[spark] class Client( +submissionSteps: Seq[DriverConfigurationStep], +submissionSparkConf: SparkConf, +kubernetesClient: KubernetesClient, +waitForAppCompletion: Boolean, +appName: String, +loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging { + + private val driverJavaOptions = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + + /** +* Run command that initializes a DriverSpec that will be updated after each +* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec +* will be used to build the Driver Container, Driver Pod, and Kubernetes Resources +*/ + def run(): Unit = {
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154782933 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time} +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.util.ThreadUtils + +private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { + def awaitCompletion(): Unit +} + +/** + * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on + * every state change and also at an interval for liveness. + * + * @param appId application ID. + * @param maybeLoggingInterval ms between each state request. If provided, must be a positive + * number. + */ +private[k8s] class LoggingPodStatusWatcherImpl( +appId: String, +maybeLoggingInterval: Option[Long]) + extends LoggingPodStatusWatcher with Logging { + + private val podCompletedFuture = new CountDownLatch(1) + // start timer for periodic logging + private val scheduler = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") + private val logRunnable: Runnable = new Runnable { +override def run() = logShortStatus() + } + + private var pod = Option.empty[Pod] + + private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown") + + def start(): Unit = { +maybeLoggingInterval.foreach { interval => + require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.") --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154782904 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -55,14 +63,35 @@ private[spark] object Config extends Logging { val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" val KUBERNETES_SERVICE_ACCOUNT_NAME = -ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") + ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") .doc("Service account that is used when running the driver pod. The driver pod uses " + "this service account when requesting executor pods from the API server. If specific " + "credentials are given for the driver pod to use, the driver will favor " + "using those credentials instead.") .stringConf .createOptional + val KUBERNETES_DRIVER_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.driver.limit.cores") + .doc("Specify the hard cpu limit for the driver pod") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.executor.limit.cores") + .doc("Specify the hard cpu limit for a single executor pod") --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19882: [SPARK-22672][SQL][TEST] Refactor ORC Tests
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19882 **[Test build #84443 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84443/testReport)** for PR 19882 at commit [`5f2025a`](https://github.com/apache/spark/commit/5f2025ada40d146411b5e14ddf37f3fccc4cae97). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19883: [SPARK-22684][SQL] datetime functions should not ...
GitHub user mgaido91 opened a pull request: https://github.com/apache/spark/pull/19883 [SPARK-22684][SQL] datetime functions should not generate useless mutable states ## What changes were proposed in this pull request? Some datetime functions define mutable states that are not needed at all. This is bad because of the well-known issues related to constant pool limits. ## How was this patch tested? Added UTs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mgaido91/spark SPARK-22684 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19883.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19883 commit 78850c6724f854ed317f22ee9845117fa9acae46 Author: Marco Gaido Date: 2017-12-04T21:34:00Z [SPARK-22684][SQL] Avoid the generation of useless mutable states by datetime functions --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
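To make the constant-pool concern concrete: in Spark's code generation, every mutable state registered for an expression becomes an instance field on the generated Java class, and each field (plus its initializer) consumes constant-pool entries; a value that is only needed inside one code block can be emitted as a local variable instead. Below is a self-contained, purely illustrative sketch of the two generated-code shapes; it is not the actual `DateTimeExpressions` code, and `CodegenContext`'s real API is richer than this.

```scala
// Illustrative sketch only. It contrasts the generated-Java shape produced
// by registering a mutable state (a class field plus init code) with the
// local-variable shape that avoids generating the field entirely.
object MutableStateSketch {
  // Field-based: what a mutable state amounts to in the generated class.
  // The field declaration and its initializer both cost constant-pool entries.
  def fieldBased(tzId: String): (String, String) = {
    val declaration = "private java.util.TimeZone tz;"
    val initializer = s"""tz = java.util.TimeZone.getTimeZone("$tzId");"""
    (declaration, initializer)
  }

  // Local-based: the value is spliced into the consuming expression's code
  // block as a local, so no field is generated.
  def localBased(tzId: String): String =
    s"""final java.util.TimeZone tz = java.util.TimeZone.getTimeZone("$tzId");"""

  def main(args: Array[String]): Unit = {
    println(fieldBased("UTC"))
    println(localBased("UTC"))
  }
}
```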
[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r154782452 --- Diff: python/pyspark/sql/group.py --- @@ -89,8 +89,15 @@ def agg(self, *exprs): else: # Columns assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column" -jdf = self._jgd.agg(exprs[0]._jc, -_to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]])) +if isinstance(exprs[0], UDFColumn): +assert all(isinstance(c, UDFColumn) for c in exprs) --- End diff -- So I'm a little worried about this change: if other folks have wrapped Java UDAFs (which is reasonable, since before this there was no other way to make UDAFs in PySpark), it seems they won't be able to mix them. I'd suggest doing what @viirya suggested below, but with just a warning instead of a failure until Spark 3. What do y'all think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19882: [SPARK-22672][SQL][TEST] Refactor ORC Tests
GitHub user dongjoon-hyun opened a pull request: https://github.com/apache/spark/pull/19882 [SPARK-22672][SQL][TEST] Refactor ORC Tests ## What changes were proposed in this pull request? Since SPARK-20682, we have two `OrcFileFormat`s. This PR refactors the ORC tests according to two principles:
- Move the test suites into `sql/core`.
- Create `HiveXXX` test suites in `sql/hive` by reusing the `sql/core` test suites.

OrcFileFormat in `sql/core` | OrcFileFormat in `sql/hive`
-- | --
OrcFilterSuite | HiveOrcFilterSuite
OrcHadoopFsRelationSuite | HiveOrcHadoopFsRelationSuite
OrcPartitionDiscoverySuite | HiveOrcPartitionDiscoverySuite
OrcQuerySuite | HiveOrcQuerySuite
OrcSourceSuite | HiveOrcSourceSuite

Note that `OrcHadoopFsRelationSuite` is inside `sql/hive` like `ParquetHadoopFsRelationSuite`. ## How was this patch tested? Pass the Jenkins tests with the reorganized test suites. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dongjoon-hyun/spark SPARK-22672 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19882.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19882 commit 5f2025ada40d146411b5e14ddf37f3fccc4cae97 Author: Dongjoon Hyun Date: 2017-08-18T19:13:15Z [SPARK-22672][SQL][TEST] Refactor ORC Tests --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
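A minimal sketch of the suite-reuse pattern the PR describes (class and trait names here are illustrative, not the exact ones in the patch): the shared test bodies live in an abstract base suite in `sql/core`, and `sql/hive` adds a thin subclass that inherits them and runs them against the Hive-backed `OrcFileFormat`.

```scala
// Sketch only: the concrete harness mixins are commented out because they
// differ between sql/core and sql/hive.
import org.apache.spark.SparkFunSuite

abstract class OrcQueryTest extends SparkFunSuite {
  test("simple select on an ORC table") {
    // shared test body, exercised against both OrcFileFormat implementations
  }
}

// sql/core: runs the shared tests against the new native OrcFileFormat.
class OrcQuerySuite extends OrcQueryTest /* with SharedSQLContext */

// sql/hive: runs the same inherited tests against the Hive OrcFileFormat,
// adding only Hive-specific setup or overrides.
class HiveOrcQuerySuite extends OrcQueryTest /* with TestHiveSingleton */
```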
[GitHub] spark issue #19224: [SPARK-20990][SQL] Read all JSON documents in files when...
Github user maracujah commented on the issue: https://github.com/apache/spark/pull/19224 What is the status of this item? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19591: [SPARK-11035][core] Add in-process Spark app launcher.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19591 **[Test build #84442 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84442/testReport)** for PR 19591 at commit [`8496024`](https://github.com/apache/spark/commit/84960244458045810f0066256fbbee2446a3071c). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19591: [SPARK-11035][core] Add in-process Spark app launcher.
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19591 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154759879 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder} + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Represents the initial setup required for the driver. + */ +private[spark] class BaseDriverConfigurationStep( +kubernetesAppId: String, +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +dockerImagePullPolicy: String, +appName: String, +mainClass: String, +appArgs: Array[String], +submissionSparkConf: SparkConf) extends DriverConfigurationStep { + + private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(s"$kubernetesResourceNamePrefix-driver") + + private val driverExtraClasspath = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_CLASS_PATH) + + private val driverDockerImage = submissionSparkConf +.get(DRIVER_DOCKER_IMAGE) +.getOrElse(throw new SparkException("Must specify the driver Docker image")) + + // CPU settings + private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1") + private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES) + + // Memory settings + private val driverMemoryMiB = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_MEMORY) + private val driverMemoryString = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_MEMORY.key, +org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString) + private val memoryOverheadMiB = submissionSparkConf +.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD) +.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, + MEMORY_OVERHEAD_MIN_MIB)) + private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB + + override def configureDriver( + driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +val driverExtraClasspathEnv = driverExtraClasspath.map { classPath => + new EnvVarBuilder() +.withName(ENV_SUBMIT_EXTRA_CLASSPATH) +.withValue(classPath) +.build() +} + +val driverCustomAnnotations = ConfigurationUtils + .parsePrefixedKeyValuePairs( 
+submissionSparkConf, +KUBERNETES_DRIVER_ANNOTATION_PREFIX) +require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION), + s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" + +" Spark bookkeeping operations.") + +val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq + .map(env => --- End diff -- `.map { env =>` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
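For reference, the convention this nit points at, in a generic illustration (not code from this patch): Spark style uses parentheses for single-expression lambdas and braces when the closure spans multiple lines.

```scala
object ClosureStyleSketch {
  def main(args: Array[String]): Unit = {
    val names = Seq(" spark ", "k8s")

    // Single expression: parentheses read fine.
    val trimmed = names.map(_.trim)

    // Multi-line closure: braces, as suggested in the review comment.
    val pairs = names.map { name =>
      val t = name.trim
      (t, t.length)
    }

    println(trimmed)
    println(pairs)
  }
}
```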
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154749355 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | the node running the Application Master via the Secure | Distributed Cache, for renewing the login tickets and the | delegation tokens periodically. +| +| Kubernetes only: +| --kubernetes-namespace NS The namespace in the Kubernetes cluster within which the --- End diff -- Is it a big deal to not add this as a command line arg and force people to use the configuration instead? I'd prefer to not add even more cluster-specific switches to `SparkSubmit`, at least not until it is refactored to be pluggable. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
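What the configuration-only route looks like for users, as a small illustration ("spark-jobs" is an example namespace, not a value from the patch): the namespace travels as an ordinary conf entry, so no new `SparkSubmit` switch is required.

```scala
import org.apache.spark.SparkConf

object NamespaceConfSketch {
  def main(args: Array[String]): Unit = {
    // Setting the conf key directly is equivalent to what the proposed
    // --kubernetes-namespace flag would do.
    val conf = new SparkConf()
      .set("spark.kubernetes.namespace", "spark-jobs")
    println(conf.get("spark.kubernetes.namespace"))
  }
}
```

From the command line, the equivalent is `spark-submit --conf spark.kubernetes.namespace=spark-jobs ...`.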
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154759045 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder} + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Represents the initial setup required for the driver. + */ +private[spark] class BaseDriverConfigurationStep( +kubernetesAppId: String, +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +dockerImagePullPolicy: String, +appName: String, +mainClass: String, +appArgs: Array[String], +submissionSparkConf: SparkConf) extends DriverConfigurationStep { + + private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(s"$kubernetesResourceNamePrefix-driver") + + private val driverExtraClasspath = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_CLASS_PATH) --- End diff -- Why use the full path all over the place instead of importing? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
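The import-based alternative being suggested would look roughly like this inside `BaseDriverConfigurationStep` (a sketch of the reviewed lines, not the committed fix):

```scala
import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY}

// With the entries imported once at the top of the file, the declarations
// shrink to (shown as comments since they belong inside the class body):
//
//   private val driverExtraClasspath = submissionSparkConf.get(DRIVER_CLASS_PATH)
//   private val driverMemoryMiB      = submissionSparkConf.get(DRIVER_MEMORY)
```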
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154757417 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time} +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.util.ThreadUtils + +private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { + def awaitCompletion(): Unit +} + +/** + * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on + * every state change and also at an interval for liveness. + * + * @param appId application ID. + * @param maybeLoggingInterval ms between each state request. If provided, must be a positive + * number. + */ +private[k8s] class LoggingPodStatusWatcherImpl( +appId: String, +maybeLoggingInterval: Option[Long]) + extends LoggingPodStatusWatcher with Logging { + + private val podCompletedFuture = new CountDownLatch(1) + // start timer for periodic logging + private val scheduler = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") + private val logRunnable: Runnable = new Runnable { +override def run() = logShortStatus() + } + + private var pod = Option.empty[Pod] + + private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown") + + def start(): Unit = { +maybeLoggingInterval.foreach { interval => + require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.") --- End diff -- You could use `.checkValue` in the constant declaration instead. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
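A sketch of the `.checkValue` variant, as a member of the k8s `Config` object (the key name and default below are assumptions inferred from the watcher's constructor, not quoted from the patch):

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.internal.config.ConfigBuilder

private[spark] object ConfigSketch {
  // Hypothetical entry: the require(...) in the watcher moves into the
  // config declaration and fails fast when the conf is read.
  val REPORT_INTERVAL =
    ConfigBuilder("spark.kubernetes.report.interval")
      .doc("Interval between reports of the current application status.")
      .timeConf(TimeUnit.MILLISECONDS)
      .checkValue(_ > 0, "Logging interval must be a positive time value.")
      .createWithDefaultString("1s")
}
```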
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154755221 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.steps._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.util.SystemClock + +/** + * Constructs the complete list of driver configuration steps to run to deploy the Spark driver. + */ +private[spark] class DriverConfigurationStepsOrchestrator( +namespace: String, +kubernetesAppId: String, +launchTime: Long, +mainAppResource: Option[MainAppResource], +appName: String, +mainClass: String, +appArgs: Array[String], +submissionSparkConf: SparkConf) { + + // The resource name prefix is derived from the application name, making it easy to connect the + // names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the + // application the user submitted. However, we can't use the application name in the label, as + // label values are considerably restrictive, e.g. must be no longer than 63 characters in + // length. So we generate a separate identifier for the app ID itself, and bookkeeping that + // requires finding "all pods for this application" should use the kubernetesAppId. + private val kubernetesResourceNamePrefix = --- End diff -- Is this required to be unique? If so, this could use some more uniqueness (e.g. using a UUID instead of the "launch time"). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
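The two options being weighed, side by side in a self-contained sketch (names are illustrative):

```scala
import java.util.UUID

object ResourcePrefixSketch {
  def main(args: Array[String]): Unit = {
    val appName = "my-app"

    // Launch-time based: collides if two submissions land in the same millisecond.
    val timeBased = s"$appName-${System.currentTimeMillis()}"

    // UUID based: uniqueness no longer depends on the clock.
    val uuidBased = s"$appName-${UUID.randomUUID().toString.replaceAll("-", "").take(16)}"

    println(timeBased)
    println(uuidBased)
  }
}
```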
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154771100 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import java.io.File +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ +import scala.language.implicitConversions + +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials + * to request executors. + */ +private[spark] class DriverKubernetesCredentialsStep( +submissionSparkConf: SparkConf, +kubernetesResourceNamePrefix: String) extends DriverConfigurationStep { + + private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX") + private val maybeMountedClientKeyFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX") + private val maybeMountedClientCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX") + private val maybeMountedCaCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX") + private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +val driverSparkConf = driverSpec.driverSparkConf.clone() + +val oauthTokenBase64 = submissionSparkConf + .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX") +.map { token => + BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8)) +} +val caCertDataBase64 = safeFileConfToBase64( +s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", +"Driver CA cert file provided at %s does not exist or is not a file.") +val clientKeyDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", +"Driver client key file provided at %s does not exist or is not a file.") +val clientCertDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", +"Driver client cert file provided at %s does 
not exist or is not a file.") + +val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations( +driverSparkConf, +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val kubernetesCredentialsSecret = createCredentialsSecret( +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret => + new PodBuilder(driverSpec.driverPod) +.editOrNewSpec() + .addNewVolume() +.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME) + .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret() +.endVolume() + .endSpec() +.build() +}.getOrElse( + driverServiceAccount.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154770325 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import java.io.File +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ +import scala.language.implicitConversions + +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials + * to request executors. + */ +private[spark] class DriverKubernetesCredentialsStep( +submissionSparkConf: SparkConf, +kubernetesResourceNamePrefix: String) extends DriverConfigurationStep { + + private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX") + private val maybeMountedClientKeyFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX") + private val maybeMountedClientCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX") + private val maybeMountedCaCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX") + private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +val driverSparkConf = driverSpec.driverSparkConf.clone() + +val oauthTokenBase64 = submissionSparkConf + .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX") +.map { token => + BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8)) +} +val caCertDataBase64 = safeFileConfToBase64( +s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", +"Driver CA cert file provided at %s does not exist or is not a file.") +val clientKeyDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", +"Driver client key file provided at %s does not exist or is not a file.") +val clientCertDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", +"Driver client cert file provided at %s does 
not exist or is not a file.") + +val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations( +driverSparkConf, +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val kubernetesCredentialsSecret = createCredentialsSecret( +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret => + new PodBuilder(driverSpec.driverPod) +.editOrNewSpec() + .addNewVolume() +.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME) + .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret() +.endVolume() + .endSpec() +.build() +}.getOrElse( + driverServiceAccount.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154754517 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.{Collections, UUID} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkApplication +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Encapsulates arguments to the submission client. + * + * @param mainAppResource the main application resource if any + * @param mainClass the main class of the application to run + * @param driverArgs arguments to the driver + */ +private[spark] case class ClientArguments( + mainAppResource: Option[MainAppResource], + mainClass: String, + driverArgs: Array[String]) + +private[spark] object ClientArguments { + + def fromCommandLineArgs(args: Array[String]): ClientArguments = { +var mainAppResource: Option[MainAppResource] = None +var mainClass: Option[String] = None +val driverArgs = mutable.ArrayBuffer.empty[String] + +args.sliding(2, 2).toList.foreach { + case Array("--primary-java-resource", primaryJavaResource: String) => +mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + case Array("--main-class", clazz: String) => +mainClass = Some(clazz) + case Array("--arg", arg: String) => +driverArgs += arg + case other => +val invalid = other.mkString(" ") +throw new RuntimeException(s"Unknown arguments: $invalid") +} + +require(mainClass.isDefined, "Main class must be specified via --main-class") + +ClientArguments( + mainAppResource, + mainClass.get, + driverArgs.toArray) + } +} + +/** + * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a + * watcher that monitors and logs the application status. Waits for the application to terminate if + * spark.kubernetes.submission.waitAppCompletion is true. 
+ * + * @param submissionSteps steps that collectively configure the driver + * @param submissionSparkConf the submission client Spark configuration + * @param kubernetesClient the client to talk to the Kubernetes API server + * @param waitForAppCompletion a flag indicating whether the client should wait for the application + * to complete + * @param appName the application name + * @param loggingPodStatusWatcher a watcher that monitors and logs the application status + */ +private[spark] class Client( +submissionSteps: Seq[DriverConfigurationStep], +submissionSparkConf: SparkConf, +kubernetesClient: KubernetesClient, +waitForAppCompletion: Boolean, +appName: String, +loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging { + + private val driverJavaOptions = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + + /** +* Run command that initializes a DriverSpec that will be updated after each +* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec +* will be used to build the Driver Container, Driver Pod, and Kubernetes Resources +*/ + def run(): Unit = {
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154770165 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import java.io.File +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ +import scala.language.implicitConversions + +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials + * to request executors. + */ +private[spark] class DriverKubernetesCredentialsStep( +submissionSparkConf: SparkConf, +kubernetesResourceNamePrefix: String) extends DriverConfigurationStep { + + private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX") + private val maybeMountedClientKeyFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX") + private val maybeMountedClientCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX") + private val maybeMountedCaCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX") + private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +val driverSparkConf = driverSpec.driverSparkConf.clone() + +val oauthTokenBase64 = submissionSparkConf + .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX") +.map { token => + BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8)) +} +val caCertDataBase64 = safeFileConfToBase64( +s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", +"Driver CA cert file provided at %s does not exist or is not a file.") +val clientKeyDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", +"Driver client key file provided at %s does not exist or is not a file.") +val clientCertDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", +"Driver client cert file provided at %s does 
not exist or is not a file.") + +val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations( +driverSparkConf, +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val kubernetesCredentialsSecret = createCredentialsSecret( +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret => + new PodBuilder(driverSpec.driverPod) +.editOrNewSpec() + .addNewVolume() +.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME) + .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret() +.endVolume() + .endSpec() +.build() +}.getOrElse( + driverServiceAccount.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154750457 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -55,14 +63,35 @@ private[spark] object Config extends Logging { val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" val KUBERNETES_SERVICE_ACCOUNT_NAME = -ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") + ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") .doc("Service account that is used when running the driver pod. The driver pod uses " + "this service account when requesting executor pods from the API server. If specific " + "credentials are given for the driver pod to use, the driver will favor " + "using those credentials instead.") .stringConf .createOptional + val KUBERNETES_DRIVER_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.driver.limit.cores") + .doc("Specify the hard cpu limit for the driver pod") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.executor.limit.cores") + .doc("Specify the hard cpu limit for a single executor pod") --- End diff -- s/a single/each --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154771178 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import java.io.File +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ +import scala.language.implicitConversions + +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials + * to request executors. + */ +private[spark] class DriverKubernetesCredentialsStep( +submissionSparkConf: SparkConf, +kubernetesResourceNamePrefix: String) extends DriverConfigurationStep { + + private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX") + private val maybeMountedClientKeyFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX") + private val maybeMountedClientCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX") + private val maybeMountedCaCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX") + private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +val driverSparkConf = driverSpec.driverSparkConf.clone() + +val oauthTokenBase64 = submissionSparkConf + .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX") +.map { token => + BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8)) +} +val caCertDataBase64 = safeFileConfToBase64( +s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", +"Driver CA cert file provided at %s does not exist or is not a file.") +val clientKeyDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", +"Driver client key file provided at %s does not exist or is not a file.") +val clientCertDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", +"Driver client cert file provided at %s does 
not exist or is not a file.") + +val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations( +driverSparkConf, +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val kubernetesCredentialsSecret = createCredentialsSecret( +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret => + new PodBuilder(driverSpec.driverPod) +.editOrNewSpec() + .addNewVolume() +.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME) + .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret() +.endVolume() + .endSpec() +.build() +}.getOrElse( + driverServiceAccount.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154759371 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder} + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Represents the initial setup required for the driver. + */ +private[spark] class BaseDriverConfigurationStep( +kubernetesAppId: String, +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +dockerImagePullPolicy: String, +appName: String, +mainClass: String, +appArgs: Array[String], +submissionSparkConf: SparkConf) extends DriverConfigurationStep { + + private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(s"$kubernetesResourceNamePrefix-driver") + + private val driverExtraClasspath = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_CLASS_PATH) + + private val driverDockerImage = submissionSparkConf +.get(DRIVER_DOCKER_IMAGE) +.getOrElse(throw new SparkException("Must specify the driver Docker image")) + + // CPU settings + private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1") --- End diff -- You could move the `DRIVER_CORES` config from the YARN module to core. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
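What hoisting the entry into core might look like, as a sketch modeled on the YARN declaration (the doc string wording is assumed; in practice the val would live in core's `org.apache.spark.internal.config` package object rather than a standalone object):

```scala
import org.apache.spark.internal.config.ConfigBuilder

private[spark] object DriverCoresSketch {
  // Shared by YARN and Kubernetes once moved to core.
  val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
    .doc("Number of cores to use for the driver process.")
    .intConf
    .createWithDefault(1)
}
```

The step above could then read `submissionSparkConf.get(DRIVER_CORES)` instead of `getOption("spark.driver.cores")` with a string default.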
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154759574 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder} + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Represents the initial setup required for the driver. + */ +private[spark] class BaseDriverConfigurationStep( +kubernetesAppId: String, +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +dockerImagePullPolicy: String, +appName: String, +mainClass: String, +appArgs: Array[String], +submissionSparkConf: SparkConf) extends DriverConfigurationStep { + + private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(s"$kubernetesResourceNamePrefix-driver") + + private val driverExtraClasspath = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_CLASS_PATH) + + private val driverDockerImage = submissionSparkConf +.get(DRIVER_DOCKER_IMAGE) +.getOrElse(throw new SparkException("Must specify the driver Docker image")) + + // CPU settings + private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1") + private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES) + + // Memory settings + private val driverMemoryMiB = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_MEMORY) + private val driverMemoryString = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_MEMORY.key, +org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString) + private val memoryOverheadMiB = submissionSparkConf --- End diff -- Another config that could use some re-factoring so that YARN and k8s use the same logic. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
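The duplicated overhead computation reduces to a small helper; here is a sketch under the assumption that the YARN defaults (factor 0.10, floor 384 MiB) are the values to share, which is worth verifying before refactoring:

```scala
object MemoryOverheadSketch {
  // Assumed to mirror the YARN defaults; confirm before hoisting into core.
  private val MemoryOverheadFactor = 0.10
  private val MemoryOverheadMinMiB = 384L

  def defaultOverheadMiB(containerMemoryMiB: Long): Long =
    math.max((MemoryOverheadFactor * containerMemoryMiB).toLong, MemoryOverheadMinMiB)

  def main(args: Array[String]): Unit = {
    println(defaultOverheadMiB(1024)) // 384: the floor applies
    println(defaultOverheadMiB(8192)) // 819: 10% of the container memory
  }
}
```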
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154759746

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+    kubernetesAppId: String,
+    kubernetesResourceNamePrefix: String,
+    driverLabels: Map[String, String],
+    dockerImagePullPolicy: String,
+    appName: String,
+    mainClass: String,
+    appArgs: Array[String],
+    submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf
+    .get(DRIVER_DOCKER_IMAGE)
+    .getOrElse(throw new SparkException("Must specify the driver Docker image"))
+
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_MEMORY.key,
+    org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+    .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+      MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+  override def configureDriver(
+      driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
--- End diff --

Fits in the previous line.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
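[Editor's note] That is, the declaration collapses to a single line:

```scala
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
```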
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154750025

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -388,6 +388,32 @@ class SparkSubmitSuite
     conf.get("spark.ui.enabled") should be ("false")
   }
 
+  test("handles k8s cluster mode") {
+    val clArgs = Seq(
+      "--deploy-mode", "cluster",
+      "--master", "k8s://host:port",
+      "--executor-memory", "5g",
+      "--class", "org.SomeClass",
+      "--kubernetes-namespace", "foo",
+      "--driver-memory", "4g",
+      "--conf", "spark.kubernetes.driver.docker.image=bar",
+      "/home/thejar.jar",
+      "arg1")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
+
+    val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
+    childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar"))
+    childArgsMap.get("--main-class") should be (Some("org.SomeClass"))
+    childArgsMap.get("--arg") should be (Some("arg1"))
+    mainClass should be ("org.apache.spark.deploy.k8s.submit.Client")
+    classpath should have length (0)
+    conf.get("spark.executor.memory") should be ("5g")
--- End diff --

Check `spark.master` too?

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
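[Editor's note] The extra assertion would be one line; the expected value below assumes the master URL passes through unchanged, which depends on how SparkSubmit normalizes k8s master URLs:

```scala
// Hypothetical addition to the test above.
conf.get("spark.master") should be ("k8s://host:port")
```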
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154751082

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -119,5 +139,60 @@ private[spark] object Config extends Logging {
         "must be a positive integer")
       .createWithDefault(10)
 
+  val WAIT_FOR_APP_COMPLETION =
+    ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
+      .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
+        "launcher process.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val REPORT_INTERVAL =
+    ConfigBuilder("spark.kubernetes.report.interval")
+      .doc("Interval between reports of the current app status in cluster mode.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
+
+  private[spark] val JARS_DOWNLOAD_LOCATION =
+    ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
+      .doc("Location to download jars to in the driver and executors. When using" +
+        " spark-submit, this directory must be empty and will be mounted as an empty directory" +
+        " volume on the driver and executor pod.")
+      .stringConf
+      .createWithDefault("/var/spark-data/spark-jars")
--- End diff --

The doc string says "download jars to". Is it guaranteed that this directory will be writable? Generally only root can write to things in "/var" by default, and I assume you're not running things as root even if it's inside a container.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
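[Editor's note] For reference, mounting the directory as an emptyDir volume (as the doc string describes) is what would make it writable regardless of the image's /var permissions. A sketch with the fabric8 client; the volume name is illustrative:

```scala
import io.fabric8.kubernetes.api.model.VolumeBuilder

// emptyDir volumes are created fresh per pod and are writable by the
// container user, sidestepping the /var write-permission concern.
val jarsVolume = new VolumeBuilder()
  .withName("spark-jars") // hypothetical volume name
  .withNewEmptyDir()
  .endEmptyDir()
  .build()
```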
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154756363

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.io.File
+
+import org.apache.spark.util.Utils
+
+private[spark] object KubernetesFileUtils {
+
+  /**
+   * For the given collection of file URIs, resolves them as follows:
+   * - File URIs with scheme file:// are resolved to the given download path.
+   * - File URIs with scheme local:// resolve to just the path of the URI.
+   * - Otherwise, the URIs are returned as-is.
+   */
+  def resolveSubmittedUris(
+      fileUris: Iterable[String],
+      fileDownloadPath: String): Iterable[String] = {
+    fileUris.map { uri =>
+      val fileUri = Utils.resolveURI(uri)
+      val fileScheme = Option(fileUri.getScheme).getOrElse("file")
+      fileScheme match {
--- End diff --

This looks a lot like `resolveFilePaths`; you could probably merge the two in some way.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
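[Editor's note] A sketch of how the two could merge, based only on the resolution rules documented above; the helper name is hypothetical:

```scala
// Resolve one URI: file:// goes under the download path, local://
// becomes the bare path, anything else is returned unchanged. Both
// resolveSubmittedUris and resolveFilePaths could delegate to this.
private def resolveUri(uri: String, fileDownloadPath: String): String = {
  val fileUri = Utils.resolveURI(uri)
  Option(fileUri.getScheme).getOrElse("file") match {
    case "file" =>
      val fileName = new File(fileUri.getPath).getName
      s"$fileDownloadPath/$fileName"
    case "local" => fileUri.getPath
    case _ => uri
  }
}
```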
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154771774

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+    submissionSparkConf: SparkConf,
+    kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+    val oauthTokenBase64 = submissionSparkConf
+      .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+      .map { token =>
+        BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+      }
+    val caCertDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+      "Driver CA cert file provided at %s does not exist or is not a file.")
+    val clientKeyDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+      "Driver client key file provided at %s does not exist or is not a file.")
+    val clientCertDataBase64 = safeFileConfToBase64(
--- End diff --

Not really familiar with how these things are used by k8s, but don't these certs generally have passwords? I can't seem to find anything related to passwords for these things.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
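[Editor's note] The body of `safeFileConfToBase64` is not in the quoted hunk; a plausible reconstruction is below, which also shows why the question is apt: the raw file bytes are base64-encoded with no passphrase handling anywhere.

```scala
// Hypothetical reconstruction: read the configured file, if any,
// validate it exists, and base64-encode its raw bytes.
private def safeFileConfToBase64(conf: String, errMessage: String): Option[String] = {
  submissionSparkConf.getOption(conf)
    .map(new File(_))
    .map { file =>
      require(file.isFile, errMessage.format(file.getAbsolutePath))
      BaseEncoding.base64().encode(Files.toByteArray(file))
    }
}
```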
[GitHub] spark issue #19870: [SPARK-22665][SQL] Avoid repartitioning with empty list ...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19870 LGTM pending Jenkins --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19870: [SPARK-22665][SQL] Avoid repartitioning with empty list ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19870 **[Test build #84441 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84441/testReport)** for PR 19870 at commit [`41ad7b9`](https://github.com/apache/spark/commit/41ad7b9cd1976d84caded2cc4fc7f50d0585de03). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19870: [SPARK-22665][SQL] Avoid repartitioning with empty list ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19870 Thanks @gatorsmile, I agree and I have updated the PR accordingly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19870: [SPARK-22665][SQL] Avoid repartitioning with empt...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19870#discussion_r154764850

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -838,6 +838,8 @@ case class RepartitionByExpression(
     numPartitions: Int) extends RepartitionOperation {
 
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
+  require(partitionExpressions.nonEmpty, s"${getClass.getSimpleName} requires a non empty set of " +
--- End diff --

it was putting everything in the same partition

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
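[Editor's note] A short illustration of the old behavior the comment describes; this is a sketch, not a test from the PR:

```scala
// Before SPARK-22665: an empty expression list hash-partitioned every
// row to the same partition, so 9 of the 10 partitions stayed empty.
val df = spark.range(100).toDF("id")
val repartitioned = df.repartition(10, Seq.empty[org.apache.spark.sql.Column]: _*)
// After this PR, the require(...) above fails fast instead.
```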
[GitHub] spark issue #19717: [SPARK-22646] [Submission] Spark on Kubernetes - basic s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19717 **[Test build #84440 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84440/testReport)** for PR 19717 at commit [`faa2849`](https://github.com/apache/spark/commit/faa2849b57e158471285df51a9a3b1cb0acb6b68). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19871 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84436/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19871 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19871 **[Test build #84436 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84436/testReport)** for PR 19871 at commit [`2393e1d`](https://github.com/apache/spark/commit/2393e1de729441b27bc5cdd83804071f14d77a4b). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19871: [SPARK-20728][SQL] Make OrcFileFormat configurable betwe...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19871 **[Test build #84439 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84439/testReport)** for PR 19871 at commit [`8bc420a`](https://github.com/apache/spark/commit/8bc420ab6a085360f3996759819ad44dd40f9703). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19871#discussion_r154756706

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -537,6 +540,7 @@ object DataSource extends Logging {
     val csv = classOf[CSVFileFormat].getCanonicalName
     val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
     val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
+    val newOrc = classOf[OrcFileFormat].getCanonicalName
--- End diff --

Yep. It sounds better.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19869 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84435/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19869 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19869: [SPARK-22677][SQL] cleanup whole stage codegen for hash ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19869 **[Test build #84435 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84435/testReport)** for PR 19869 at commit [`f0320d5`](https://github.com/apache/spark/commit/f0320d5f7b3406334a5250ecc2d9c5248142f34d). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19871#discussion_r154755799

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -537,6 +540,7 @@ object DataSource extends Logging {
     val csv = classOf[CSVFileFormat].getCanonicalName
     val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
     val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
+    val newOrc = classOf[OrcFileFormat].getCanonicalName
--- End diff --

Please do not use a name like `newXYZ`. When an even newer one is added, the name will be confusing. How about `nativeOrc`?

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19871#discussion_r154752978

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -363,6 +363,14 @@ object SQLConf {
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
     .createWithDefault("snappy")
 
+  val ORC_USE_NEW_VERSION = buildConf("spark.sql.orc.useNewVersion")
+    .doc("When true, use new OrcFileFormat in sql/core module instead of the one in sql/hive. " +
+      "Since new OrcFileFormat uses Apache ORC library instead of ORC library Hive 1.2.1, it is " +
+      "more stable and faster.")
--- End diff --

Thanks!

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19871#discussion_r154752924

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -363,6 +363,14 @@ object SQLConf {
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
     .createWithDefault("snappy")
 
+  val ORC_USE_NEW_VERSION = buildConf("spark.sql.orc.useNewVersion")
+    .doc("When true, use new OrcFileFormat in sql/core module instead of the one in sql/hive. " +
+      "Since new OrcFileFormat uses Apache ORC library instead of ORC library Hive 1.2.1, it is " +
+      "more stable and faster.")
+    .internal()
+    .booleanConf
--- End diff --

Yep.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19871#discussion_r154752812

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -363,6 +363,14 @@ object SQLConf {
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
     .createWithDefault("snappy")
 
+  val ORC_USE_NEW_VERSION = buildConf("spark.sql.orc.useNewVersion")
--- End diff --

No problem changing to it. But since the original name was suggested by @cloud-fan, pinging @cloud-fan.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #5503: [MLLIB][WIP] SPARK-4638: Kernels feature for MLLIB
Github user dale-chang91 commented on the issue: https://github.com/apache/spark/pull/5503 What is the result of this pull request? I can't seem to find the code in the code base anywhere. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19879: [SPARK-20706][SPARK-SHELL] Spark-shell not overriding me...
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19879 Yes, I think it's reasonable to pull these fixes forward to patch medium-sized problems. We don't even have 2.12 support fully working yet, so punting the solution to 2.12 probably means punting to 3.0, and that's still ~6 months off. Meanwhile 2.3 is imminent. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154746008

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -702,6 +715,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }
     }
 
+    if (isKubernetesCluster) {
+      childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
--- End diff --

Done.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19872: WIP: [SPARK-22274][PySpark] User-defined aggregation fun...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/19872 > I thought @ueshin is working on this BTW. Oh, I certainly don't want to duplicate @ueshin 's work. I am under the impression that @ueshin is working on two-stage PySpark UDAF with pandas_udf, but I cannot really find the Jira for it... @ueshin can you point me to what you are working on so I don't overstep? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19879: [SPARK-20706][SPARK-SHELL] Spark-shell not overriding me...
Github user mpetruska commented on the issue: https://github.com/apache/spark/pull/19879 Jenkins, retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19879: [SPARK-20706][SPARK-SHELL] Spark-shell not overriding me...
Github user mpetruska commented on the issue: https://github.com/apache/spark/pull/19879 Jenkins, re-test this, please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...
Github user vanzin closed the pull request at: https://github.com/apache/spark/pull/19631 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19631: [SPARK-22372][core, yarn] Make cluster submission use Sp...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19631 Merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19879: [SPARK-20706][SPARK-SHELL] Spark-shell not overriding me...
Github user mpetruska commented on the issue: https://github.com/apache/spark/pull/19879 By the way, are these contributions valuable, or should the focus be on 2.12? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19871#discussion_r154743731

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -363,6 +363,14 @@ object SQLConf {
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
     .createWithDefault("snappy")
 
+  val ORC_USE_NEW_VERSION = buildConf("spark.sql.orc.useNewVersion")
+    .doc("When true, use new OrcFileFormat in sql/core module instead of the one in sql/hive. " +
+      "Since new OrcFileFormat uses Apache ORC library instead of ORC library Hive 1.2.1, it is " +
+      "more stable and faster.")
--- End diff --

> When true, use the native version of ORC support instead of the ORC library in Hive 1.2.1, which is the default prior to Spark 2.3.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19871#discussion_r154742876

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -363,6 +363,14 @@ object SQLConf {
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
     .createWithDefault("snappy")
 
+  val ORC_USE_NEW_VERSION = buildConf("spark.sql.orc.useNewVersion")
+    .doc("When true, use new OrcFileFormat in sql/core module instead of the one in sql/hive. " +
+      "Since new OrcFileFormat uses Apache ORC library instead of ORC library Hive 1.2.1, it is " +
+      "more stable and faster.")
+    .internal()
+    .booleanConf
--- End diff --

```
.checkValues(Set("hive", "native"))
.createWithDefault("native")
```

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19871: [SPARK-20728][SQL] Make OrcFileFormat configurabl...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19871#discussion_r154742713

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -363,6 +363,14 @@ object SQLConf {
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
     .createWithDefault("snappy")
 
+  val ORC_USE_NEW_VERSION = buildConf("spark.sql.orc.useNewVersion")
--- End diff --

`spark.sql.orc.impl`

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
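[Editor's note] Putting the three suggestions in this thread together, the config could end up looking roughly like this; a sketch assembled from the review comments, not necessarily the merged form:

```scala
val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
  .doc("When native, use the native version of ORC support instead of the ORC " +
    "library in Hive 1.2.1, which is the default prior to Spark 2.3.")
  .internal()
  .stringConf
  .checkValues(Set("hive", "native"))
  .createWithDefault("native")
```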
[GitHub] spark issue #19810: [SPARK-22599][SQL] In-Memory Table Pruning without Extra...
Github user CodingCat commented on the issue: https://github.com/apache/spark/pull/19810 @cloud-fan would you mind continuing the review? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19864: [SPARK-22673][SQL] InMemoryRelation should utilize exist...
Github user CodingCat commented on the issue: https://github.com/apache/spark/pull/19864 @viirya any thoughts? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19879: [SPARK-20706][SPARK-SHELL] Spark-shell not overriding me...
Github user mpetruska commented on the issue: https://github.com/apache/spark/pull/19879 This one has a "Major" priority; but otherwise yes, it should be resolved in the Scala 2.12 version. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19852: [SPARK-22655][PySpark] Throw exception rather tha...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19852#discussion_r154737303

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -319,7 +319,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 
         case e: Exception if env.isStopped =>
           logDebug("Exception thrown after context is stopped", e)
-          null.asInstanceOf[OUT]  // exit silently
+          throw new SparkException("Spark session has been stopped", e)
--- End diff --

@cloud-fan Thoughts?

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19861 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19861 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84433/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org