[GitHub] spark issue #19557: [SPARK-22281][SPARKR] Handle R method breaking signature...
Github user shivaram commented on the issue: https://github.com/apache/spark/pull/19557 LGTM.
[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/19557 Tested on Windows and r-hub/r-devel.
[GitHub] spark issue #19571: [SPARK-15474][SQL] Write and read back non-empty schema ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19571 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83072/ Test PASSed.
[GitHub] spark issue #19571: [SPARK-15474][SQL] Write and read back non-empty schema ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19571 Merged build finished. Test PASSed.
[GitHub] spark issue #19571: [SPARK-15474][SQL] Write and read back non-empty schema ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19571 **[Test build #83072 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83072/testReport)** for PR 19571 at commit [`8d212f0`](https://github.com/apache/spark/commit/8d212f049ccd176e5d6800d620929eed20844415). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19556 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83069/ Test PASSed.
[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19556 Merged build finished. Test PASSed.
[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19556 **[Test build #83069 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83069/testReport)** for PR 19556 at commit [`e26d093`](https://github.com/apache/spark/commit/e26d093bd14c79f26903206104da6aa57a32d613). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `assert(currentClass != null, "The outer class can't be null.")` * ` assert(currentClass != null, "The outer class can't be null.")`
[GitHub] spark issue #19077: [SPARK-21860][core]Improve memory reuse for heap memory ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19077 Merged build finished. Test PASSed.
[GitHub] spark issue #19077: [SPARK-21860][core]Improve memory reuse for heap memory ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19077 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83068/ Test PASSed.
[GitHub] spark issue #19077: [SPARK-21860][core]Improve memory reuse for heap memory ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19077 **[Test build #83068 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83068/testReport)** for PR 19077 at commit [`c312fde`](https://github.com/apache/spark/commit/c312fdecbd33eb038c56ce47c3b34753df551c8f). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluation for...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19122 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83075/ Test PASSed.
[GitHub] spark issue #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluation for...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19122 Merged build finished. Test PASSed.
[GitHub] spark issue #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluation for...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19122 **[Test build #83075 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83075/testReport)** for PR 19122 at commit [`8b3ef97`](https://github.com/apache/spark/commit/8b3ef976849cdca4628d0652427c14179de0d39c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #11205: [SPARK-11334][Core] Handle maximum task failure situatio...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/11205 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83067/ Test PASSed.
[GitHub] spark issue #11205: [SPARK-11334][Core] Handle maximum task failure situatio...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/11205 Merged build finished. Test PASSed.
[GitHub] spark issue #11205: [SPARK-11334][Core] Handle maximum task failure situatio...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/11205 **[Test build #83067 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83067/testReport)** for PR 11205 at commit [`59f9c15`](https://github.com/apache/spark/commit/59f9c156c3ad746f84f385bcf277685c9c329286). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19565 @akopich If you want to cache the input dataset, create a JIRA to discuss it first. It's another issue, I think. This JIRA is also related to input caching issues: https://issues.apache.org/jira/browse/SPARK-19422
[GitHub] spark issue #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, CURRENT...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19559 **[Test build #83076 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83076/testReport)** for PR 19559 at commit [`2d42abf`](https://github.com/apache/spark/commit/2d42abf32d53a1f753bcdff367d476ac0638ccd5).
[GitHub] spark pull request #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, ...
Github user DonnyZone commented on a diff in the pull request: https://github.com/apache/spark/pull/19559#discussion_r147041980 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -139,6 +139,7 @@ class Analyzer( ExtractGenerator :: ResolveGenerate :: ResolveFunctions :: + ResolveLiteralFunctions :: --- End diff -- Agree! I will refactor it.
[GitHub] spark issue #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, CURRENT...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19559 Merged build finished. Test FAILed.
[GitHub] spark issue #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, CURRENT...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19559 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83071/ Test FAILed.
[GitHub] spark issue #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, CURRENT...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19559 **[Test build #83071 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83071/testReport)** for PR 19559 at commit [`d485e25`](https://github.com/apache/spark/commit/d485e25649ca90558b7639629be920274876e27c). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19529: [SPARK-22308] Support alternative unit testing styles in...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19529 LGTM pending Jenkins
[GitHub] spark issue #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluation for...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19122 **[Test build #83075 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83075/testReport)** for PR 19122 at commit [`8b3ef97`](https://github.com/apache/spark/commit/8b3ef976849cdca4628d0652427c14179de0d39c).
[GitHub] spark issue #19529: [SPARK-22308] Support alternative unit testing styles in...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19529 **[Test build #83074 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83074/testReport)** for PR 19529 at commit [`6c0b0d5`](https://github.com/apache/spark/commit/6c0b0d569ae1d779fd9253da0c7e97d12634063c).
[GitHub] spark pull request #19529: [SPARK-22308] Support alternative unit testing st...
Github user nkronenfeld commented on a diff in the pull request: https://github.com/apache/spark/pull/19529#discussion_r147038647 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala --- @@ -17,86 +17,8 @@ package org.apache.spark.sql.test -import scala.concurrent.duration._ - -import org.scalatest.BeforeAndAfterEach -import org.scalatest.concurrent.Eventually - -import org.apache.spark.{DebugFilesystem, SparkConf} -import org.apache.spark.sql.{SparkSession, SQLContext} -import org.apache.spark.sql.internal.SQLConf - -/** - * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]]. - */ -trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually { - - protected def sparkConf = { -new SparkConf() - .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName) - .set("spark.unsafe.exceptionOnMemoryLeak", "true") - .set(SQLConf.CODEGEN_FALLBACK.key, "false") - } - - /** - * The [[TestSparkSession]] to use for all tests in this suite. - * - * By default, the underlying [[org.apache.spark.SparkContext]] will be run in local - * mode with the default test configurations. - */ - private var _spark: TestSparkSession = null - - /** - * The [[TestSparkSession]] to use for all tests in this suite. - */ - protected implicit def spark: SparkSession = _spark - - /** - * The [[TestSQLContext]] to use for all tests in this suite. - */ - protected implicit def sqlContext: SQLContext = _spark.sqlContext - - protected def createSparkSession: TestSparkSession = { -new TestSparkSession(sparkConf) - } - - /** - * Initialize the [[TestSparkSession]]. - */ +trait SharedSQLContext extends SQLTestUtils with SharedSparkSession { --- End diff -- We could... that would more fit the pattern of what we've done now for PlanTest/PlanTestBase and SQLTestUtils/SQLTestUtilsBase. I hesitated in this case just because the two are conceptually so different, and the idea is that both would actually get used - SharedSQLContext in internal tests, SharedSparkSession in external tests.
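For readers outside the thread, a minimal sketch of the intended split (suite names and test bodies are hypothetical, and this assumes SharedSparkSession mixes into any ScalaTest style, which is what SPARK-22308 is after):

    import org.apache.spark.sql.Row

    // Internal Spark suites keep the existing FunSuite-based convention.
    class InternalQuerySuite extends QueryTest with SharedSQLContext {
      test("uses the shared TestSparkSession") {
        checkAnswer(spark.sql("SELECT 1"), Row(1))
      }
    }

    // External projects choose their own ScalaTest style and mix in the
    // shared-session trait directly.
    class ExternalSpecSuite extends org.scalatest.FlatSpec with SharedSparkSession {
      "the shared session" should "be usable from a FlatSpec" in {
        assert(spark.range(3).count() === 3)
      }
    }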
[GitHub] spark pull request #19529: [SPARK-22308] Support alternative unit testing st...
Github user nkronenfeld commented on a diff in the pull request: https://github.com/apache/spark/pull/19529#discussion_r147038487 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala --- @@ -29,7 +31,14 @@ import org.apache.spark.sql.internal.SQLConf /** * Provides helper methods for comparing plans. */ -trait PlanTest extends SparkFunSuite with PredicateHelper { +trait PlanTest extends SparkFunSuite with PlanTestBase { +} --- End diff -- done
[GitHub] spark pull request #19529: [SPARK-22308] Support alternative unit testing st...
Github user nkronenfeld commented on a diff in the pull request: https://github.com/apache/spark/pull/19529#discussion_r147038504 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala --- @@ -17,86 +17,8 @@ package org.apache.spark.sql.test -import scala.concurrent.duration._ - -import org.scalatest.BeforeAndAfterEach -import org.scalatest.concurrent.Eventually - -import org.apache.spark.{DebugFilesystem, SparkConf} -import org.apache.spark.sql.{SparkSession, SQLContext} -import org.apache.spark.sql.internal.SQLConf - -/** - * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]]. - */ -trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually { - - protected def sparkConf = { -new SparkConf() - .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName) - .set("spark.unsafe.exceptionOnMemoryLeak", "true") - .set(SQLConf.CODEGEN_FALLBACK.key, "false") - } - - /** - * The [[TestSparkSession]] to use for all tests in this suite. - * - * By default, the underlying [[org.apache.spark.SparkContext]] will be run in local - * mode with the default test configurations. - */ - private var _spark: TestSparkSession = null - - /** - * The [[TestSparkSession]] to use for all tests in this suite. - */ - protected implicit def spark: SparkSession = _spark - - /** - * The [[TestSQLContext]] to use for all tests in this suite. - */ - protected implicit def sqlContext: SQLContext = _spark.sqlContext - - protected def createSparkSession: TestSparkSession = { -new TestSparkSession(sparkConf) - } - - /** - * Initialize the [[TestSparkSession]]. - */ +trait SharedSQLContext extends SQLTestUtils with SharedSparkSession { protected override def beforeAll(): Unit = { -SparkSession.sqlListener.set(null) -if (_spark == null) { - _spark = createSparkSession -} -// Ensure we have initialized the context before calling parent code super.beforeAll() --- End diff -- we don't. Removed.
[GitHub] spark issue #19433: [SPARK-3162] [MLlib] Add local tree training for decisio...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19433 > We'll actually only have to run an O(n log n) sort on continuous feature values once (i.e. in the FeatureVector constructor), since once the continuous features are sorted we can update them as we would for categorical features when splitting nodes (in O(n) time) and they'll remain sorted. Nice! So we only need one global sort pass, and then each split only needs an O(n) copy.
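To make the O(n) step concrete, a minimal self-contained sketch (illustrative only; `stableSplit` is not the PR's API): because `filter` is stable, splitting an already-sorted column preserves sort order in both children, so the O(n log n) sort happens exactly once.

    // sortedValues: one continuous-feature column, sorted once up front.
    // rowOf(i): the row owning sortedValues(i); goesLeft: routing to the left child.
    def stableSplit(
        sortedValues: Array[Double],
        rowOf: Array[Int],
        goesLeft: Int => Boolean): (Array[Double], Array[Double]) = {
      val idx = sortedValues.indices
      // filter preserves relative order, so each child stays sorted: O(n) total.
      (idx.filter(i => goesLeft(rowOf(i))).map(i => sortedValues(i)).toArray,
       idx.filterNot(i => goesLeft(rowOf(i))).map(i => sortedValues(i)).toArray)
    }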
[GitHub] spark issue #19579: [SPARK-22356][SQL] data source table should support over...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19579 **[Test build #83073 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83073/testReport)** for PR 19579 at commit [`18907cb`](https://github.com/apache/spark/commit/18907cb2359efb9b4e874482916de04af9cf90a2).
[GitHub] spark issue #19471: [SPARK-22245][SQL] partitioned data set should always pu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19471 Merged build finished. Test PASSed.
[GitHub] spark issue #19471: [SPARK-22245][SQL] partitioned data set should always pu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19471 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83066/ Test PASSed.
[GitHub] spark issue #19471: [SPARK-22245][SQL] partitioned data set should always pu...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19471 closing in favor of https://github.com/apache/spark/pull/19579
[GitHub] spark issue #19471: [SPARK-22245][SQL] partitioned data set should always pu...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19471 **[Test build #83066 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83066/testReport)** for PR 19471 at commit [`d21ebaa`](https://github.com/apache/spark/commit/d21ebaab6d6e2d7d6d10933d72360cef49194a90). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19471: [SPARK-22245][SQL] partitioned data set should al...
Github user cloud-fan closed the pull request at: https://github.com/apache/spark/pull/19471
[GitHub] spark issue #19579: [SPARK-22356][SQL] data source table should support over...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19579 cc @gatorsmile
[GitHub] spark pull request #19579: [SPARK-22356][SQL] data source table should suppo...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/19579 [SPARK-22356][SQL] data source table should support overlapped columns between data and partition schema ## What changes were proposed in this pull request? This is a regression introduced by #14207. After Spark 2.1, we store the inferred schema when creating the table, to avoid inferring the schema again at the read path. However, there is one special case: overlapped columns between data and partition schema. This case breaks the assumption about the table schema that there is no overlap between data and partition schema and that partition columns come at the end. The result is, for Spark 2.1, the table scan has an incorrect schema that puts partition columns at the end. For Spark 2.2, we added a check in CatalogTable to validate the table schema, which fails in this case. To fix this issue, a simple and safe approach is to fall back to the old behavior when overlapped columns are detected, i.e. store an empty schema in the metastore. ## How was this patch tested? new regression test You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark bug2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19579.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19579 commit 18907cb2359efb9b4e874482916de04af9cf90a2 Author: Wenchen Fan Date: 2017-10-26T01:26:39Z overlapped columns between data and partition schema in data source tables
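As a hedged illustration of the overlap described above (this is not the PR's regression test; the path and column names are made up, and an active SparkSession `spark` is assumed):

    import spark.implicits._
    // The data files carry a column `p` that is also the Hive-style partition
    // key on disk, so `p` appears in both the data and the partition schema.
    val dir = "/tmp/overlap_demo"  // illustrative path
    Seq((1, 1)).toDF("i", "p").write.mode("overwrite").parquet(s"$dir/p=1")
    // Per the description above, such tables fall back to storing an empty
    // schema in the metastore and re-infer the schema at read time.
    spark.read.parquet(dir).printSchema()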
[GitHub] spark pull request #19577: [SPARK-22355][SQL] Dataset.collect is not threads...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19577#discussion_r147037025 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -3102,7 +3103,12 @@ class Dataset[T] private[sql]( * Collect all elements from a spark plan. */ private def collectFromPlan(plan: SparkPlan): Array[T] = { -plan.executeCollect().map(boundEnc.fromRow) +val objProj = GenerateSafeProjection.generate(deserializer :: Nil) --- End diff -- Ok. Looks good.
[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19433#discussion_r147036693 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala --- @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.tree.impl + +import org.apache.spark.ml.tree._ +import org.apache.spark.mllib.tree.model.ImpurityStats + +/** Object exposing methods for local training of decision trees */ +private[ml] object LocalDecisionTree { + + /** + * Fully splits the passed-in node on the provided local dataset, returning + * an InternalNode/LeafNode corresponding to the root of the resulting tree. + * + * @param node LearningNode to use as the root of the subtree fit on the passed-in dataset + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = array of splits for feature i + */ + private[ml] def fitNode( + input: Array[TreePoint], + instanceWeights: Array[Double], + node: LearningNode, + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// The case with 1 node (depth = 0) is handled separately. +// This allows all iterations in the depth > 0 case to use the same code. +// TODO: Check that learning works when maxDepth > 0 but learning stops at 1 node (because of +// other parameters). +if (metadata.maxDepth == 0) { + return node.toNode +} + +// Prepare column store. +// Note: rowToColumnStoreDense checks to make sure numRows < Int.MaxValue. +val colStoreInit: Array[Array[Int]] += LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures)) +val labels = input.map(_.label) + +// Fit a regression model on the dataset, throwing an error if metadata indicates that +// we should train a classifier. +// TODO: Add support for training classifiers +if (metadata.numClasses > 1 && metadata.numClasses <= 32) { + throw new UnsupportedOperationException("Local training of a decision tree classifier is " + +"unsupported; currently, only regression is supported") +} else { + trainRegressor(node, colStoreInit, instanceWeights, labels, metadata, splits) +} + } + + /** + * Locally fits a decision tree regressor. + * TODO(smurching): Logic for fitting a classifier & regressor is the same; only difference + * is impurity metric. Use the same logic for fitting a classifier.
+ * + * @param rootNode Node to use as root of the tree fit on the passed-in dataset + * @param colStoreInit Array of columns of training data + * @param instanceWeights Array of weights for each training example + * @param metadata learning and dataset metadata for DecisionTree + * @param splits splits(i) = Array of possible splits for feature i + * @return LeafNode or InternalNode representation of rootNode + */ + private[ml] def trainRegressor( + rootNode: LearningNode, + colStoreInit: Array[Array[Int]], + instanceWeights: Array[Double], + labels: Array[Double], + metadata: DecisionTreeMetadata, + splits: Array[Array[Split]]): Node = { + +// Sort each column by decision tree node. +val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { case (col, featureIndex) => + val featureArity: Int = metadata.featureArity.getOrElse(featureIndex, 0) + FeatureVector(featureIndex, featureArity, col) +} + +val numRows = colStore.headOption match { + case None => 0 + case Some(column) => column.values.length +} + +// Create a new PartitionInfo describing the status of our partially-trained subtree +// at each iteration of training +var trainingInfo: TrainingInfo =
[GitHub] spark issue #18544: [SPARK-21318][SQL]Improve exception message thrown by `l...
Github user stanzhai commented on the issue: https://github.com/apache/spark/pull/18544 Hi @gatorsmile, I've added some test cases, and they pass on my machine.
[GitHub] spark pull request #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19559#discussion_r147035765 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -139,6 +139,7 @@ class Analyzer( ExtractGenerator :: ResolveGenerate :: ResolveFunctions :: + ResolveLiteralFunctions :: --- End diff -- The order matters. It assumes `ResolveReferences` should run before this rule. However, `ResolveReferences` might need multiple passes to resolve all the references. Thus, how about moving the logic into `ResolveReferences`? If an attribute is not resolvable, we then check whether it is a literal function.
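A rough sketch of that suggestion (hypothetical code, not the PR's; `lookupLiteralFunction` is an assumed helper): within `ResolveReferences`, literal functions become a fallback for names that ordinary column resolution cannot bind.

    // Inside ResolveReferences' expression resolution (sketch):
    case u: UnresolvedAttribute =>
      plan.resolve(u.nameParts, resolver)            // normal column lookup first
        .orElse(lookupLiteralFunction(u.nameParts))  // hypothetical fallback for
        .getOrElse(u)                                // CURRENT_DATE / CURRENT_TIMESTAMP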
[GitHub] spark pull request #19577: [SPARK-22355][SQL] Dataset.collect is not threads...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19577#discussion_r147035745 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -3102,7 +3103,12 @@ class Dataset[T] private[sql]( * Collect all elements from a spark plan. */ private def collectFromPlan(plan: SparkPlan): Array[T] = { -plan.executeCollect().map(boundEnc.fromRow) +val objProj = GenerateSafeProjection.generate(deserializer :: Nil) --- End diff -- it just rethrows the exception, not a big deal
[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19527 Merged build finished. Test PASSed.
[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19527 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83070/ Test PASSed.
[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19527 **[Test build #83070 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83070/testReport)** for PR 19527 at commit [`ae2ac82`](https://github.com/apache/spark/commit/ae2ac82b10e457b8beede9dc4a33ce0a578f007d). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19577: [SPARK-22355][SQL] Dataset.collect is not threadsafe
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19577 Nice catch! LGTM with two minor comments.
[GitHub] spark issue #19388: [SPARK-22162] Executors and the driver should use consis...
Github user rezasafi commented on the issue: https://github.com/apache/spark/pull/19388 Sorry for the delay. It seems that to commit the same RDD in different stages, we need to use the stageId. So the jobId and other configurations in the write method of SparkHadoopWriter should be based on the stageId of the RDD and not the rddId. I have a hacky solution for this, but I am working on a better one and will update this PR ASAP.
[GitHub] spark issue #19571: [SPARK-15474][SQL] Write and read back non-empty schema ...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19571 Thank you for the review, @gatorsmile and @cloud-fan. In particular, @cloud-fan's suggestion is my original approach from #17980 and #18953 (before Aug 16), so I couldn't agree more. > Basically we leave the old orc data source as it is, and implement a new orc 1.4.1 data source in sql core module. Then we have an internal config to switch the implementation (by default prefer the new implementation), and remove the old implementation after one or two releases. BTW, I'm wondering what changed after you left [the following comment](https://github.com/apache/spark/pull/18953#issuecomment-322827590) on that PR on Aug 16. > Are the ORC APIs changed a lot in 1.4? I was expecting a small patch to upgrade the current ORC data source, without moving it to sql/core.
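A minimal sketch of what such a switch could look like (the conf name, values, and default here are illustrative, not a committed API):

    // Hypothetical internal conf in SQLConf: prefer the new ORC 1.4.x reader by
    // default and keep the old Hive-based one reachable for one or two releases.
    val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
      .internal()
      .doc("Which ORC implementation to use: 'hive' (old, in sql/hive) or " +
        "'native' (new, based on ORC 1.4.x, in sql/core).")
      .stringConf
      .checkValues(Set("hive", "native"))
      .createWithDefault("native")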
[GitHub] spark issue #19578: [SPARK-21983][SQL] Fix Antlr 4.7 deprecation warnings
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19578 Merged build finished. Test PASSed.
[GitHub] spark issue #19578: [SPARK-21983][SQL] Fix Antlr 4.7 deprecation warnings
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19578 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83065/ Test PASSed.
[GitHub] spark issue #19571: [SPARK-15474][SQL] Write and read back non-empty schema ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19571 **[Test build #83072 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83072/testReport)** for PR 19571 at commit [`8d212f0`](https://github.com/apache/spark/commit/8d212f049ccd176e5d6800d620929eed20844415).
[GitHub] spark pull request #19577: [SPARK-22355][SQL] Dataset.collect is not threads...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19577#discussion_r147032429 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -2661,7 +2657,12 @@ class Dataset[T] private[sql]( */ def toLocalIterator(): java.util.Iterator[T] = { withAction("toLocalIterator", queryExecution) { plan => - plan.executeToIterator().map(boundEnc.fromRow).asJava + val objProj = GenerateSafeProjection.generate(deserializer :: Nil) --- End diff -- It would be better to explain that we keep the projection inside for thread safety.
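For readers skimming the thread, the pattern under review, condensed from the diffs above: each call builds its own deserializing projection, because the generated projection is stateful and must not be shared across threads the way the old `boundEnc` was.

    private def collectFromPlan(plan: SparkPlan): Array[T] = {
      // A fresh projection per call keeps concurrent collect()s from stepping
      // on each other's mutable state.
      val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
      plan.executeCollect().map { row =>
        objProj(row).get(0, null).asInstanceOf[T]
      }
    }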
[GitHub] spark issue #19578: [SPARK-21983][SQL] Fix Antlr 4.7 deprecation warnings
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19578 **[Test build #83065 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83065/testReport)** for PR 19578 at commit [`6c6b138`](https://github.com/apache/spark/commit/6c6b1385ead60d1998dac626ccf0a8f2ba203abd). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19577: [SPARK-22355][SQL] Dataset.collect is not threads...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19577#discussion_r147032280 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -3102,7 +3103,12 @@ class Dataset[T] private[sql]( * Collect all elements from a spark plan. */ private def collectFromPlan(plan: SparkPlan): Array[T] = { -plan.executeCollect().map(boundEnc.fromRow) +val objProj = GenerateSafeProjection.generate(deserializer :: Nil) --- End diff -- `fromRow` catches `RuntimeException`. Shall we also catch it here?
[GitHub] spark issue #19534: [SPARK-22312][CORE] Fix bug in Executor allocation manag...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19534 @sitalkedia would you please reopen this PR? I think the second issue I fixed before is not valid anymore, and for the first issue the fix is no different from the one here.
[GitHub] spark pull request #11205: [SPARK-11334][Core] Handle maximum task failure s...
Github user jerryshao closed the pull request at: https://github.com/apache/spark/pull/11205
[GitHub] spark issue #11205: [SPARK-11334][Core] Handle maximum task failure situatio...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/11205 Verified again; it looks like the 2nd bullet is not valid anymore. I cannot reproduce it on the latest master branch, so it might have already been fixed in SPARK-13054. Only the first issue still exists, and I think @sitalkedia's PR is enough to handle it. I'm going to close this one. @sitalkedia, would you please reopen your PR? Sorry for the noise.
[GitHub] spark issue #19577: [SPARK-22355][SQL] Dataset.collect is not threadsafe
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19577 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83063/ Test PASSed.
[GitHub] spark issue #19577: [SPARK-22355][SQL] Dataset.collect is not threadsafe
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19577 Merged build finished. Test PASSed.
[GitHub] spark issue #19577: [SPARK-22355][SQL] Dataset.collect is not threadsafe
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19577 **[Test build #83063 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83063/testReport)** for PR 19577 at commit [`cecea8c`](https://github.com/apache/spark/commit/cecea8cdb36f3c5e65abd08643bd0d181d72008d). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, CURRENT...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19559 **[Test build #83071 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83071/testReport)** for PR 19559 at commit [`d485e25`](https://github.com/apache/spark/commit/d485e25649ca90558b7639629be920274876e27c).
[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19556 @cloud-fan The two remaining do-while loops are updated.
[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19527 **[Test build #83070 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83070/testReport)** for PR 19527 at commit [`ae2ac82`](https://github.com/apache/spark/commit/ae2ac82b10e457b8beede9dc4a33ce0a578f007d).
[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19527 @huaxingao Good catch! Thanks.
[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19556 **[Test build #83069 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83069/testReport)** for PR 19556 at commit [`e26d093`](https://github.com/apache/spark/commit/e26d093bd14c79f26903206104da6aa57a32d613).
[GitHub] spark issue #19077: [SPARK-21860][core]Improve memory reuse for heap memory ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19077 **[Test build #83068 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83068/testReport)** for PR 19077 at commit [`c312fde`](https://github.com/apache/spark/commit/c312fdecbd33eb038c56ce47c3b34753df551c8f).
[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19531#discussion_r147027199 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala --- @@ -157,64 +154,100 @@ case class InnerOuterEstimation(join: Join) extends Logging { // scalastyle:off /** * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula: - * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of - * that column. The underlying assumption for this formula is: each value of the smaller domain - * is included in the larger domain. - * Generally, inner join with multiple join keys can also be estimated based on the above - * formula: + * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), + * where V is the number of distinct values (ndv) of that column. The underlying assumption for + * this formula is: each value of the smaller domain is included in the larger domain. + * + * Generally, inner join with multiple join keys can be estimated based on the above formula: * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn))) * However, the denominator can become very large and excessively reduce the result, so we use a * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator. + * + * That is, join estimation is based on the most selective join keys. We follow this strategy + * when different types of column statistics are available. E.g., if card1 is the cardinality + * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms + * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2). */ // scalastyle:on - def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = { -var ndvDenom: BigInt = -1 + private def joinCardinality(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]) +: BigInt = { +// If there's no column stats available for join keys, estimate as cartesian product. +var minCard: BigInt = leftStats.rowCount.get * rightStats.rowCount.get --- End diff -- It's equivalent. Previously, if `ndvDenom` was -1 (no qualified join keys), the method returned 1 as the selectivity, so the join cardinality computed outside the method was also `leftStats.rowCount.get * rightStats.rowCount.get`. Here I changed the method's return value from selectivity to cardinality directly.
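A quick worked instance of the single-key formula above (the numbers are made up):

    // T(A) = 1,000,000 rows, T(B) = 100,000 rows,
    // ndv(A.k1) = 10,000, ndv(B.k1) = 50,000.
    val estimated = BigInt(1000000) * BigInt(100000) / BigInt(math.max(10000, 50000))
    // = 10^11 / (5 * 10^4) = 2,000,000 estimated join rows

The conservative choice of a single max(V(A.ki), V(B.ki)) keeps the denominator from exploding when several keys each contribute a large ndv.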
[GitHub] spark issue #11205: [SPARK-11334][Core] Handle maximum task failure situatio...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/11205 @vanzin, in the current code `stageIdToTaskIndices` cannot be used to track the number of running tasks, because this structure doesn't remove task indices when tasks finish successfully. Yes, `isExecutorIdle` is used to take care of executor idleness, but the way it identifies whether an executor is idle is not robust enough. In this scenario, when a stage is aborted because of max task failures, some task end events will be missing, so using the number of tasks per executor leaves residual data and makes the executor always appear busy.
[GitHub] spark issue #11205: [SPARK-11334][Core] Handle maximum task failure situatio...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/11205 **[Test build #83067 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83067/testReport)** for PR 11205 at commit [`59f9c15`](https://github.com/apache/spark/commit/59f9c156c3ad746f84f385bcf277685c9c329286).
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r147025026 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.config._ +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.util.Utils + +/** + * Configures executor pods. Construct one of these with a SparkConf to set up properties that are + * common across all executors. Then, pass in dynamic parameters into createExecutorPod. + */ +private[spark] trait ExecutorPodFactory { + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = sparkConf.get( +org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( +sparkConf, +KUBERNETES_EXECUTOR_LABEL_PREFIX, +"executor label") + require( +!executorLabels.contains(SPARK_APP_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( +!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + s" Spark.") + + private val executorAnnotations = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, + "executor annotation") + private val nodeSelector = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX, + "node selector") + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockmanagerPort = sparkConf +.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + private val kubernetesDriverPodName = sparkConf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val
executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + private val executorMemoryString = sparkConf.get( +org.apache.spark.internal.config.EXECUTOR_MEMORY.key, +org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) + + private val memoryOverheadMiB = sparkConf +.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) +.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt, + MEMORY_OVERHEAD_MIN_MIB)) + private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB + + private val executorCores = sparkConf.getDouble("spark.executor.cores", 1d) + private val executorLimitCores = sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key) + + override def createExecutorPod( + executorId: String,
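Since the overhead formula recurs in the snippet above, a worked sketch of the arithmetic (the 0.10 factor and 384 MiB floor are the usual Spark defaults, assumed here since the constants are defined elsewhere in this PR):

    // memoryOverheadMiB = max(MEMORY_OVERHEAD_FACTOR * executorMemoryMiB, MEMORY_OVERHEAD_MIN_MIB)
    def overheadMiB(executorMemoryMiB: Long): Long =
      math.max((0.10 * executorMemoryMiB).toLong, 384L)

    overheadMiB(4096L)  // 409 MiB         -> pod memory = 4096 + 409 = 4505 MiB
    overheadMiB(1024L)  // 384 MiB (floor) -> pod memory = 1024 + 384 = 1408 MiB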
[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestam...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18664#discussion_r147024180 --- Diff: python/pyspark/sql/types.py --- @@ -1619,11 +1619,38 @@ def to_arrow_type(dt): arrow_type = pa.decimal(dt.precision, dt.scale) elif type(dt) == StringType: arrow_type = pa.string() +elif type(dt) == DateType: +arrow_type = pa.date32() +elif type(dt) == TimestampType: +# Timestamps should be in UTC, JVM Arrow timestamps require a timezone to be read +arrow_type = pa.timestamp('us', tz='UTC') else: raise TypeError("Unsupported type in conversion to Arrow: " + str(dt)) return arrow_type +def _check_dataframe_localize_timestamps(df): +""" Convert timezone aware timestamps to timezone-naive in local time +""" +from pandas.api.types import is_datetime64tz_dtype +for column, series in df.iteritems(): +# TODO: handle nested timestamps, such as ArrayType(TimestampType())? +if is_datetime64tz_dtype(series.dtype): +df[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None) +return df + + +def _check_series_convert_timestamps_internal(s): +""" Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage +""" +from pandas.api.types import is_datetime64_dtype +# TODO: handle nested timestamps, such as ArrayType(TimestampType())? +if is_datetime64_dtype(s.dtype): +return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC') +else: +return s --- End diff -- Oh, got it. Yeah, I think you're right... I'll check on that a little later
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r147024086 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.config._ +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.util.Utils + +/** + * Configures executor pods. Construct one of these with a SparkConf to set up properties that are + * common across all executors. Then, pass in dynamic parameters into createExecutorPod. + */ +private[spark] trait ExecutorPodFactory { + def createExecutorPod( + executorId: String, + applicationId: String, + driverUrl: String, + executorEnvs: Seq[(String, String)], + driverPod: Pod, + nodeToLocalTaskCount: Map[String, Int]): Pod +} + +private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) + extends ExecutorPodFactory { + + import ExecutorPodFactoryImpl._ + + private val executorExtraClasspath = sparkConf.get( +org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) + private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) + + private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( +sparkConf, +KUBERNETES_EXECUTOR_LABEL_PREFIX, +"executor label") + require( +!executorLabels.contains(SPARK_APP_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") + require( +!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL), +s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + + s" Spark.") + + private val executorAnnotations = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, + "executor annotation") + private val nodeSelector = +ConfigurationUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_NODE_SELECTOR_PREFIX, + "node selector") + + private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE) + private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) + private val blockmanagerPort = sparkConf +.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + private val kubernetesDriverPodName = sparkConf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val
executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) + + private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + private val executorMemoryString = sparkConf.get( +org.apache.spark.internal.config.EXECUTOR_MEMORY.key, +org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) + + private val memoryOverheadMiB = sparkConf +.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) +.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt, + MEMORY_OVERHEAD_MIN_MIB)) + private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB + + private val executorCores = sparkConf.getDouble("spark.executor.cores", 1d) + private val executorLimitCores = sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key) + + override def createExecutorPod( + executorId: String,
[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestam...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18664#discussion_r147023750

--- Diff: python/pyspark/sql/types.py ---
@@ -1619,11 +1619,38 @@ def to_arrow_type(dt):
         arrow_type = pa.decimal(dt.precision, dt.scale)
     elif type(dt) == StringType:
         arrow_type = pa.string()
+    elif type(dt) == DateType:
+        arrow_type = pa.date32()
+    elif type(dt) == TimestampType:
+        # Timestamps should be in UTC, JVM Arrow timestamps require a timezone to be read
+        arrow_type = pa.timestamp('us', tz='UTC')
     else:
         raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
     return arrow_type
+
+
+def _check_dataframe_localize_timestamps(df):
+    """ Convert timezone aware timestamps to timezone-naive in local time
+    """
+    from pandas.api.types import is_datetime64tz_dtype
+    for column, series in df.iteritems():
+        # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+        if is_datetime64tz_dtype(series.dtype):
+            df[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+    return df
+
+
+def _check_series_convert_timestamps_internal(s):
+    """ Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage
+    """
+    from pandas.api.types import is_datetime64_dtype
+    # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+    if is_datetime64_dtype(s.dtype):
+        return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
+    else:
+        return s
--- End diff --

I meant: if `is_datetime64`**tz**`_dtype(s.dtype)` is true but the series carries a strange timezone like `tzlocal()`, I thought we would still need `s.dt.tz_convert('UTC')`.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
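A minimal pandas sketch of the corner case being described (the variable names are illustrative, not from the patch): a series can already be tz-aware yet carry a zone such as `tzlocal()`, in which case an explicit `tz_convert('UTC')` is still needed before the values can be treated as UTC-normalized.

    import pandas as pd
    from pandas.api.types import is_datetime64tz_dtype

    # A series that is already tz-aware, but in a zone other than UTC.
    s = pd.Series(pd.to_datetime(["2017-10-01 00:00", "2017-10-01 01:00"]))
    s = s.dt.tz_localize("tzlocal()")

    # The tz-aware dtype alone is not enough; the values still have to be
    # converted to UTC before being stored as UTC-normalized timestamps.
    if is_datetime64tz_dtype(s.dtype):
        s = s.dt.tz_convert("UTC")
    print(s)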
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r147022826

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala ---
[quoted diff elided; identical to the ExecutorPodFactory.scala excerpt quoted above]
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r147022413

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala ---
[quoted diff elided; identical to the ExecutorPodFactory.scala excerpt quoted above]
[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestam...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18664#discussion_r147021983

--- Diff: python/pyspark/sql/types.py ---
[quoted diff elided; identical to the types.py excerpt quoted above]
--- End diff --

This is to ensure that the values will be relative to the Unix epoch, which is what Spark expects to store internally. Just like `TimestampType.toInternal` [here](https://github.com/BryanCutler/spark/blob/4d4089330d451bf6a145c28a6f34407ce3138b4d/python/pyspark/sql/types.py#L190) - only this conversion is vectorized with Pandas.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
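A small illustrative sketch of the point above, in plain pandas (this is not the patch's code): localizing a naive series to the session zone and converting to UTC leaves values that count from the Unix epoch, mirroring `TimestampType.toInternal` but vectorized.

    import pandas as pd

    # A tz-naive series, interpreted as local time.
    naive = pd.Series(pd.to_datetime(["1970-01-01 00:00:00", "1970-01-01 00:00:01"]))

    # Localize to the session zone, then normalize to UTC -- one vectorized pass.
    as_utc = naive.dt.tz_localize("tzlocal()").dt.tz_convert("UTC")

    # The underlying values are now nanoseconds from the Unix epoch in UTC;
    # integer division by 1000 yields the microseconds Spark stores internally.
    micros = as_utc.values.astype("int64") // 1000
    print(micros)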
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user akopich commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r147021726

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -446,14 +445,14 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
   override private[clustering] def next(): OnlineLDAOptimizer = {
     val batch = docs.sample(withReplacement = sampleWithReplacement, miniBatchFraction,
       randomGenerator.nextLong())
-    if (batch.isEmpty()) return this
--- End diff --

I believe it's redundant now; in any case, `submitMiniBatch` counts the documents.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19471: [SPARK-22245][SQL] partitioned data set should always pu...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19471 **[Test build #83066 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83066/testReport)** for PR 19471 at commit [`d21ebaa`](https://github.com/apache/spark/commit/d21ebaab6d6e2d7d6d10933d72360cef49194a90). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r147020853

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -446,14 +445,14 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
   override private[clustering] def next(): OnlineLDAOptimizer = {
     val batch = docs.sample(withReplacement = sampleWithReplacement, miniBatchFraction,
       randomGenerator.nextLong())
-    if (batch.isEmpty()) return this
--- End diff --

We still need this, right?

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...
Github user hhbyyh commented on a diff in the pull request: https://github.com/apache/spark/pull/19565#discussion_r147021004

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -446,14 +445,14 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
   override private[clustering] def next(): OnlineLDAOptimizer = {
     val batch = docs.sample(withReplacement = sampleWithReplacement, miniBatchFraction,
       randomGenerator.nextLong())
-    if (batch.isEmpty()) return this
     submitMiniBatch(batch)
   }

   /**
    * Submit a subset (like 1%, decide by the miniBatchFraction) of the corpus to the Online LDA
    * model, and it will update the topic distribution adaptively for the terms appearing in the
    * subset.
+   * The method assumes no empty documents are submitted.
--- End diff --

Maybe add a `require` for that.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
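A hypothetical PySpark sketch of the invariant under discussion (the real change lives in Scala's `OnlineLDAOptimizer`; `numNonzeros` is just one way to detect an empty document, and the `appName` and toy corpus are made up): filter empty documents out before any mini-batch can see them, so the documented assumption holds.

    from pyspark import SparkContext
    from pyspark.mllib.linalg import Vectors

    sc = SparkContext(appName="lda-empty-doc-sketch")

    corpus = sc.parallelize([
        (0, Vectors.dense([1.0, 0.0, 2.0])),
        (1, Vectors.dense([0.0, 0.0, 0.0])),  # an empty document: all term counts are zero
        (2, Vectors.dense([0.0, 3.0, 1.0])),
    ])

    # Keep only documents with at least one nonzero term count.
    non_empty = corpus.filter(lambda doc: doc[1].numNonzeros() > 0)
    assert non_empty.count() == 2

A `require` inside `submitMiniBatch` would then turn any remaining empty document into an explicit failure rather than a silent miscount.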
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user ash211 commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r147021138

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala ---
[quoted diff elided; identical to the ExecutorPodFactory.scala excerpt quoted above]
[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestam...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18664#discussion_r147021068

--- Diff: python/pyspark/serializers.py ---
@@ -224,7 +225,13 @@ def _create_batch(series):
     # If a nullable integer series has been promoted to floating point with NaNs, need to cast
     # NOTE: this is not necessary with Arrow >= 0.7
     def cast_series(s, t):
-        if t is None or s.dtype == t.to_pandas_dtype():
+        if type(t) == pa.TimestampType:
+            # NOTE: convert to 'us' with astype here, unit ignored in `from_pandas` see ARROW-1680
+            return _series_convert_timestamps_internal(s).values.astype('datetime64[us]')
--- End diff --

`apply()` will invoke the given function on each individual value of the series. I think this iterates over the series, whereas `s.dt.tz_localize()` would do a vectorized operation and should be faster.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
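To make the contrast concrete, a tiny pandas sketch (illustrative only; variable names are not from the patch):

    import pandas as pd

    s = pd.Series(pd.to_datetime(["2017-10-01 00:00", "2017-10-02 00:00"]))

    # Element-wise: the lambda is invoked once per Timestamp, a Python-level loop.
    per_element = s.apply(lambda ts: ts.tz_localize("tzlocal()"))

    # Vectorized: a single call through the .dt accessor on the whole block.
    vectorized = s.dt.tz_localize("tzlocal()")

Both produce tz-aware timestamps, but the `.dt` path avoids calling back into Python for every value.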
[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19468 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83062/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19468 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19468 **[Test build #83062 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83062/testReport)** for PR 19468 at commit [`a4f9797`](https://github.com/apache/spark/commit/a4f97976afff452d7d953b83b722da61dfb40c3b). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...
Github user hhbyyh commented on the issue: https://github.com/apache/spark/pull/19565 I wonder if we should add `cache()` for the LDA training data, even if not as part of this feature. @srowen I'm not sure where we stand on caching the training data across the different algorithms. I'd appreciate your advice. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
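For context on the caching question, a minimal PySpark sketch (the toy corpus and `appName` are made up, and whether caching pays off depends on the input): online LDA samples a mini-batch from the corpus on every iteration, so an uncached input RDD may be recomputed each time.

    from pyspark import SparkContext
    from pyspark.mllib.clustering import LDA
    from pyspark.mllib.linalg import Vectors

    sc = SparkContext(appName="lda-cache-sketch")

    # Cache once up front so each mini-batch sample reads materialized partitions.
    corpus = sc.parallelize([
        (0, Vectors.dense([1.0, 2.0, 0.0])),
        (1, Vectors.dense([0.0, 1.0, 3.0])),
    ]).cache()

    model = LDA.train(corpus, k=2, optimizer="online")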
[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestam...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18664#discussion_r147020254

--- Diff: python/pyspark/sql/types.py ---
[quoted diff elided; identical to the types.py excerpt quoted above]
--- End diff --

Do we need `s.dt.tz_convert('UTC')` for the case where the timezone is strange, like `tzlocal()`?

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r147019793

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala ---
[quoted diff elided; identical to the ExecutorPodFactory.scala excerpt quoted above]
[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestam...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18664#discussion_r147019262

--- Diff: python/pyspark/serializers.py ---
[quoted diff elided; identical to the serializers.py excerpt quoted above]
--- End diff --

@BryanCutler It seems `s.apply(lambda ts: ts.tz_localize('tzlocal()'))` works without `s.fillna(0)`. Do you know the difference between this and `s.dt.tz_localize('tzlocal()')`?

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
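A side-by-side sketch of the two paths with a missing value present, in plain pandas (illustrative only; exact behavior can vary by pandas version, which is part of what is being asked here):

    import pandas as pd

    # A naive series containing a missing value (NaT).
    s = pd.Series(pd.to_datetime(["2017-10-01 00:00", None]))

    # Element-wise localization: NaT.tz_localize(...) simply returns NaT.
    via_apply = s.apply(lambda ts: ts.tz_localize("tzlocal()"))

    # Vectorized localization: NaT is carried through here as well.
    via_dt = s.dt.tz_localize("tzlocal()")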
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r147019065

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    rpcEnv: RpcEnv,
+    executorPodFactory: ExecutorPodFactory,
+    kubernetesClient: KubernetesClient,
+    allocatorExecutor: ScheduledExecutorService,
+    requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
+    requestExecutorsService)
+
+  private val driverPod = try {
+    kubernetesClient.pods()
+      .inNamespace(kubernetesNamespace)
+      .withName(kubernetesDriverPodName)
+      .get()
+  } catch {
+    case throwable: Throwable =>
+      logError(s"Executor cannot find driver pod.", throwable)
+      throw new SparkException(s"Executor cannot find driver pod", throwable)
+  }
+
+  override val minRegisteredRatio =
+    if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+      0.8
+    } else {
+      super.minRegisteredRatio
+    }
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+    conf.get("spark.driver.host"),
+    conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
[GitHub] spark issue #19576: [SPARK-19727][SQL][followup] Fix for round function that...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19576 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19576: [SPARK-19727][SQL][followup] Fix for round function that...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19576 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83059/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19576: [SPARK-19727][SQL][followup] Fix for round function that...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19576 **[Test build #83059 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83059/testReport)** for PR 19576 at commit [`cba65d1`](https://github.com/apache/spark/commit/cba65d1c72427c9e36eb097dcdf9c2420f66d5db). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r147018411

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala ---
[quoted diff elided; identical to the ExecutorPodFactory.scala excerpt quoted above]
[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15770 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15770 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83064/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15770 **[Test build #83064 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83064/testReport)** for PR 15770 at commit [`752b685`](https://github.com/apache/spark/commit/752b685892c1dbdf69811504985640e59756f679). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r147017517

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala ---
[quoted diff elided; identical to the ExecutorPodFactory.scala excerpt quoted above]
[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19468#discussion_r147017011

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ---
[quoted diff elided; identical to the KubernetesClusterSchedulerBackend.scala excerpt quoted above]