[GitHub] spark pull request #18477: [SPARK-21261][DOCS]SQL Regex document fix
Github user visaxin commented on a diff in the pull request: https://github.com/apache/spark/pull/18477#discussion_r146457833 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala --- @@ -268,7 +268,7 @@ case class StringSplit(str: Expression, pattern: Expression) usage = "_FUNC_(str, regexp, rep) - Replaces all substrings of `str` that match `regexp` with `rep`.", extended = """ Examples: - > SELECT _FUNC_('100-200', '(\d+)', 'num'); + > SELECT _FUNC_('100-200', '(\\d+)', 'num'); --- End diff -- I added spark-sql and Scala to make it clear. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
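Why the `\d` to `\\d` change matters: inside a quoted Scala string (and, by default, inside a Spark SQL string literal) a backslash is itself an escape character, so the regex `\d+` must be written with a doubled backslash. A minimal sketch, assuming plain `String.replaceAll` as a stand-in for `regexp_replace` (both go through `java.util.regex`), so it runs without a SparkSession; the object and method names are illustrative only:

```scala
// Sketch only: String.replaceAll stands in for Spark's regexp_replace,
// so no Spark dependency is needed. Both rely on java.util.regex.
object RegexEscapeSketch {
  // The Scala literal "(\\d+)" denotes the 5-character regex (\d+):
  // the doubled backslash collapses to a single one at compile time.
  val digitsPattern: String = "(\\d+)"

  // Replace every run of digits in `str` with `rep`,
  // mirroring regexp_replace(str, regexp, rep).
  def replaceDigits(str: String, rep: String): String =
    str.replaceAll(digitsPattern, rep)

  def main(args: Array[String]): Unit = {
    println(digitsPattern)                    // prints (\d+)
    println(replaceDigits("100-200", "num"))  // prints num-num
  }
}
```

In the `spark-sql` shell the docstring's call reads `SELECT regexp_replace('100-200', '(\\d+)', 'num');`, which, with the default string-literal unescaping, should likewise see the regex `(\d+)` and return `num-num`.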
[GitHub] spark issue #18477: [SPARK-21261][DOCS]SQL Regex document fix
Github user visaxin commented on the issue: https://github.com/apache/spark/pull/18477 @gatorsmile Done
[GitHub] spark issue #18527: [SPARK-21101][SQL] Catch IllegalStateException when CREA...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18527 **[Test build #83007 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83007/testReport)** for PR 18527 at commit [`47071b5`](https://github.com/apache/spark/commit/47071b5ec2d1e285221918ad075878500aa7d7bb).
[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19557 Merged build finished. Test PASSed.
[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19557 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83005/
[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19557 **[Test build #83005 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83005/testReport)** for PR 19557 at commit [`442711d`](https://github.com/apache/spark/commit/442711dd3f4fb5ce6f6d622cccd0298395aca88b). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19557 Merged build finished. Test PASSed.
[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19557 **[Test build #83004 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83004/testReport)** for PR 19557 at commit [`a9b74de`](https://github.com/apache/spark/commit/a9b74de3af21c823de28431671512b143273f468). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19557 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83004/
[GitHub] spark issue #19529: [SPARK-22308] Support alternative unit testing styles in...
Github user nkronenfeld commented on the issue: https://github.com/apache/spark/pull/19529 Yeah, as predicted, that made PlanTest very easy to review, but didn't do as well with SQLTestUtils. I suspect I reordered functions and whatnot when I was moving stuff around. If this is still too confusing to deal with, just let me know. Even if I can't make the final diff of the entire PR trivial, I could certainly re-implement this so that each individual commit is trivial; then it would just be a matter of following along commit by commit, which shouldn't be too bad.
[GitHub] spark issue #19529: [SPARK-22308] Support alternative unit testing styles in...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19529 **[Test build #83006 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83006/testReport)** for PR 19529 at commit [`2d927e9`](https://github.com/apache/spark/commit/2d927e94f627919ac1546b47072276b23d3e8da2).
[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19557 **[Test build #83005 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83005/testReport)** for PR 19557 at commit [`442711d`](https://github.com/apache/spark/commit/442711dd3f4fb5ce6f6d622cccd0298395aca88b).
[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19557 **[Test build #83004 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83004/testReport)** for PR 19557 at commit [`a9b74de`](https://github.com/apache/spark/commit/a9b74de3af21c823de28431671512b143273f468).
[GitHub] spark pull request #19560: [SPARK-22334][SQL] Check table size from filesyst...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/19560#discussion_r146449741 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -120,22 +120,41 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => val table = relation.tableMeta val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { -try { - val hadoopConf = session.sessionState.newHadoopConf() - val tablePath = new Path(table.location) - val fs: FileSystem = tablePath.getFileSystem(hadoopConf) - fs.getContentSummary(tablePath).getLength -} catch { - case e: IOException => -logWarning("Failed to get table size from hdfs.", e) -session.sessionState.conf.defaultSizeInBytes -} +getSizeFromHdfs(table.location) } else { session.sessionState.conf.defaultSizeInBytes } val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) relation.copy(tableMeta = withStats) + +case relation: HiveTableRelation +if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.nonEmpty && + session.sessionState.conf.verifyStatsFromFileSystemWhenBroadcastJoin && + relation.tableMeta.stats.get.sizeInBytes < +session.sessionState.conf.autoBroadcastJoinThreshold => + val table = relation.tableMeta + val sizeInBytes = getSizeFromHdfs(table.location) --- End diff -- Yes, I think it's a good idea.
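The diff pulls the "read the size from the filesystem, fall back to a default on failure" logic into a helper (`getSizeFromHdfs`). A minimal sketch of that pattern, assuming local `java.nio.file` APIs as a stand-in for Hadoop's `FileSystem.getContentSummary(path).getLength` so it runs without Hadoop; `TableSizeSketch`, `directorySize`, and `sizeFromFileSystem` are hypothetical names, not Spark's:

```scala
import java.io.{IOException, UncheckedIOException}
import java.nio.file.{Files, Path, Paths}

import scala.collection.JavaConverters._

// Sketch of the fallback pattern behind getSizeFromHdfs. Assumption: local
// java.nio.file calls stand in for Hadoop's FileSystem.getContentSummary.
object TableSizeSketch {
  // Total bytes of all regular files under `location`, walked recursively.
  def directorySize(location: Path): Long = {
    val stream = Files.walk(location)
    try {
      stream.iterator().asScala
        .filter(p => Files.isRegularFile(p))
        .map(p => Files.size(p))
        .sum
    } finally {
      stream.close() // Files.walk holds open directory handles
    }
  }

  // On-disk size, or `defaultSizeInBytes` if the location cannot be read,
  // mirroring the logWarning + defaultSizeInBytes branch in the diff.
  def sizeFromFileSystem(location: String, defaultSizeInBytes: Long): Long =
    try {
      directorySize(Paths.get(location))
    } catch {
      case e @ (_: IOException | _: UncheckedIOException) =>
        Console.err.println(s"Failed to get table size from filesystem: ${e.getMessage}")
        defaultSizeInBytes
    }
}
```

Passing the default in as a parameter keeps the helper reusable from both `case` branches in the rule, which is the point of the extraction.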
[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/19557 Unfortunately, the `attach` part didn't work on r-devel. I think you are right; I'm going to try to apply a similar approach for `attach`. I started that way before I found out that the method signature was not the cause for `glm`, so now I'm going to backtrack and redo `attach` too.
[GitHub] spark pull request #19560: [SPARK-22334][SQL] Check table size from filesyst...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19560#discussion_r146448976 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala --- @@ -120,22 +120,41 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => val table = relation.tableMeta val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { -try { - val hadoopConf = session.sessionState.newHadoopConf() - val tablePath = new Path(table.location) - val fs: FileSystem = tablePath.getFileSystem(hadoopConf) - fs.getContentSummary(tablePath).getLength -} catch { - case e: IOException => -logWarning("Failed to get table size from hdfs.", e) -session.sessionState.conf.defaultSizeInBytes -} +getSizeFromHdfs(table.location) } else { session.sessionState.conf.defaultSizeInBytes } val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) relation.copy(tableMeta = withStats) + +case relation: HiveTableRelation +if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.nonEmpty && + session.sessionState.conf.verifyStatsFromFileSystemWhenBroadcastJoin && + relation.tableMeta.stats.get.sizeInBytes < +session.sessionState.conf.autoBroadcastJoinThreshold => + val table = relation.tableMeta + val sizeInBytes = getSizeFromHdfs(table.location) --- End diff -- If the metadata statistics are wrong, getting the size from the files every time seems like a burden. Could we show a message to users suggesting that they update the table statistics?
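One way to read the reviewer's suggestion: when the metastore size and the on-disk size disagree, surface a message that points the user at `ANALYZE TABLE ... COMPUTE STATISTICS` (the Spark SQL command that refreshes table-level statistics). A minimal sketch; `StaleStatsSketch` and `staleStatsWarning` are hypothetical names, not part of Spark or of this PR:

```scala
// Hypothetical helper illustrating the review suggestion: report stale
// metastore statistics instead of silently re-reading sizes every time.
object StaleStatsSketch {
  // Returns a user-facing warning when the two sizes differ, else None.
  def staleStatsWarning(
      tableName: String,
      metastoreSizeInBytes: BigInt,
      fileSystemSizeInBytes: Long): Option[String] =
    if (BigInt(fileSystemSizeInBytes) != metastoreSizeInBytes) {
      Some(
        s"Statistics for $tableName report $metastoreSizeInBytes bytes, but " +
          s"$fileSystemSizeInBytes bytes were found on the filesystem. " +
          s"Consider running: ANALYZE TABLE $tableName COMPUTE STATISTICS")
    } else {
      None
    }
}
```

Once the user refreshes the statistics, later queries no longer need the extra filesystem check, which is the cost concern raised above.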
[GitHub] spark issue #18747: [SPARK-20822][SQL] Generate code to directly get value f...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18747 **[Test build #83003 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83003/testReport)** for PR 18747 at commit [`db61b41`](https://github.com/apache/spark/commit/db61b41a61da5d484742ee8f0dfa53e1486b0456).
[GitHub] spark pull request #19560: [SPARK-22334][SQL] Check table size from filesyst...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19560#discussion_r146448519 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -187,6 +187,15 @@ object SQLConf { .booleanConf .createWithDefault(false) + val VERIFY_STATS_FROM_FILESYSTEM_WHEN_BROADCASTJOIN = --- End diff -- This config name implies that it only verifies statistics when doing a broadcast join. However, it seems to verify the statistics whether or not a broadcast join is being done.
[GitHub] spark pull request #17589: [SPARK-16544][SQL] Support for conversion from nu...
Github user HyukjinKwon closed the pull request at: https://github.com/apache/spark/pull/17589
[GitHub] spark issue #17589: [SPARK-16544][SQL] Support for conversion from numeric c...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/17589 Closing this. I will take another look and make a cleaner fix next time, or reopen this if I see more interest in it.
[GitHub] spark pull request #19535: [SPARK-22313][PYTHON] Mark/print deprecation warn...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19535
[GitHub] spark issue #19535: [SPARK-22313][PYTHON] Mark/print deprecation warnings as...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19535 Merged to master
[GitHub] spark issue #19535: [SPARK-22313][PYTHON] Mark/print deprecation warnings as...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19535 Thanks @srowen, @rxin and @felixcheung.
[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19556 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82999/
[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19556 Merged build finished. Test PASSed.
[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19556 **[Test build #82999 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82999/testReport)** for PR 19556 at commit [`5d7efd1`](https://github.com/apache/spark/commit/5d7efd14c0baba3e3f41258fcf6dc44f2976450a). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19459 Merged build finished. Test PASSed.
[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19459 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83001/
[GitHub] spark issue #19560: [SPARK-22334][SQL] Check table size from filesystem in c...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/19560 @gatorsmile @dongjoon-hyun Thanks a lot for looking into this. This PR aims to avoid an OOM when the metastore fails to update table properties after the data has already been produced. With the config in this PR enabled, we check the size on the filesystem only when `totalSize` is below `spark.sql.autoBroadcastJoinThreshold`, so I think the cost is acceptable. Yes, the storage can be other filesystems; I refined the name. Please take another look when you have time.
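The gating described above (only tables whose metastore size is under `spark.sql.autoBroadcastJoinThreshold`, 10 MB by default, pay for a filesystem lookup) can be sketched as a pure function. This is an illustration under stated assumptions, not the PR's code: `BroadcastStatsGateSketch` and `effectiveSizeInBytes` are hypothetical, and `fsSize` stands in for `getSizeFromHdfs(table.location)`:

```scala
// Hypothetical sketch of the gate in the diff's second `case` branch:
// config enabled AND metastore size below the broadcast threshold.
object BroadcastStatsGateSketch {
  def effectiveSizeInBytes(
      metastoreSize: BigInt,
      verifyEnabled: Boolean,
      autoBroadcastJoinThreshold: Long,
      fsSize: () => Long): BigInt =
    if (verifyEnabled && metastoreSize < BigInt(autoBroadcastJoinThreshold)) {
      // Small-looking table: it could be broadcast, so double-check on disk.
      BigInt(fsSize())
    } else {
      // Large (or unchecked) table: trust the metastore, no extra I/O.
      metastoreSize
    }
}
```

Passing the lookup as a function makes it easy to confirm that large tables never trigger the filesystem call, which is the cost argument made in the comment.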
[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19459 **[Test build #83001 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83001/testReport)** for PR 19459 at commit [`f421e2d`](https://github.com/apache/spark/commit/f421e2da1e97dfbc7c80b7ae724b6ea9a472b220). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #10466: [SPARK-12375] [ML] add handleinvalid for vectorindexer
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/10466 @hhbyyh Do you have time to continue this PR? Thanks!
[GitHub] spark issue #19560: [SPARK-22334][SQL] Check table size from HDFS in case th...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19560 **[Test build #83002 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83002/testReport)** for PR 19560 at commit [`bf59c27`](https://github.com/apache/spark/commit/bf59c27d0a8a01dc0572cf148f40b6337799f241).
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146439253 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Closeable +import java.net.InetAddress +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} + +import scala.collection.{concurrent, mutable} +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action + +import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.config._ +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} +import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.util.Utils + +private[spark] class KubernetesClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +rpcEnv: RpcEnv, +executorPodFactory: ExecutorPodFactory, +kubernetesClient: KubernetesClient, +allocatorExecutor: ScheduledExecutorService, +requestExecutorsService: ExecutorService) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + + import KubernetesClusterSchedulerBackend._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + private val RUNNING_EXECUTOR_PODS_LOCK = new Object + // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK. + private val runningExecutorsToPods = new mutable.HashMap[String, Pod] + // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK. 
+ private val runningPodsToExecutors = new mutable.HashMap[String, String] + private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]() + private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]() + private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]() + + private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse( + throw new SparkException("Must specify the driver pod name")) + private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( + requestExecutorsService) + + private val driverPod = try { +kubernetesClient.pods() + .inNamespace(kubernetesNamespace) + .withName(kubernetesDriverPodName) + .get() + } catch { +case throwable: Throwable => + logError(s"Executor cannot find driver pod.", throwable) + throw new SparkException(s"Executor cannot find driver pod", throwable) + } + + override val minRegisteredRatio = +if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 +} else { + super.minRegisteredRatio +} + + private val executorWatchResource = new AtomicReference[Closeable] + protected var totalExpectedExecutors = new AtomicInteger(0) + + private val driverUrl = RpcEndpointAddress( + conf.get("spark.driver.host"), + conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + + private val initialExecutors = getInitialTargetExecutorNumber() + + private val podAllocationInterval =
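The quoted backend keeps two maps over the same executor/pod pairs (`runningExecutorsToPods` by executor ID, `runningPodsToExecutors` by pod name), both guarded by `RUNNING_EXECUTOR_PODS_LOCK`. A minimal sketch of why a single lock is used, assuming a hypothetical `PodInfo` case class in place of fabric8's `Pod` and a hypothetical `ExecutorPodRegistry` class; updating both directions inside one `synchronized` block keeps the maps from ever disagreeing:

```scala
import scala.collection.mutable

// PodInfo is a hypothetical stand-in for io.fabric8's Pod model class.
final case class PodInfo(name: String, ip: String)

// Hypothetical registry mirroring the paired-map bookkeeping in the diff.
class ExecutorPodRegistry {
  private val lock = new Object
  // Indexed by executor ID; guarded by `lock`.
  private val executorsToPods = new mutable.HashMap[String, PodInfo]
  // Indexed by pod name; guarded by `lock`.
  private val podsToExecutors = new mutable.HashMap[String, String]

  def register(executorId: String, pod: PodInfo): Unit = lock.synchronized {
    // If the executor was already mapped, drop its stale reverse entry.
    executorsToPods.put(executorId, pod).foreach(old => podsToExecutors.remove(old.name))
    podsToExecutors.put(pod.name, executorId)
  }

  // Removes both directions atomically; returns the pod if it was known.
  def removeExecutor(executorId: String): Option[PodInfo] = lock.synchronized {
    val removed = executorsToPods.remove(executorId)
    removed.foreach(pod => podsToExecutors.remove(pod.name))
    removed
  }

  def executorForPod(podName: String): Option[String] = lock.synchronized {
    podsToExecutors.get(podName)
  }

  def size: Int = lock.synchronized(executorsToPods.size)
}
```

Two `ConcurrentHashMap`s would make each map individually thread-safe, but a reader could still observe one updated and the other not; the shared lock is what makes the pair consistent.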
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146439099 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@
[GitHub] spark issue #19559: [SPARK-22333][SQL]ColumnReference should get higher prio...
Github user DonnyZone commented on the issue: https://github.com/apache/spark/pull/19559 @hvanhovell Yes! I got something wrong. The `timeFunctionCall` rule conflicts with `columnReference`, so this fix will break every use of CURRENT_DATE/CURRENT_TIMESTAMP. For [SPARK-16836](https://github.com/apache/spark/pull/14442), I think this feature should be implemented in the analysis phase rather than in the parser phase: when no such columns exist, they can be transformed into functions. Another approach is to define a configuration.
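The analysis-phase idea above (parse `CURRENT_DATE` as a plain identifier, and only turn it into a function call when no column of that name exists) can be sketched with a tiny resolver. This is a simplification under stated assumptions, not Spark's `Analyzer`: the ADT and `resolve` are hypothetical, and column matching is case-insensitive on the assumption that `spark.sql.caseSensitive` is left at its default of `false`:

```scala
import java.util.Locale

// Hypothetical sketch: columns take priority over the niladic time
// functions, which are used only as a fallback during resolution.
object TimeFunctionResolutionSketch {
  sealed trait Resolved
  final case class ColumnRef(name: String) extends Resolved
  final case class FunctionCall(name: String) extends Resolved
  final case class Unresolved(name: String) extends Resolved

  private val timeFunctions = Set("current_date", "current_timestamp")

  def resolve(identifier: String, inputColumns: Seq[String]): Resolved = {
    val lower = identifier.toLowerCase(Locale.ROOT)
    inputColumns.find(_.toLowerCase(Locale.ROOT) == lower) match {
      case Some(col) => ColumnRef(col)                               // a real column wins
      case None if timeFunctions.contains(lower) => FunctionCall(lower) // fallback
      case None => Unresolved(identifier)                            // left for error reporting
    }
  }
}
```

Deferring the choice to resolution time is what avoids the parser-level ambiguity between the two grammar rules: the parser no longer has to guess without knowing the table's columns.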
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146437952 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146437447 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146435148 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -159,6 +159,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.getExecutorsAliveOnHost(host).foreach { exec => killExecutors(exec.toSeq, replace = true, force = true) } + + case UpdateDelegationTokens(tokens) => +// Update the driver's delegation tokens in case new executors are added later. +currentHadoopDelegationTokens = Some(tokens) +executorDataMap.values.foreach { ed => + ed.executorEndpoint.send(UpdateDelegationTokens(tokens)) } --- End diff -- The closing `}` should go on the next line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
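The `UpdateDelegationTokens` case in the hunk above does two things. It remembers the newest tokens on the driver and forwards them to every registered executor. Below is a minimal Python sketch of that pattern. The class and its in-memory "inboxes" are hypothetical stand-ins for `executorDataMap` and the executors' RPC endpoints. Handing stored tokens to executors at registration is a simplification of how late joiners pick up `currentHadoopDelegationTokens`.

```python
class DriverTokenState:
    """Hypothetical stand-in for the driver endpoint's token handling."""

    def __init__(self):
        self.current_tokens = None   # mirrors currentHadoopDelegationTokens
        self.executor_inboxes = {}   # executor id -> messages sent to that executor

    def register_executor(self, executor_id):
        # Simplification: a late joiner is handed the latest tokens on registration.
        inbox = []
        if self.current_tokens is not None:
            inbox.append(("UpdateDelegationTokens", self.current_tokens))
        self.executor_inboxes[executor_id] = inbox

    def receive(self, message):
        kind, tokens = message
        if kind == "UpdateDelegationTokens":
            # Remember the tokens for executors that are added later...
            self.current_tokens = tokens
            # ...and forward them to every executor that is already registered.
            for inbox in self.executor_inboxes.values():
                inbox.append(("UpdateDelegationTokens", tokens))


driver = DriverTokenState()
driver.register_executor("1")
driver.receive(("UpdateDelegationTokens", b"tokens-v1"))
driver.register_executor("2")
```

Both executors end up holding `b"tokens-v1"`. Executor "1" receives the tokens through the broadcast, and executor "2" receives them through the stored copy.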
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146436646 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala --- @@ -59,8 +59,7 @@ private[yarn] class AMCredentialRenewer( private var lastCredentialsFileSuffix = 0 private val credentialRenewer = -Executors.newSingleThreadScheduledExecutor( - ThreadUtils.namedThreadFactory("Credential Refresh Thread")) +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread") --- End diff -- Normally you should avoid making changes that are not related to your PR.
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146434999 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala --- @@ -17,7 +17,7 @@ package org.apache.spark.scheduler.cluster.mesos -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.SparkContext --- End diff -- Change not needed?
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146436571 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosCredentialRenewer will update the Hadoop credentials for Spark drivers accessing + * secured services using Kerberos authentication. 
It is modeled after the YARN AMCredential + * renewer, and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * recieved they overwrite the current credentials. + */ +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +driverEndpoint: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get(config.PRINCIPAL).orNull + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab = conf.get(config.KEYTAB).orNull +val tgt = conf.getenv("KRB5CCNAME") +require(keytab != null || tgt != null, "A keytab or TGT required.") +// if both Keytab and TGT are detected we use the Keytab. 
+val (secretFile, mode) = if (keytab != null && tgt != null) { + logWarning(s"Keytab and TGT were detected, using keytab, unset $keytab to use TGT") + (keytab, "keytab") +} else { + val m = if (keytab != null) "keytab" else "tgt" + val sf = if (keytab != null) keytab else tgt + (sf, m) +} +logInfo(s"Logging in as $principal with mode $mode to retrieve Hadoop delegation tokens") +logDebug(s"secretFile is $secretFile") +(secretFile, mode) + } + + def scheduleTokenRenewal(): Unit = { +def scheduleRenewal(runnable: Runnable): Unit = { + val remainingTime = timeOfNextRenewal - System.currentTimeMillis() + if (remainingTime <= 0) { +logInfo("Credentials have expired, creating new ones now.") +runnable.run() + } else { +logInfo(s"Scheduling login from keytab in $remainingTime millis.") +credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS) + } +} + +val credentialRenewerRunnable = + new Runnable { +override def run(): Unit = { + try { +val creds = getRenewedDelegationTokens(conf) +broadcastDelegationTokens(creds) + } catch { +case e: Exception => + // Log the error and try to write new tokens back in an hour +
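The class comment says credentials are renewed once 75% of the renewal interval has passed. `scheduleRenewal` then either runs the renewal immediately, if that time is already past, or schedules it after the remaining delay. The following Python sketch mirrors that arithmetic. The function names are hypothetical, and it assumes the renewal point is measured from the token issue time. That assumption matches the YARN renewer's approach, but the Mesos helper that computes `nextRenewal` is not shown in this hunk.

```python
RENEWAL_FRACTION = 0.75  # renew once 75% of the interval has elapsed


def next_renewal_time_ms(issue_time_ms, renewal_interval_ms, fraction=RENEWAL_FRACTION):
    """Absolute time (ms) at which fresh tokens should be obtained."""
    return issue_time_ms + int(fraction * renewal_interval_ms)


def plan_renewal(next_renewal_ms, now_ms):
    """Mirror scheduleRenewal: run now if overdue, otherwise schedule after a delay."""
    remaining = next_renewal_ms - now_ms
    if remaining <= 0:
        return ("run_now", 0)
    return ("schedule", remaining)


DAY_MS = 24 * 60 * 60 * 1000
nxt = next_renewal_time_ms(issue_time_ms=0, renewal_interval_ms=DAY_MS)
print(nxt)                            # 64800000, i.e. 18 hours after issue
print(plan_renewal(nxt, now_ms=0))    # ('schedule', 64800000)
print(plan_renewal(nxt, now_ms=nxt))  # ('run_now', 0)
```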
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146436883 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosCredentialRenewer will update the Hadoop credentials for Spark drivers accessing + * secured services using Kerberos authentication. 
It is modeled after the YARN AMCredential + * renewer, and similarly will renew the Credentials when 75% of the renewal interval has passed. + * The principal difference is that instead of writing the new credentials to HDFS and + * incrementing the timestamp of the file, the new credentials (called Tokens when they are + * serialized) are broadcast to all running executors. On the executor side, when new Tokens are + * recieved they overwrite the current credentials. + */ +class MesosCredentialRenewer( +conf: SparkConf, +tokenManager: HadoopDelegationTokenManager, +nextRenewal: Long, +driverEndpoint: RpcEndpointRef) extends Logging { + private val credentialRenewerThread = +ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread") + + @volatile private var timeOfNextRenewal = nextRenewal + + private val principal = conf.get(config.PRINCIPAL).orNull + + private val (secretFile, mode) = getSecretFile(conf) + + private def getSecretFile(conf: SparkConf): (String, String) = { +val keytab = conf.get(config.KEYTAB).orNull +val tgt = conf.getenv("KRB5CCNAME") +require(keytab != null || tgt != null, "A keytab or TGT required.") +// if both Keytab and TGT are detected we use the Keytab. +val (secretFile, mode) = if (keytab != null && tgt != null) { + logWarning(s"Keytab and TGT were detected, using keytab, unset $keytab to use TGT") --- End diff -- `${KEYTAB.key}`?
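vanzin's suggestion is that the warning should name the configuration key to unset, as `${KEYTAB.key}` does, rather than interpolating the keytab path through `$keytab`. Below is a minimal Python sketch of the same keytab-over-TGT precedence with that change applied. It assumes dicts stand in for the Spark configuration and the environment. The function name is hypothetical, and `spark.yarn.keytab` is assumed to be the key behind `config.KEYTAB`. Scala's `require` throws an `IllegalArgumentException`, which the sketch models as `ValueError`.

```python
import logging

KEYTAB_KEY = "spark.yarn.keytab"  # assumed key behind config.KEYTAB


def get_secret_file(conf, env):
    """Return (secret_file, mode). A keytab wins over a TGT ticket cache."""
    keytab = conf.get(KEYTAB_KEY)
    tgt = env.get("KRB5CCNAME")
    if keytab is None and tgt is None:
        raise ValueError("A keytab or TGT is required.")
    if keytab is not None and tgt is not None:
        # Name the config key to unset, not the keytab path itself.
        logging.warning("Keytab and TGT were detected; using the keytab. "
                        "Unset %s to use the TGT.", KEYTAB_KEY)
        return keytab, "keytab"
    if keytab is not None:
        return keytab, "keytab"
    return tgt, "tgt"
```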
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146437027 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Try + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens +import org.apache.spark.util.ThreadUtils + + +/** + * The MesosCredentialRenewer will update the Hadoop credentials for Spark drivers accessing + * secured services using Kerberos authentication. 
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146436327 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -213,6 +216,24 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && currentHadoopDelegationTokens.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + // The renewal time is ignored when getting the initial delegation tokens + // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), so we get the renewal + // time here and schedule a thread to renew them. + val renewalTime = --- End diff -- I still don't like this. You should not need to implement this separate method of getting the renewal time just because the current code is throwing out that information. Instead you should fix the code so that the information is preserved. `getHadoopDelegationCreds` is called in only one place, so my suggestion would be to encapsulate initializing the token manager and getting the initial set of tokens into a single method (instead of the current two). Then in that method's implementation you can get the initial set of tokens, initialize the renewer thread with the correct renewal period, and return the data needed by the scheduler.
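The shape of the refactoring suggested above is a single entry point that obtains the initial tokens together with their renewal time. It starts the renewer with that time and returns only what the scheduler needs, so no separate method has to recompute the renewal time. Below is a minimal Python sketch of that shape. Every name in it is hypothetical, and the two callables stand in for the token manager and the renewer thread.

```python
from dataclasses import dataclass
from typing import Callable, Tuple


@dataclass
class InitialCredentials:
    tokens: bytes
    next_renewal_ms: int


def initialize_credentials(
    obtain_tokens: Callable[[], Tuple[bytes, int]],
    start_renewer: Callable[[int], None],
) -> InitialCredentials:
    # One call fetches the tokens *and* their renewal time, so nothing is discarded.
    tokens, next_renewal_ms = obtain_tokens()
    # The renewer starts from the real renewal time rather than a recomputed one.
    start_renewer(next_renewal_ms)
    # The scheduler receives only the data it needs.
    return InitialCredentials(tokens, next_renewal_ms)


scheduled = []
creds = initialize_credentials(lambda: (b"t0", 64_800_000), scheduled.append)
```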
[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19459#discussion_r146436616 --- Diff: python/pyspark/sql/session.py --- @@ -414,6 +415,52 @@ def _createFromLocal(self, data, schema): data = [schema.toInternal(row) for row in data] return self._sc.parallelize(data), schema +def _createFromPandasWithArrow(self, pdf, schema): +""" +Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting +to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the +data types will be used to coerce the data in Pandas to Arrow conversion. +""" +from pyspark.serializers import ArrowSerializer, _create_batch +from pyspark.sql.types import from_arrow_schema, to_arrow_type +import pyarrow as pa + +# Slice the DataFrame into batches +step = -(-len(pdf) // self.sparkContext.defaultParallelism) # round int up +pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step)) + +if schema is None or isinstance(schema, list): +batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False) + for pdf_slice in pdf_slices] + +# There will be at least 1 batch after slicing the pandas.DataFrame +schema_from_arrow = from_arrow_schema(batches[0].schema) + +# If passed schema as a list of names then rename fields +if isinstance(schema, list): +fields = [] +for i, field in enumerate(schema_from_arrow): +field.name = schema[i] +fields.append(field) +schema = StructType(fields) +else: +schema = schema_from_arrow +else: +if not isinstance(schema, StructType) and isinstance(schema, DataType): +schema = StructType().add("value", schema) --- End diff -- BTW, I think we should not support this case: ```python >>> spark.conf.set("spark.sql.execution.arrow.enabled", "true") >>> spark.createDataFrame(pd.DataFrame([{"a": 1}]), "int").show() ``` ``` +-----+ |value| +-----+ |    1| +-----+ ``` ```python >>> spark.conf.set("spark.sql.execution.arrow.enabled", "false") >>> 
spark.createDataFrame(pd.DataFrame([{"a": 1}]), "int").show() ``` ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/sql/session.py", line 595, in createDataFrame rdd, schema = self._createFromLocal(map(prepare, data), schema) File "/.../spark/python/pyspark/sql/session.py", line 399, in _createFromLocal data = list(data) File "/.../spark/python/pyspark/sql/session.py", line 585, in prepare verify_func(obj) File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify verify_value(obj) File "/.../spark/python/pyspark/sql/types.py", line 1337, in verify_integer verify_acceptable_types(obj) File "/.../spark/python/pyspark/sql/types.py", line 1300, in verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field value: IntegerType can not accept object (1,) in type ``` I thought disallowing it is actually more consistent with normal Python lists: ```python >>> spark.createDataFrame([1], "int").show() ``` ``` +-----+ |value| +-----+ |    1| +-----+ ``` ```python >>> spark.createDataFrame([[1]], "int").show() ``` ``` Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/U.../spark/python/pyspark/sql/session.py", line 595, in createDataFrame rdd, schema = self._createFromLocal(map(prepare, data), schema) File "/.../spark/python/pyspark/sql/session.py", line 399, in _createFromLocal data = list(data) File "/.../spark/python/pyspark/sql/session.py", line 585, in prepare verify_func(obj) File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify verify_value(obj) File "/.../spark/python/pyspark/sql/types.py", line 1337, in verify_integer verify_acceptable_types(obj) File "/.../spark/python/pyspark/sql/types.py", line 1300, in verify_acceptable_types % (dataType, obj, type(obj)))) TypeError: field value: IntegerType can not accept object [1] in type ``` If we need to support this, I think it should print as below: ```python >>> spark.createDataFrame([[1]], "string").show() ``` ``` 
+-----+ |value| +-----+ |  [1]| +-----+ ```
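The diff above rounds the batch size up with `step = -(-len(pdf) // self.sparkContext.defaultParallelism)`. Floor division of a negated length, negated again, is a ceiling division that needs no floats. The result is at most `defaultParallelism` slices and, for non-empty input, at least one. Below is a minimal Python sketch of the slicing on a plain list. It assumes a list can stand in for the pandas DataFrame. The empty-input guard is an addition, because `range` rejects a step of 0.

```python
def slice_into_batches(rows, parallelism):
    """Split rows into at most `parallelism` contiguous slices of (nearly) equal size."""
    n = len(rows)
    if n == 0:
        return []  # guard: a step of 0 would make range() raise ValueError
    step = -(-n // parallelism)  # ceiling division without floating point
    return [rows[start:start + step] for start in range(0, n, step)]


print(slice_into_batches(list(range(10)), 4))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(slice_into_batches(list(range(3)), 8))   # [[0], [1], [2]]
```

When there are fewer rows than partitions, as in the second call, each row becomes its own batch. That is why the comment in the diff can rely on "at least 1 batch" but not on exactly `defaultParallelism` batches.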
[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19527 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83000/ Test PASSed.
[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19527 **[Test build #83000 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83000/testReport)** for PR 19527 at commit [`adc4107`](https://github.com/apache/spark/commit/adc410770528c6c95a3c35de64548362c1b46643). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19527 Merged build finished. Test PASSed.
[GitHub] spark issue #19529: [SPARK-22308] Support alternative unit testing styles in...
Github user nkronenfeld commented on the issue: https://github.com/apache/spark/pull/19529 @gatorsmile Sounds good, giving that a try now. Assuming tests pass, I'll check it in and see if it's any better. So far I've done this for PlanTest and SQLTestUtils. I suspect it will make PlanTest much cleaner. In SQLTestUtils I suspect it won't help as much, since that split was more of a pick-and-choose (this function goes in the base, this one doesn't). I haven't done it at all for SharedSQLContext/SharedSparkSession; that one seems more deserving of a first-level place to me, so I'm more hesitant to, but if you want, I'll do that one too. I suspect the correct answer is going to be redoing the PR with careful commits that are clearer about what each one does, but I'll try this first anyway :-)
[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19272#discussion_r146434552 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -194,6 +198,27 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) ) +// check that the credentials are defined, even though it's likely that auth would have failed +// already if you've made it this far +if (principal != null && hadoopDelegationCreds.isDefined) { + logDebug(s"Principal found ($principal) starting token renewer") + val credentialRenewerThread = new Thread { +setName("MesosCredentialRenewer") +override def run(): Unit = { + val rt = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf) + val credentialRenewer = +new MesosCredentialRenewer( + conf, + hadoopDelegationTokenManager.get, + MesosCredentialRenewer.getNextRenewalTime(rt), + driverEndpoint) + credentialRenewer.scheduleTokenRenewal() +} + } + + credentialRenewerThread.start() + credentialRenewerThread.join() --- End diff -- That's the gist of it.
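The quoted hunk calls `start()` and then immediately `join()` on the new thread. As a result, the calling thread blocks until `run()` has finished, so the work is effectively synchronous. The only difference from a direct call is which thread executes the body. The Python sketch below demonstrates the same semantics with `threading.Thread`. The helper name is hypothetical, and the thread name is borrowed from the hunk only for illustration.

```python
import threading


def run_on_fresh_thread(fn, name="MesosCredentialRenewer"):
    """Run fn on a new named thread and wait for it, like start() followed by join()."""
    result = {}

    def body():
        result["value"] = fn()
        result["thread"] = threading.current_thread().name

    t = threading.Thread(target=body, name=name)
    t.start()
    t.join()  # the caller blocks here until body() has returned
    return result


r = run_on_fresh_thread(lambda: 42)
print(r["value"], r["thread"])  # 42 MesosCredentialRenewer
```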
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146434438 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] spark issue #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table should n...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19562 Thank you! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/19459 Merged in the PR from @ueshin and added a case for when the schema is a single data type given as a string. In addition to a `StructType`, this now handles specifying the schema in the following ways:
```
spark.createDataFrame(pdf, ['name', 'age'])
spark.createDataFrame(pdf, "a: string, b: int")
spark.createDataFrame(pdf, "int")
spark.createDataFrame(pdf, "struct")
```
@viirya brought up a good point here https://github.com/apache/spark/pull/19459#discussion_r145862488 (linking because it's outdated and hidden), which shows another good reason to upgrade Arrow, I think.
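The accepted `schema` forms can be illustrated with a hypothetical pure-Python normalizer. This is not PySpark's implementation, which parses DDL strings through the JVM. The sketch makes three assumptions: inferred column types are already available as (name, type) pairs; a DDL-style string is a flat comma-separated list of `name: type`; and a bare single type is wrapped in one field named `value`, which is assumed to mirror how `createDataFrame` treats non-struct types. Nested `struct<...>` strings are not handled.

```python
from typing import List, Sequence, Tuple, Union

Field = Tuple[str, str]  # (column name, type name)


def normalize_schema(schema: Union[None, Sequence[str], str],
                     inferred: Sequence[Field]) -> List[Field]:
    """Resolve a user-supplied schema against types inferred from the pandas data."""
    if schema is None:
        # No schema: keep the inferred names and types as-is.
        return list(inferred)
    if isinstance(schema, str):  # checked before the list case, since str is also a Sequence
        if ":" in schema:
            # Flat DDL-like string such as "a: string, b: int".
            fields = []
            for part in schema.split(","):
                name, _, type_name = part.partition(":")
                fields.append((name.strip(), type_name.strip()))
            return fields
        # A single data type such as "int" becomes one column named "value".
        return [("value", schema.strip())]
    # A list of names: rename the inferred fields positionally and keep the inferred types.
    if len(schema) != len(inferred):
        raise ValueError("number of names does not match number of columns")
    return [(name, type_name) for name, (_, type_name) in zip(schema, inferred)]
```

Unlike the positional renaming loop in the diff, this sketch rejects a name list whose length differs from the column count instead of relying on indexing to fail.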
[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19459#discussion_r146431078 --- Diff: python/pyspark/sql/session.py --- @@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema): data = [schema.toInternal(row) for row in data] return self._sc.parallelize(data), schema +def _createFromPandasWithArrow(self, pdf, schema): +""" +Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting +to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the +data types will be used to coerce the data in Pandas to Arrow conversion. +""" +from pyspark.serializers import ArrowSerializer +from pyspark.sql.types import from_arrow_schema, to_arrow_type, _cast_pandas_series_type +import pyarrow as pa + +# Slice the DataFrame into batches +step = -(-len(pdf) // self.sparkContext.defaultParallelism) # round int up +pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step)) + +if schema is None or isinstance(schema, list): +batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False) + for pdf_slice in pdf_slices] + +# There will be at least 1 batch after slicing the pandas.DataFrame +schema_from_arrow = from_arrow_schema(batches[0].schema) + +# If passed schema as a list of names then rename fields +if isinstance(schema, list): +fields = [] +for i, field in enumerate(schema_from_arrow): +field.name = schema[i] +fields.append(field) +schema = StructType(fields) +else: +schema = schema_from_arrow +else: +batches = [] +for i, pdf_slice in enumerate(pdf_slices): + +# convert to series to pyarrow.Arrays to use mask when creating Arrow batches +arrs = [] +names = [] +for c, (_, series) in enumerate(pdf_slice.iteritems()): +field = schema[c] +names.append(field.name) +t = to_arrow_type(field.dataType) +try: +# NOTE: casting is not necessary with Arrow >= 0.7 + arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t), + mask=series.isnull(), 
type=t)) +except ValueError as e: --- End diff -- yeah, there doesn't seem to be a way to guard against overflow with `astype`
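The batching step in the diff uses the `-(-n // k)` idiom for ceiling division. Below is a standalone Python sketch of that slicing, followed by the kind of explicit range check that `astype` does not perform before a narrowing cast. `slice_bounds` and `fits_in_signed` are hypothetical names, and plain integers stand in for pandas objects.

```python
from typing import Iterable, List, Tuple


def ceil_div(n: int, k: int) -> int:
    """Ceiling division for positive k, written the same way as the diff: -(-n // k)."""
    return -(-n // k)


def slice_bounds(num_rows: int, parallelism: int) -> List[Tuple[int, int]]:
    """Return (start, end) row ranges so that at most `parallelism` slices are produced."""
    step = ceil_div(num_rows, parallelism)
    if step == 0:
        return []  # empty input: range() would reject a step of zero
    return [(start, min(start + step, num_rows)) for start in range(0, num_rows, step)]


def fits_in_signed(values: Iterable[int], bits: int) -> bool:
    """Check that every value fits in a signed integer of the given width before casting."""
    lo, hi = -(1 << (bits - 1)), (1 << (bits - 1)) - 1
    return all(lo <= v <= hi for v in values)
```

The empty-input guard matters in this sketch because, with zero rows, the step evaluates to 0 and `range(0, 0, 0)` raises `ValueError`.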
[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19459 **[Test build #83001 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83001/testReport)** for PR 19459 at commit [`f421e2d`](https://github.com/apache/spark/commit/f421e2da1e97dfbc7c80b7ae724b6ea9a472b220).
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146430483 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Closeable +import java.net.InetAddress +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} + +import scala.collection.{concurrent, mutable} +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action + +import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.config._ +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} +import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.util.Utils + +private[spark] class KubernetesClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +rpcEnv: RpcEnv, +executorPodFactory: ExecutorPodFactory, +kubernetesClient: KubernetesClient, +allocatorExecutor: ScheduledExecutorService, +requestExecutorsService: ExecutorService) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + + import KubernetesClusterSchedulerBackend._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + private val RUNNING_EXECUTOR_PODS_LOCK = new Object + // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK. + private val runningExecutorsToPods = new mutable.HashMap[String, Pod] + // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK. 
+ private val runningPodsToExecutors = new mutable.HashMap[String, String] + private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]() + private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]() + private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]() + + private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse( + throw new SparkException("Must specify the driver pod name")) + private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( + requestExecutorsService) + + private val driverPod = try { +kubernetesClient.pods() + .inNamespace(kubernetesNamespace) + .withName(kubernetesDriverPodName) + .get() + } catch { +case throwable: Throwable => + logError(s"Executor cannot find driver pod.", throwable) + throw new SparkException(s"Executor cannot find driver pod", throwable) + } + + override val minRegisteredRatio = +if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 +} else { + super.minRegisteredRatio +} + + private val executorWatchResource = new AtomicReference[Closeable] + protected var totalExpectedExecutors = new AtomicInteger(0) + + private val driverUrl = RpcEndpointAddress( + conf.get("spark.driver.host"), + conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + + private val initialExecutors = getInitialTargetExecutorNumber() + + private val podAllocationInterval =
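`runningExecutorsToPods` and `runningPodsToExecutors` are two views of one relation. That is why both are guarded by the single `RUNNING_EXECUTOR_PODS_LOCK`: updating them under one lock prevents readers from seeing one map updated without the other. Here is a minimal Python sketch of that pattern, using a hypothetical `ExecutorPodRegistry` class with pod names as plain strings.

```python
import threading
from typing import Dict, Optional


class ExecutorPodRegistry:
    """Hypothetical two-way index between executor IDs and pod names, guarded by one lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._executor_to_pod: Dict[str, str] = {}
        self._pod_to_executor: Dict[str, str] = {}

    def add(self, executor_id: str, pod_name: str) -> None:
        with self._lock:  # both maps change together under the same lock
            self._executor_to_pod[executor_id] = pod_name
            self._pod_to_executor[pod_name] = executor_id

    def remove_executor(self, executor_id: str) -> Optional[str]:
        with self._lock:
            pod_name = self._executor_to_pod.pop(executor_id, None)
            if pod_name is not None:
                self._pod_to_executor.pop(pod_name, None)
            return pod_name

    def executor_for_pod(self, pod_name: str) -> Optional[str]:
        with self._lock:
            return self._pod_to_executor.get(pod_name)
```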
[GitHub] spark issue #18125: [SPARK-20891][SQL] Reduce duplicate code typedaggregator...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18125 **[Test build #82998 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82998/testReport)** for PR 18125 at commit [`a33ea0d`](https://github.com/apache/spark/commit/a33ea0d7601d7b14e50536e3c457847145e799ae).
* This patch **fails MiMa tests**.
* This patch **does not merge cleanly**.
* This patch adds no public classes.
[GitHub] spark issue #18125: [SPARK-20891][SQL] Reduce duplicate code typedaggregator...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18125 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82998/ Test FAILed.
[GitHub] spark issue #18125: [SPARK-20891][SQL] Reduce duplicate code typedaggregator...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18125 Build finished. Test FAILed.
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146429823 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@
[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19527 **[Test build #83000 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83000/testReport)** for PR 19527 at commit [`adc4107`](https://github.com/apache/spark/commit/adc410770528c6c95a3c35de64548362c1b46643).
[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19556 cc @cloud-fan for review too.
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146429113 --- Diff: pom.xml --- @@ -2649,6 +2649,13 @@ + kubernetes + +resource-managers/kubernetes/core --- End diff -- That (keeping them separate) is actually pretty useful for SBT.
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146428556 --- Diff: pom.xml --- @@ -2649,6 +2649,13 @@ + kubernetes + +resource-managers/kubernetes/core --- End diff --

It's not strictly necessary to keep integration tests in a separate module, but doing so has some organizational benefits. For example, integration tests in the same module as the core code would need a dedicated package namespace that is excluded from the `test` phase and executed only in the `integrationTest` phase. With a separate module, the integration test POM can simply make the `test` phase a no-op and have `integrationTest` run all tests in the `test` folder. (I don't know whether Maven distinguishes between `src/test/scala` and `src/integrationTest/scala`; that would help a lot.) It's also IMO easier to read the integration test's `pom.xml` separately from the `pom.xml` of the Kubernetes core implementation. FWIW, this is what we have in the integration test POM at the moment: https://github.com/apache-spark-on-k8s/spark/blob/branch-2.2-kubernetes/resource-managers/kubernetes/integration-tests/pom.xml. (The Minikube-related parts are going away with https://github.com/apache-spark-on-k8s/spark/pull/521.)

> And that's assuming that you really don't want to run them during unit tests.

We definitely don't want to run these during unit tests: they are relatively expensive, require building Docker images, and require Minikube to be pre-installed on the machine. Putting them at least in a separate integration test phase makes these differences clear.
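The single-module alternative routes each test to a phase by package namespace. Here is a rough Python illustration. The `integrationtest` package segment is a hypothetical convention, not one the PR defines.

```python
from typing import Iterable, List, Tuple

# Hypothetical package segment that marks a suite as an integration test.
INTEGRATION_SEGMENT = "integrationtest"


def split_by_phase(test_classes: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Partition fully qualified test class names into (unit, integration) lists."""
    unit: List[str] = []
    integration: List[str] = []
    for name in test_classes:
        packages = name.split(".")[:-1]  # drop the class name, keep the package path
        (integration if INTEGRATION_SEGMENT in packages else unit).append(name)
    return unit, integration
```

A separate module makes this filter unnecessary, because its `test` phase does nothing and its integration phase runs everything.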
[GitHub] spark pull request #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator f...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19527#discussion_r146428519 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala --- @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.feature + +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkException +import org.apache.spark.annotation.Since +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.attribute._ +import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCols, HasOutputCols} +import org.apache.spark.ml.util._ +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.functions.{col, lit, udf} +import org.apache.spark.sql.types.{DoubleType, NumericType, StructField, StructType} + +/** Private trait for params and common methods for OneHotEncoderEstimator and OneHotEncoderModel */ +private[ml] trait OneHotEncoderBase extends Params with HasHandleInvalid +with HasInputCols with HasOutputCols { + + /** + * Param for how to handle invalid data. + * Options are 'keep' (invalid data are ignored) or 'error' (throw an error). + * Default: "error" + * @group param + */ + @Since("2.3.0") + override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", +"How to handle invalid data " + +"Options are 'keep' (invalid data are ignored) or error (throw an error).", --- End diff -- I will change the wording. Thanks.
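The `handleInvalid` option can be illustrated with a simplified Python sketch that one-hot encodes a single category index. It assumes that `'keep'` sends out-of-range indices to one extra trailing slot rather than dropping them, and it ignores any drop-last behavior. It is not the `OneHotEncoderModel` code.

```python
from typing import List


def one_hot(index: int, num_categories: int, handle_invalid: str = "error") -> List[float]:
    """Encode a category index as a dense one-hot list (simplified, hypothetical helper)."""
    if handle_invalid not in ("error", "keep"):
        raise ValueError(f"unsupported handleInvalid option: {handle_invalid!r}")
    valid = 0 <= index < num_categories
    if handle_invalid == "error":
        if not valid:
            raise ValueError(f"index {index} is outside [0, {num_categories})")
        vector = [0.0] * num_categories
        vector[index] = 1.0
        return vector
    # 'keep': reserve one extra trailing slot for any invalid index.
    vector = [0.0] * (num_categories + 1)
    vector[index if valid else num_categories] = 1.0
    return vector
```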
[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19556 **[Test build #82999 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82999/testReport)** for PR 19556 at commit [`5d7efd1`](https://github.com/apache/spark/commit/5d7efd14c0baba3e3f41258fcf6dc44f2976450a).
[GitHub] spark pull request #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table s...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19562
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146427690 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@
[GitHub] spark issue #18125: [SPARK-20891][SQL] Reduce duplicate code typedaggregator...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18125 **[Test build #82998 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82998/testReport)** for PR 18125 at commit [`a33ea0d`](https://github.com/apache/spark/commit/a33ea0d7601d7b14e50536e3c457847145e799ae).
[GitHub] spark issue #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table should n...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19562 Thanks! Merged to master.
[GitHub] spark issue #18125: [SPARK-20891][SQL] Reduce duplicate code typedaggregator...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/18125 @setjet Could you address the conflicts?
[GitHub] spark issue #18125: [SPARK-20891][SQL] Reduce duplicate code typedaggregator...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/18125 ok to test
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146426881 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146426810 --- Diff: pom.xml --- @@ -2649,6 +2649,13 @@ + kubernetes + +resource-managers/kubernetes/core --- End diff --

Why are integration tests in a separate module? For example, Maven has an `integration-test` phase that is separate from the usual `test` phase used for unit tests. And that's assuming that you really don't want to run them during unit tests. Then all the code could potentially live in the same module.

> in the interest of keeping test code together

That would mean keeping the test code in the same module as the core code, not in a separate module.
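This point relies on Maven's default lifecycle being ordered: invoking a phase also runs every earlier phase, so `mvn test` stops before `integration-test`, while `mvn verify` runs both. Below is a small Python model of that ordering. The phase list is Maven's default lifecycle; `phases_run` is only an illustration.

```python
from typing import List

# Maven's default lifecycle, in execution order.
DEFAULT_LIFECYCLE = [
    "validate", "initialize", "generate-sources", "process-sources",
    "generate-resources", "process-resources", "compile", "process-classes",
    "generate-test-sources", "process-test-sources", "generate-test-resources",
    "process-test-resources", "test-compile", "process-test-classes", "test",
    "prepare-package", "package", "pre-integration-test", "integration-test",
    "post-integration-test", "verify", "install", "deploy",
]


def phases_run(target: str) -> List[str]:
    """Return every phase Maven executes, in order, when asked to run `target`."""
    return DEFAULT_LIFECYCLE[: DEFAULT_LIFECYCLE.index(target) + 1]
```

Binding integration suites to `integration-test` (for example through the Failsafe plugin) therefore lets `mvn test` skip them, while `mvn verify` still runs them.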
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestamp suppo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18664 Merged build finished. Test FAILed.
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestamp suppo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18664 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82997/ Test FAILed.
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestamp suppo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18664 **[Test build #82997 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82997/testReport)** for PR 18664 at commit [`79bb93f`](https://github.com/apache/spark/commit/79bb93f36ad6a0096f59072c54097015e2099a73). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #18477: [SPARK-21261][DOCS]SQL Regex document fix
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/18477 @visaxin Could you address the comment?
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146426271 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Closeable +import java.net.InetAddress +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} + +import scala.collection.{concurrent, mutable} +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action + +import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.config._ +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv} +import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.util.Utils + +private[spark] class KubernetesClusterSchedulerBackend( +scheduler: TaskSchedulerImpl, +rpcEnv: RpcEnv, +executorPodFactory: ExecutorPodFactory, +kubernetesClient: KubernetesClient, +allocatorExecutor: ScheduledExecutorService, +requestExecutorsService: ExecutorService) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + + import KubernetesClusterSchedulerBackend._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + private val RUNNING_EXECUTOR_PODS_LOCK = new Object + // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK. + private val runningExecutorsToPods = new mutable.HashMap[String, Pod] + // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK. 
+ private val runningPodsToExecutors = new mutable.HashMap[String, String] + private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]() + private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]() + private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]() + + private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse( + throw new SparkException("Must specify the driver pod name")) + private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( + requestExecutorsService) + + private val driverPod = try { +kubernetesClient.pods() + .inNamespace(kubernetesNamespace) + .withName(kubernetesDriverPodName) + .get() + } catch { +case throwable: Throwable => + logError(s"Executor cannot find driver pod.", throwable) + throw new SparkException(s"Executor cannot find driver pod", throwable) + } + + override val minRegisteredRatio = +if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { + 0.8 +} else { + super.minRegisteredRatio +} + + private val executorWatchResource = new AtomicReference[Closeable] + protected var totalExpectedExecutors = new AtomicInteger(0) + + private val driverUrl = RpcEndpointAddress( + conf.get("spark.driver.host"), + conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + + private val initialExecutors = getInitialTargetExecutorNumber() + + private val podAllocationInterval =
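According to the comments in the quoted code, `runningExecutorsToPods` (keyed by executor ID) and `runningPodsToExecutors` (keyed by pod name) are both guarded by `RUNNING_EXECUTOR_PODS_LOCK`. The sketch below shows why a single lock can cover both indexes. It uses a simplified, hypothetical model in which a pod is represented only by its name; the real code stores fabric8 `Pod` objects. Every insert and removal updates both plain `mutable.HashMap`s inside one `synchronized` block, so a reader holding the lock never sees one index without the other:

```scala
import scala.collection.mutable

/** Hypothetical simplified registry; pods are modelled by name only. */
class RunningExecutorPods {
  private val lock = new Object
  // Indexed by executor ID; guarded by `lock`.
  private val executorsToPods = new mutable.HashMap[String, String]
  // Indexed by pod name; guarded by `lock`.
  private val podsToExecutors = new mutable.HashMap[String, String]

  /** Records both directions of the mapping under the same lock. */
  def register(executorId: String, podName: String): Unit = lock.synchronized {
    executorsToPods(executorId) = podName
    podsToExecutors(podName) = executorId
  }

  /** Removes both directions atomically; returns the pod name, if one was registered. */
  def removeExecutor(executorId: String): Option[String] = lock.synchronized {
    val pod = executorsToPods.remove(executorId)
    pod.foreach(p => podsToExecutors.remove(p))
    pod
  }

  def executorForPod(podName: String): Option[String] = lock.synchronized {
    podsToExecutors.get(podName)
  }

  def podForExecutor(executorId: String): Option[String] = lock.synchronized {
    executorsToPods.get(executorId)
  }
}
```

With separate `ConcurrentHashMap`s, each map would be individually thread-safe, but the pair of updates would not be atomic. A shared lock around non-concurrent maps is the simpler way to keep the two views consistent.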
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146426317 --- Diff: pom.xml --- @@ -2649,6 +2649,13 @@ + kubernetes + +resource-managers/kubernetes/core --- End diff -- @vanzin this isn't a multi-module project in the sense that the Kubernetes cluster manager and spark-submit implementation are split across multiple projects. Rather, there is one module for the cluster manager and spark-submit implementation, and then there are modules for integration testing that code. @foxish The Dockerfiles feel more like application code than static configuration, but that might just be a matter of implementation. For example, the structure of the `CMD` in the Dockerfiles is specific to what `spark-submit` expects.
[GitHub] spark issue #18527: [SPARK-21101][SQL] Catch IllegalStateException when CREA...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/18527 ping @wangyum This sounds like a reasonable fix. Could you resolve the conflicts?
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user foxish commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146426033 --- Diff: pom.xml --- @@ -2649,6 +2649,13 @@ + kubernetes + +resource-managers/kubernetes/core --- End diff -- Based on a discussion in last week's meeting with Shane Knapp from RISELab, we want to keep the integration tests as a sub-module here - in the interest of keeping test code together. We should have the additional parent pom to facilitate that.
[GitHub] spark pull request #19556: [SPARK-22328][Core] ClosureCleaner should not mis...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19556#discussion_r146426012 --- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala --- @@ -91,6 +91,52 @@ private[spark] object ClosureCleaner extends Logging { (seen - obj.getClass).toList } + /** Initializes the accessed fields for outer classes and their super classes. */ + private def initAccessedFields( + accessedFields: Map[Class[_], Set[String]], + outerClasses: Seq[Class[_]]): Unit = { +for (cls <- outerClasses) { + accessedFields(cls) = Set.empty[String] + + var superClass = cls.getSuperclass() + while (superClass != null) { +accessedFields(superClass) = Set.empty[String] +superClass = superClass.getSuperclass() + } +} + } + + /** Sets accessed fields for given class in clone object based on given object. */ + private def setAccessedFields( + outerClass: Class[_], + clone: AnyRef, + obj: AnyRef, + accessedFields: Map[Class[_], Set[String]]): Unit = { +for (fieldName <- accessedFields(outerClass)) { + val field = outerClass.getDeclaredField(fieldName) + field.setAccessible(true) + val value = field.get(obj) + field.set(clone, value) +} + } + + /** Clones a given object and sets accessed fields in cloned object. */ + private def cloneAndSetFields( + parent: AnyRef, + obj: AnyRef, + outerClass: Class[_], + accessedFields: Map[Class[_], Set[String]]): AnyRef = { +val clone = instantiateClass(outerClass, parent) +setAccessedFields(outerClass, clone, obj, accessedFields) + +var superClass = outerClass.getSuperclass() +while (superClass != null) { --- End diff -- Thanks. Looks good.
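Both `initAccessedFields` and `cloneAndSetFields` in the quoted diff walk `getSuperclass()` until it returns `null`. Because `getDeclaredField` only sees fields declared on that exact class, fields inherited from a superclass have to be tracked and copied per declaring class. The self-contained sketch below reproduces the same walk using hypothetical `Base`/`Derived` classes. It uses an explicit `mutable.Map`, since the diff assigns into its `Map`/`Set` (presumably mutable imports in the surrounding file). It does not reproduce Spark's `instantiateClass`, so the usage simply creates the clone with `new Derived`:

```scala
import scala.collection.mutable

// Hypothetical classes for illustration; `baseField` is declared on the superclass.
class Base { var baseField: Int = 0 }
class Derived extends Base { var derivedField: String = "" }

object AccessedFieldsSketch {
  type Accessed = mutable.Map[Class[_], mutable.Set[String]]

  /** Registers an empty field set for each class and every one of its superclasses. */
  def initAccessedFields(accessed: Accessed, outerClasses: Seq[Class[_]]): Unit =
    for (cls <- outerClasses) {
      var c: Class[_] = cls
      while (c != null) {
        accessed(c) = mutable.Set.empty[String]
        c = c.getSuperclass
      }
    }

  /** Copies the recorded fields declared on exactly `cls` from `obj` into `clone`. */
  def setAccessedFields(cls: Class[_], clone: AnyRef, obj: AnyRef, accessed: Accessed): Unit =
    for (name <- accessed(cls)) {
      val field = cls.getDeclaredField(name)
      field.setAccessible(true)
      field.set(clone, field.get(obj))
    }

  /** Applies setAccessedFields to `cls` and then to each superclass in turn. */
  def copyHierarchy(cls: Class[_], clone: AnyRef, obj: AnyRef, accessed: Accessed): Unit = {
    var c: Class[_] = cls
    while (c != null) {
      setAccessedFields(c, clone, obj, accessed)
      c = c.getSuperclass
    }
  }
}
```

If `copyHierarchy` stopped after `Derived`, `baseField` would never be copied, because `classOf[Derived].getDeclaredField("baseField")` throws `NoSuchFieldException`.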
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146425629 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146425503 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146425133 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146425100 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import java.io.File + +import com.google.common.base.Charsets +import com.google.common.io.Files +import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient, KubernetesClient} +import io.fabric8.kubernetes.client.utils.HttpClientUtils +import okhttp3.Dispatcher + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.config._ +import org.apache.spark.util.ThreadUtils + +/** + * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to + * parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL + * options for different components. 
+ */ +private[spark] object SparkKubernetesClientFactory { + + def createKubernetesClient( + master: String, + namespace: Option[String], + kubernetesAuthConfPrefix: String, + sparkConf: SparkConf, + maybeServiceAccountToken: Option[File], + maybeServiceAccountCaCert: Option[File]): KubernetesClient = { +val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX" +val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX" +val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf) --- End diff -- This lacks context from the `spark-submit` implementation, which is not in this PR. We intend to have two different sets of authentication options for the Kubernetes API. The first is the set of credentials for creating the driver pod and all the Kubernetes resources that the application requires outside of executor pods. The second is a set of credentials that the driver can use to create executor pods. These options will share suffixes in their configuration keys but have different prefixes. The reasoning for two sets of credentials is twofold: - The driver needs strictly fewer privileges than `spark-submit`, because the driver only creates and deletes pods, whereas `spark-submit` needs to create pods and other Kubernetes resources. Two sets of credentials allow the driver to have an appropriately limited scope of API access. - Part of the credentials includes TLS certificates for accessing the Kubernetes API over HTTPS. A common environment is one where the Kubernetes API server is reachable from outside the cluster through a proxy, while the driver accesses the API server from inside the cluster. A front door for the API server typically asks for a different certificate than the one presented when accessing the API server internally.
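The two-credential-set design described above comes down to key construction: the same suffixes are joined to different prefixes, just as the quoted `s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"` does. Below is a minimal sketch that assumes a plain `Map[String, String]` in place of `SparkConf`. The suffix strings and the two prefixes used in the usage are illustrative assumptions, not necessarily the constants the PR defines:

```scala
object AuthConfSketch {
  // Illustrative suffixes (assumptions). The PR's own constants, such as
  // OAUTH_TOKEN_CONF_SUFFIX, come from its org.apache.spark.deploy.k8s.config import.
  val OauthTokenSuffix = "oauthToken"
  val OauthTokenFileSuffix = "oauthTokenFile"
  val CaCertFileSuffix = "caCertFile"

  final case class AuthOptions(
      oauthToken: Option[String],
      oauthTokenFile: Option[String],
      caCertFile: Option[String])

  /** Resolves one credential set by joining `prefix` to each shared suffix. */
  def resolve(conf: Map[String, String], prefix: String): AuthOptions =
    AuthOptions(
      oauthToken = conf.get(s"$prefix.$OauthTokenSuffix"),
      oauthTokenFile = conf.get(s"$prefix.$OauthTokenFileSuffix"),
      caCertFile = conf.get(s"$prefix.$CaCertFileSuffix"))
}
```

Each set is resolved independently. This means the submission side can point at a CA certificate for an external proxy while the driver uses the in-cluster CA, which matches the TLS scenario in the second bullet.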
[GitHub] spark issue #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table should n...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19562 Thank you for the review and approval, @gatorsmile!
[GitHub] spark issue #17100: [SPARK-13947][SQL] The error message from using an inval...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17100 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82994/ Test PASSed.
[GitHub] spark issue #17100: [SPARK-13947][SQL] The error message from using an inval...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17100 Merged build finished. Test PASSed.
[GitHub] spark issue #17100: [SPARK-13947][SQL] The error message from using an inval...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17100 **[Test build #82994 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82994/testReport)** for PR 17100 at commit [`34753b5`](https://github.com/apache/spark/commit/34753b513323f5076edd4c5006983a9c9d3d97d7). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #18268: [SPARK-21054] [SQL] Reset Command support reset specific...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/18268 ping @ericsahit
[GitHub] spark issue #18833: [SPARK-21625][SQL] sqrt(negative number) should be null.
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/18833 Can we document this difference in https://spark.apache.org/docs/latest/sql-programming-guide.html#compatibility-with-apache-hive?
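Some context for the difference being documented: on the JVM, `Math.sqrt` of a negative number returns `NaN`, while the PR title asks for SQL `null` instead. That is commonly described as Hive's behavior, but treat it as an assumption to verify against the Hive docs. The small sketch below contrasts the two semantics. It models SQL `NULL` as `None` through a hypothetical `nullOnNegativeSqrt` helper and is not Spark's actual expression code:

```scala
object SqrtSemanticsSketch {
  /** Plain JVM semantics: a negative input yields NaN. */
  def jvmSqrt(x: Double): Double = math.sqrt(x)

  /** Hypothetical null-on-negative variant; SQL NULL is modelled as None. */
  def nullOnNegativeSqrt(x: Double): Option[Double] =
    if (x < 0) None else Some(math.sqrt(x))
}
```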
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146423831
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    rpcEnv: RpcEnv,
+    executorPodFactory: ExecutorPodFactory,
+    kubernetesClient: KubernetesClient,
+    allocatorExecutor: ScheduledExecutorService,
+    requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(
+      throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
+      requestExecutorsService)
+
+  private val driverPod = try {
+    kubernetesClient.pods()
+      .inNamespace(kubernetesNamespace)
+      .withName(kubernetesDriverPodName)
+      .get()
+  } catch {
+    case throwable: Throwable =>
+      logError(s"Executor cannot find driver pod.", throwable)
+      throw new SparkException(s"Executor cannot find driver pod", throwable)
+  }
+
+  override val minRegisteredRatio =
+    if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+      0.8
+    } else {
+      super.minRegisteredRatio
+    }
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+      conf.get("spark.driver.host"),
+      conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval =
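The two `mutable.HashMap`s in this hunk form a two-way index (executor ID to pod, and pod name to executor ID), and the inline comments say both are guarded by `RUNNING_EXECUTOR_PODS_LOCK`. A minimal Python sketch of that pattern, assuming both directions must always change together; the class `ExecutorPodIndex` and its methods are hypothetical, and it stores pod names rather than fabric8 `Pod` objects to stay self-contained:

```python
import threading
from typing import Dict, Optional


class ExecutorPodIndex:
    """Hypothetical two-way index between executor IDs and pod names.

    Both dictionaries are only read or written while holding one shared lock,
    analogous to RUNNING_EXECUTOR_PODS_LOCK guarding the two Scala maps.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._executor_to_pod: Dict[str, str] = {}  # like runningExecutorsToPods
        self._pod_to_executor: Dict[str, str] = {}  # like runningPodsToExecutors

    def add(self, executor_id: str, pod_name: str) -> None:
        with self._lock:
            # Drop a stale reverse entry if this executor previously had another pod.
            old_pod = self._executor_to_pod.get(executor_id)
            if old_pod is not None:
                self._pod_to_executor.pop(old_pod, None)
            self._executor_to_pod[executor_id] = pod_name
            self._pod_to_executor[pod_name] = executor_id

    def remove_by_executor(self, executor_id: str) -> Optional[str]:
        with self._lock:
            # Remove both directions in one critical section.
            pod_name = self._executor_to_pod.pop(executor_id, None)
            if pod_name is not None:
                self._pod_to_executor.pop(pod_name, None)
            return pod_name

    def executor_for_pod(self, pod_name: str) -> Optional[str]:
        with self._lock:
            return self._pod_to_executor.get(pod_name)
```

For example, after `add("1", "exec-pod-1")`, `executor_for_pod("exec-pod-1")` returns `"1"`, and `remove_by_executor("1")` clears both directions. Taking the same lock in every method is what keeps the forward and reverse maps consistent; two independently synchronized maps could be observed halfway through an update.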
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146423757 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r146423117 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala --- @@ -0,0 +1,456 @@
[GitHub] spark issue #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table should n...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19562 Merged build finished. Test PASSed.
[GitHub] spark issue #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table should n...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19562 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82996/ Test PASSed.
[GitHub] spark issue #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table should n...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19562 **[Test build #82996 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82996/testReport)** for PR 19562 at commit [`430af34`](https://github.com/apache/spark/commit/430af34d6e6b27f4dd90fa758b9d54b2e8d7eb1f).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19459#discussion_r146421804
--- Diff: python/pyspark/sql/session.py ---
@@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema):
         data = [schema.toInternal(row) for row in data]
         return self._sc.parallelize(data), schema

+    def _createFromPandasWithArrow(self, pdf, schema):
+        """
+        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
+        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
+        data types will be used to coerce the data in Pandas to Arrow conversion.
+        """
+        from pyspark.serializers import ArrowSerializer
+        from pyspark.sql.types import from_arrow_schema, to_arrow_type, _cast_pandas_series_type
+        import pyarrow as pa
+
+        # Slice the DataFrame into batches
+        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
+        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
+
+        if schema is None or isinstance(schema, list):
+            batches = [pa.RecordBatch.from_pandas(pdf_slice, preserve_index=False)
+                       for pdf_slice in pdf_slices]
+
+            # There will be at least 1 batch after slicing the pandas.DataFrame
+            schema_from_arrow = from_arrow_schema(batches[0].schema)
+
+            # If passed schema as a list of names then rename fields
+            if isinstance(schema, list):
+                fields = []
+                for i, field in enumerate(schema_from_arrow):
+                    field.name = schema[i]
+                    fields.append(field)
+                schema = StructType(fields)
+            else:
+                schema = schema_from_arrow
+        else:
+            batches = []
+            for i, pdf_slice in enumerate(pdf_slices):
+
+                # convert to series to pyarrow.Arrays to use mask when creating Arrow batches
+                arrs = []
+                names = []
+                for c, (_, series) in enumerate(pdf_slice.iteritems()):
+                    field = schema[c]
+                    names.append(field.name)
+                    t = to_arrow_type(field.dataType)
+                    try:
+                        # NOTE: casting is not necessary with Arrow >= 0.7
+                        arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t),
--- End diff --

I think that is a problem with using `astype`, which doesn't provide any checks afaik. This casting is better done in Arrow, but since we are currently stuck on 0.4.1 we need this workaround. Trying this out with the latest Arrow would give the user a nice error:

```
>>> pa.Array.from_pandas(s, type=pa.int16())
[
  1,
  2,
  10001
]
>>> pa.Array.from_pandas(s, type=pa.int8())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/array.pxi", line 279, in pyarrow.lib.Array.from_pandas (/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:25865)
  File "pyarrow/array.pxi", line 169, in pyarrow.lib.array (/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:24833)
  File "pyarrow/array.pxi", line 70, in pyarrow.lib._ndarray_to_array (/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:24083)
  File "pyarrow/error.pxi", line 77, in pyarrow.lib.check_status (/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:7876)
pyarrow.lib.ArrowInvalid: Integer value out of bounds
```
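The point about `astype` is the difference between an unchecked narrowing cast, which keeps only the low bits so out-of-range values wrap silently, and a bounds-checked cast like the Arrow conversion that raises `ArrowInvalid` above. A minimal pure-Python sketch of both behaviors, assuming two's-complement integers; `wrap_to_signed` and `checked_cast` are hypothetical helpers, not pandas, NumPy, or pyarrow APIs:

```python
from typing import List


def wrap_to_signed(values: List[int], bits: int) -> List[int]:
    """Unchecked narrowing cast (hypothetical helper).

    Keeps the low `bits` bits of each value and reinterprets them as a
    two's-complement signed integer, so out-of-range values wrap silently.
    """
    span = 1 << bits      # e.g. 256 for 8 bits
    half = span >> 1      # e.g. 128 for 8 bits
    # Python's % with a positive modulus is always non-negative.
    return [((v + half) % span) - half for v in values]


def checked_cast(values: List[int], bits: int) -> List[int]:
    """Bounds-checked narrowing cast (hypothetical helper).

    Raises ValueError for any value the signed target width cannot hold,
    instead of silently changing it.
    """
    lo, hi = -(1 << (bits - 1)), (1 << (bits - 1)) - 1
    for v in values:
        if not lo <= v <= hi:
            raise ValueError(f"Integer value out of bounds: {v} not in [{lo}, {hi}]")
    return list(values)
```

With `[1, 2, 10001]`, a 16-bit target holds every value (matching the `pa.int16()` output above), while `wrap_to_signed(..., 8)` quietly turns 10001 into 17 and `checked_cast(..., 8)` raises instead.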