[GitHub] spark issue #19557: [SPARK-22281][SPARKR] Handle R method breaking signature...

2017-10-25 Thread shivaram
Github user shivaram commented on the issue:

https://github.com/apache/spark/pull/19557
  
LGTM. 


---




[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...

2017-10-25 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/19557
  
Tested on Windows and on r-hub/r-devel.


---




[GitHub] spark issue #19571: [SPARK-15474][SQL] Write and read back non-empty schema ...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19571
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83072/
Test PASSed.


---




[GitHub] spark issue #19571: [SPARK-15474][SQL] Write and read back non-empty schema ...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19571
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19571: [SPARK-15474][SQL] Write and read back non-empty schema ...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19571
  
**[Test build #83072 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83072/testReport)**
 for PR 19571 at commit 
[`8d212f0`](https://github.com/apache/spark/commit/8d212f049ccd176e5d6800d620929eed20844415).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19556
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83069/
Test PASSed.


---




[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19556
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19556
  
**[Test build #83069 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83069/testReport)**
 for PR 19556 at commit 
[`e26d093`](https://github.com/apache/spark/commit/e26d093bd14c79f26903206104da6aa57a32d613).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `assert(currentClass != null, \"The outer class can't be null.\")`
  * `  assert(currentClass != null, \"The outer class can't be 
null.\")`


---




[GitHub] spark issue #19077: [SPARK-21860][core]Improve memory reuse for heap memory ...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19077
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19077: [SPARK-21860][core]Improve memory reuse for heap memory ...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19077
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83068/
Test PASSed.


---




[GitHub] spark issue #19077: [SPARK-21860][core]Improve memory reuse for heap memory ...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19077
  
**[Test build #83068 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83068/testReport)**
 for PR 19077 at commit 
[`c312fde`](https://github.com/apache/spark/commit/c312fdecbd33eb038c56ce47c3b34753df551c8f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluation for...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19122
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83075/
Test PASSed.


---




[GitHub] spark issue #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluation for...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19122
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluation for...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19122
  
**[Test build #83075 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83075/testReport)**
 for PR 19122 at commit 
[`8b3ef97`](https://github.com/apache/spark/commit/8b3ef976849cdca4628d0652427c14179de0d39c).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #11205: [SPARK-11334][Core] Handle maximum task failure situatio...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/11205
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83067/
Test PASSed.


---




[GitHub] spark issue #11205: [SPARK-11334][Core] Handle maximum task failure situatio...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/11205
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #11205: [SPARK-11334][Core] Handle maximum task failure situatio...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/11205
  
**[Test build #83067 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83067/testReport)**
 for PR 11205 at commit 
[`59f9c15`](https://github.com/apache/spark/commit/59f9c156c3ad746f84f385bcf277685c9c329286).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...

2017-10-25 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/19565
  
@akopich If you want to cache the input dataset, please create a JIRA to discuss it 
first; I think it is a separate issue. This JIRA is also related to the input caching 
issue: https://issues.apache.org/jira/browse/SPARK-19422


---




[GitHub] spark issue #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, CURRENT...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19559
  
**[Test build #83076 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83076/testReport)**
 for PR 19559 at commit 
[`2d42abf`](https://github.com/apache/spark/commit/2d42abf32d53a1f753bcdff367d476ac0638ccd5).


---




[GitHub] spark pull request #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, ...

2017-10-25 Thread DonnyZone
Github user DonnyZone commented on a diff in the pull request:

https://github.com/apache/spark/pull/19559#discussion_r147041980
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -139,6 +139,7 @@ class Analyzer(
   ExtractGenerator ::
   ResolveGenerate ::
   ResolveFunctions ::
+  ResolveLiteralFunctions ::
--- End diff --

Agree! I will refactor it.


---




[GitHub] spark issue #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, CURRENT...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19559
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, CURRENT...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19559
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83071/
Test FAILed.


---




[GitHub] spark issue #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, CURRENT...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19559
  
**[Test build #83071 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83071/testReport)**
 for PR 19559 at commit 
[`d485e25`](https://github.com/apache/spark/commit/d485e25649ca90558b7639629be920274876e27c).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19529: [SPARK-22308] Support alternative unit testing styles in...

2017-10-25 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19529
  
LGTM pending Jenkins


---




[GitHub] spark issue #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluation for...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19122
  
**[Test build #83075 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83075/testReport)**
 for PR 19122 at commit 
[`8b3ef97`](https://github.com/apache/spark/commit/8b3ef976849cdca4628d0652427c14179de0d39c).


---




[GitHub] spark issue #19529: [SPARK-22308] Support alternative unit testing styles in...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19529
  
**[Test build #83074 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83074/testReport)**
 for PR 19529 at commit 
[`6c0b0d5`](https://github.com/apache/spark/commit/6c0b0d569ae1d779fd9253da0c7e97d12634063c).


---




[GitHub] spark pull request #19529: [SPARK-22308] Support alternative unit testing st...

2017-10-25 Thread nkronenfeld
Github user nkronenfeld commented on a diff in the pull request:

https://github.com/apache/spark/pull/19529#discussion_r147038647
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala ---
@@ -17,86 +17,8 @@
 
 package org.apache.spark.sql.test
 
-import scala.concurrent.duration._
-
-import org.scalatest.BeforeAndAfterEach
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{DebugFilesystem, SparkConf}
-import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * Helper trait for SQL test suites where all tests share a single 
[[TestSparkSession]].
- */
-trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with 
Eventually {
-
-  protected def sparkConf = {
-new SparkConf()
-  .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
-  .set("spark.unsafe.exceptionOnMemoryLeak", "true")
-  .set(SQLConf.CODEGEN_FALLBACK.key, "false")
-  }
-
-  /**
-   * The [[TestSparkSession]] to use for all tests in this suite.
-   *
-   * By default, the underlying [[org.apache.spark.SparkContext]] will be 
run in local
-   * mode with the default test configurations.
-   */
-  private var _spark: TestSparkSession = null
-
-  /**
-   * The [[TestSparkSession]] to use for all tests in this suite.
-   */
-  protected implicit def spark: SparkSession = _spark
-
-  /**
-   * The [[TestSQLContext]] to use for all tests in this suite.
-   */
-  protected implicit def sqlContext: SQLContext = _spark.sqlContext
-
-  protected def createSparkSession: TestSparkSession = {
-new TestSparkSession(sparkConf)
-  }
-
-  /**
-   * Initialize the [[TestSparkSession]].
-   */
+trait SharedSQLContext extends SQLTestUtils with SharedSparkSession {
--- End diff --

We could... that would better fit the pattern of what we've now done for 
PlanTest/PlanTestBase and SQLTestUtils/SQLTestUtilsBase.

I hesitated in this case just because the two are conceptually quite different, 
and the idea is that both would actually get used: 
SharedSQLContext in internal tests, SharedSparkSession in external tests.
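
For illustration, a minimal sketch (hypothetical suite, assuming the trait names from this PR) of the external-test use case: a downstream project mixing SharedSparkSession into an alternative ScalaTest style, which is what SPARK-22308 is meant to enable.

```scala
import org.scalatest.FunSpec

import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical downstream suite: FunSpec style instead of SparkFunSuite,
// reusing the SparkSession provided by the shared trait.
class ExternalStyleSuite extends FunSpec with SharedSparkSession {
  describe("a downstream library") {
    it("can use the shared SparkSession") {
      assert(spark.range(10).count() == 10)
    }
  }
}
```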


---




[GitHub] spark pull request #19529: [SPARK-22308] Support alternative unit testing st...

2017-10-25 Thread nkronenfeld
Github user nkronenfeld commented on a diff in the pull request:

https://github.com/apache/spark/pull/19529#discussion_r147038487
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala 
---
@@ -29,7 +31,14 @@ import org.apache.spark.sql.internal.SQLConf
 /**
  * Provides helper methods for comparing plans.
  */
-trait PlanTest extends SparkFunSuite with PredicateHelper {
+trait PlanTest extends SparkFunSuite with PlanTestBase {
+}
--- End diff --

done


---




[GitHub] spark pull request #19529: [SPARK-22308] Support alternative unit testing st...

2017-10-25 Thread nkronenfeld
Github user nkronenfeld commented on a diff in the pull request:

https://github.com/apache/spark/pull/19529#discussion_r147038504
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala ---
@@ -17,86 +17,8 @@
 
 package org.apache.spark.sql.test
 
-import scala.concurrent.duration._
-
-import org.scalatest.BeforeAndAfterEach
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{DebugFilesystem, SparkConf}
-import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * Helper trait for SQL test suites where all tests share a single 
[[TestSparkSession]].
- */
-trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with 
Eventually {
-
-  protected def sparkConf = {
-new SparkConf()
-  .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
-  .set("spark.unsafe.exceptionOnMemoryLeak", "true")
-  .set(SQLConf.CODEGEN_FALLBACK.key, "false")
-  }
-
-  /**
-   * The [[TestSparkSession]] to use for all tests in this suite.
-   *
-   * By default, the underlying [[org.apache.spark.SparkContext]] will be 
run in local
-   * mode with the default test configurations.
-   */
-  private var _spark: TestSparkSession = null
-
-  /**
-   * The [[TestSparkSession]] to use for all tests in this suite.
-   */
-  protected implicit def spark: SparkSession = _spark
-
-  /**
-   * The [[TestSQLContext]] to use for all tests in this suite.
-   */
-  protected implicit def sqlContext: SQLContext = _spark.sqlContext
-
-  protected def createSparkSession: TestSparkSession = {
-new TestSparkSession(sparkConf)
-  }
-
-  /**
-   * Initialize the [[TestSparkSession]].
-   */
+trait SharedSQLContext extends SQLTestUtils with SharedSparkSession {
   protected override def beforeAll(): Unit = {
-SparkSession.sqlListener.set(null)
-if (_spark == null) {
-  _spark = createSparkSession
-}
-// Ensure we have initialized the context before calling parent code
 super.beforeAll()
--- End diff --

we don't.  Removed.


---




[GitHub] spark issue #19433: [SPARK-3162] [MLlib] Add local tree training for decisio...

2017-10-25 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/19433
  
> We'll actually only have to run an O(n log n) sort on continuous feature 
values once (i.e. in the FeatureVector constructor), since once the continuous 
features are sorted we can update them as we would for categorical features 
when splitting nodes (in O(n) time) and they'll remain sorted.

Nice! So only one global sort pass is needed, and then each split only needs an O(n) copy.
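
For illustration, a minimal standalone sketch (hypothetical names, not the PR's code) of that idea: sort each continuous feature once, then use a stable O(n) partition at every split so each child's column stays sorted.

```scala
// Hypothetical, simplified column representation: sorted feature values plus
// the row index each value came from.
case class FeatureColumn(values: Array[Double], rowIndices: Array[Int])

// O(n log n) sort, done once per continuous feature (e.g. in the column's constructor).
def sortOnce(raw: Array[Double]): FeatureColumn = {
  val order = raw.indices.sortBy(i => raw(i)).toArray
  FeatureColumn(order.map(i => raw(i)), order)
}

// O(n) split: a stable partition by child assignment preserves the sort order
// within each child, so no re-sorting is needed after the split.
def splitColumn(col: FeatureColumn, goesLeft: Int => Boolean): (FeatureColumn, FeatureColumn) = {
  val (left, right) = col.rowIndices.zip(col.values).partition { case (row, _) => goesLeft(row) }
  (FeatureColumn(left.map(_._2), left.map(_._1)),
    FeatureColumn(right.map(_._2), right.map(_._1)))
}
```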


---




[GitHub] spark issue #19579: [SPARK-22356][SQL] data source table should support over...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19579
  
**[Test build #83073 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83073/testReport)**
 for PR 19579 at commit 
[`18907cb`](https://github.com/apache/spark/commit/18907cb2359efb9b4e874482916de04af9cf90a2).


---




[GitHub] spark issue #19471: [SPARK-22245][SQL] partitioned data set should always pu...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19471
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19471: [SPARK-22245][SQL] partitioned data set should always pu...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19471
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83066/
Test PASSed.


---




[GitHub] spark issue #19471: [SPARK-22245][SQL] partitioned data set should always pu...

2017-10-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19471
  
closing in favor of https://github.com/apache/spark/pull/19579


---




[GitHub] spark issue #19471: [SPARK-22245][SQL] partitioned data set should always pu...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19471
  
**[Test build #83066 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83066/testReport)**
 for PR 19471 at commit 
[`d21ebaa`](https://github.com/apache/spark/commit/d21ebaab6d6e2d7d6d10933d72360cef49194a90).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19471: [SPARK-22245][SQL] partitioned data set should al...

2017-10-25 Thread cloud-fan
Github user cloud-fan closed the pull request at:

https://github.com/apache/spark/pull/19471


---




[GitHub] spark issue #19579: [SPARK-22356][SQL] data source table should support over...

2017-10-25 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19579
  
cc @gatorsmile 


---




[GitHub] spark pull request #19579: [SPARK-22356][SQL] data source table should suppo...

2017-10-25 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/19579

[SPARK-22356][SQL] data source table should support overlapped columns 
between data and partition schema

## What changes were proposed in this pull request?

This is a regression introduced by #14207. Since Spark 2.1, we store the 
inferred schema when creating the table, to avoid inferring the schema again at 
the read path. However, there is one special case: overlapped columns between the data 
and partition schemas. This case breaks the assumption about table schemas that 
there is no overlap between the data and partition schemas and that partition columns 
are at the end. The result is that, in Spark 2.1, the table scan has an 
incorrect schema that puts partition columns at the end. In Spark 2.2, we added 
a check in CatalogTable to validate the table schema, which fails in this case.

To fix this issue, a simple and safe approach is to fall back to the old 
behavior when overlapped columns are detected, i.e. store an empty schema in the metastore.

## How was this patch tested?

new regression test
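
For illustration only (not the actual patch), a minimal sketch of the fallback described above, using made-up schemas in which column `p` appears in both the data and the partition schema:

```scala
import org.apache.spark.sql.types._

val dataSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("p", StringType)))
val partitionSchema = StructType(Seq(StructField("p", StringType)))

// Case-insensitive overlap check between data and partition columns.
val overlapped = dataSchema.map(_.name.toLowerCase)
  .intersect(partitionSchema.map(_.name.toLowerCase))

if (overlapped.nonEmpty) {
  // Fall back to the pre-2.1 behavior: store an empty schema in the metastore,
  // so the schema is inferred again at read time instead of being validated.
}
```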

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark bug2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19579.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19579


commit 18907cb2359efb9b4e874482916de04af9cf90a2
Author: Wenchen Fan 
Date:   2017-10-26T01:26:39Z

overlapped columns between data and partition schema in data source tables




---




[GitHub] spark pull request #19577: [SPARK-22355][SQL] Dataset.collect is not threads...

2017-10-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19577#discussion_r147037025
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3102,7 +3103,12 @@ class Dataset[T] private[sql](
* Collect all elements from a spark plan.
*/
   private def collectFromPlan(plan: SparkPlan): Array[T] = {
-plan.executeCollect().map(boundEnc.fromRow)
+val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
--- End diff --

Ok. Looks good.


---




[GitHub] spark pull request #19433: [SPARK-3162] [MLlib] Add local tree training for ...

2017-10-25 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19433#discussion_r147036693
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/LocalDecisionTree.scala ---
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.tree.impl
+
+import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.tree.model.ImpurityStats
+
+/** Object exposing methods for local training of decision trees */
+private[ml] object LocalDecisionTree {
+
+  /**
+   * Fully splits the passed-in node on the provided local dataset, 
returning
+   * an InternalNode/LeafNode corresponding to the root of the resulting 
tree.
+   *
+   * @param node LearningNode to use as the root of the subtree fit on the 
passed-in dataset
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = array of splits for feature i
+   */
+  private[ml] def fitNode(
+  input: Array[TreePoint],
+  instanceWeights: Array[Double],
+  node: LearningNode,
+  metadata: DecisionTreeMetadata,
+  splits: Array[Array[Split]]): Node = {
+
+// The case with 1 node (depth = 0) is handled separately.
+// This allows all iterations in the depth > 0 case to use the same 
code.
+// TODO: Check that learning works when maxDepth > 0 but learning 
stops at 1 node (because of
+//   other parameters).
+if (metadata.maxDepth == 0) {
+  return node.toNode
+}
+
+// Prepare column store.
+//   Note: rowToColumnStoreDense checks to make sure numRows < 
Int.MaxValue.
+val colStoreInit: Array[Array[Int]]
+= 
LocalDecisionTreeUtils.rowToColumnStoreDense(input.map(_.binnedFeatures))
+val labels = input.map(_.label)
+
+// Fit a regression model on the dataset, throwing an error if 
metadata indicates that
+// we should train a classifier.
+// TODO: Add support for training classifiers
+if (metadata.numClasses > 1 && metadata.numClasses <= 32) {
+  throw new UnsupportedOperationException("Local training of a 
decision tree classifier is " +
+"unsupported; currently, only regression is supported")
+} else {
+  trainRegressor(node, colStoreInit, instanceWeights, labels, 
metadata, splits)
+}
+  }
+
+  /**
+   * Locally fits a decision tree regressor.
+   * TODO(smurching): Logic for fitting a classifier & regressor is the 
same; only difference
+   * is impurity metric. Use the same logic for fitting a classifier.
+   *
+   * @param rootNode Node to use as root of the tree fit on the passed-in 
dataset
+   * @param colStoreInit Array of columns of training data
+   * @param instanceWeights Array of weights for each training example
+   * @param metadata learning and dataset metadata for DecisionTree
+   * @param splits splits(i) = Array of possible splits for feature i
+   * @return LeafNode or InternalNode representation of rootNode
+   */
+  private[ml] def trainRegressor(
+  rootNode: LearningNode,
+  colStoreInit: Array[Array[Int]],
+  instanceWeights: Array[Double],
+  labels: Array[Double],
+  metadata: DecisionTreeMetadata,
+  splits: Array[Array[Split]]): Node = {
+
+// Sort each column by decision tree node.
+val colStore: Array[FeatureVector] = colStoreInit.zipWithIndex.map { 
case (col, featureIndex) =>
+  val featureArity: Int = 
metadata.featureArity.getOrElse(featureIndex, 0)
+  FeatureVector(featureIndex, featureArity, col)
+}
+
+val numRows = colStore.headOption match {
+  case None => 0
+  case Some(column) => column.values.length
+}
+
+// Create a new PartitionInfo describing the status of our 
partially-trained subtree
+// at each iteration of training
+var trainingInfo: TrainingInfo = 

[GitHub] spark issue #18544: [SPARK-21318][SQL]Improve exception message thrown by `l...

2017-10-25 Thread stanzhai
Github user stanzhai commented on the issue:

https://github.com/apache/spark/pull/18544
  
Hi @gatorsmile, I've added some test cases, and they pass on my machine.


---




[GitHub] spark pull request #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, ...

2017-10-25 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19559#discussion_r147035765
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -139,6 +139,7 @@ class Analyzer(
   ExtractGenerator ::
   ResolveGenerate ::
   ResolveFunctions ::
+  ResolveLiteralFunctions ::
--- End diff --

The order matters. It assumes `ResolveReferences` is run before this 
rule. However, `ResolveReferences` might need multiple passes to resolve all 
the references. Thus, how about moving the logic into `ResolveReferences`? If 
an attribute is not resolvable, we can then check whether it is a literal function.
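
For illustration, a rough sketch (not the actual change) of such a fallback: if an attribute still cannot be resolved by `ResolveReferences`, try to interpret its name as a literal function.

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp, Expression}

// Hypothetical helper: map a still-unresolved attribute name to a literal function,
// or return None and leave it for later passes / checkAnalysis.
def resolveAsLiteralFunction(u: UnresolvedAttribute): Option[Expression] =
  u.name.toUpperCase match {
    case "CURRENT_DATE" => Some(CurrentDate())
    case "CURRENT_TIMESTAMP" => Some(CurrentTimestamp())
    case _ => None
  }
```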


---




[GitHub] spark pull request #19577: [SPARK-22355][SQL] Dataset.collect is not threads...

2017-10-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19577#discussion_r147035745
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3102,7 +3103,12 @@ class Dataset[T] private[sql](
* Collect all elements from a spark plan.
*/
   private def collectFromPlan(plan: SparkPlan): Array[T] = {
-plan.executeCollect().map(boundEnc.fromRow)
+val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
--- End diff --

It just rethrows the exception, not a big deal.


---




[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19527
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19527
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83070/
Test PASSed.


---




[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19527
  
**[Test build #83070 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83070/testReport)**
 for PR 19527 at commit 
[`ae2ac82`](https://github.com/apache/spark/commit/ae2ac82b10e457b8beede9dc4a33ce0a578f007d).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19577: [SPARK-22355][SQL] Dataset.collect is not threadsafe

2017-10-25 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19577
  
Nice catch! LGTM with two minor comments.


---




[GitHub] spark issue #19388: [SPARK-22162] Executors and the driver should use consis...

2017-10-25 Thread rezasafi
Github user rezasafi commented on the issue:

https://github.com/apache/spark/pull/19388
  
Sorry for the delay. It seems that to be able to commit the same RDD in 
different stages we need to use the stageId. So the jobId and other configurations 
in the write method of SparkHadoopWriter should be based on the stageId of the 
RDD and not the rddId. I have a hacky solution for this, but I am working on a 
better one and will update this PR ASAP.


---




[GitHub] spark issue #19571: [SPARK-15474][SQL] Write and read back non-empty schema ...

2017-10-25 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/19571
  
Thank you for the review, @gatorsmile and @cloud-fan. In particular, @cloud-fan's 
suggestion matches my original approach in #17980 and #18953 (before Aug 16). 
I couldn't agree more.

> Basically we leave the old orc data source as it is, and implement a new 
orc 1.4.1 data source in sql core module. Then we have an internal config to 
switch the implementation(by default prefer the new implementation), and remove 
the old implementation after one or two releases.

BTW, I'm wondering what has changed since you left [the following 
comment](https://github.com/apache/spark/pull/18953#issuecomment-322827590) 
on that PR on Aug 16:

> Are the ORC APIs changed a lot in 1.4? I was expecting a small patch to 
upgrade the current ORC data source, without moving it to sql/core.


---




[GitHub] spark issue #19578: [SPARK-21983][SQL] Fix Antlr 4.7 deprecation warnings

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19578
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19578: [SPARK-21983][SQL] Fix Antlr 4.7 deprecation warnings

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19578
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83065/
Test PASSed.


---




[GitHub] spark issue #19571: [SPARK-15474][SQL] Write and read back non-empty schema ...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19571
  
**[Test build #83072 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83072/testReport)**
 for PR 19571 at commit 
[`8d212f0`](https://github.com/apache/spark/commit/8d212f049ccd176e5d6800d620929eed20844415).


---




[GitHub] spark pull request #19577: [SPARK-22355][SQL] Dataset.collect is not threads...

2017-10-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19577#discussion_r147032429
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2661,7 +2657,12 @@ class Dataset[T] private[sql](
*/
   def toLocalIterator(): java.util.Iterator[T] = {
 withAction("toLocalIterator", queryExecution) { plan =>
-  plan.executeToIterator().map(boundEnc.fromRow).asJava
+  val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
--- End diff --

It would be better to add a comment explaining that we keep the projection 
inside for thread safety.
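
For reference, one way such an explanatory comment could read (an illustrative sketch that paraphrases the diff above, not the committed wording):

```scala
private def collectFromPlan(plan: SparkPlan): Array[T] = {
  // Generate the deserializer projection per call instead of reusing the shared
  // `boundEnc`: Dataset.collect can be invoked from multiple threads, and the
  // generated projection carries mutable state that is not thread-safe.
  val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
  plan.executeCollect().map { row =>
    objProj(row).get(0, null).asInstanceOf[T]
  }
}
```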


---




[GitHub] spark issue #19578: [SPARK-21983][SQL] Fix Antlr 4.7 deprecation warnings

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19578
  
**[Test build #83065 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83065/testReport)**
 for PR 19578 at commit 
[`6c6b138`](https://github.com/apache/spark/commit/6c6b1385ead60d1998dac626ccf0a8f2ba203abd).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19577: [SPARK-22355][SQL] Dataset.collect is not threads...

2017-10-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19577#discussion_r147032280
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3102,7 +3103,12 @@ class Dataset[T] private[sql](
* Collect all elements from a spark plan.
*/
   private def collectFromPlan(plan: SparkPlan): Array[T] = {
-plan.executeCollect().map(boundEnc.fromRow)
+val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
--- End diff --

`fromRow` catches `RuntimeException`. Shall we also catch it here?


---




[GitHub] spark issue #19534: [SPARK-22312][CORE] Fix bug in Executor allocation manag...

2017-10-25 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19534
  
@sitalkedia would you please reopen this PR? I think the second issue I 
fixed before is not valid anymore, and for the first issue the fix is no different 
from the one here.


---




[GitHub] spark pull request #11205: [SPARK-11334][Core] Handle maximum task failure s...

2017-10-25 Thread jerryshao
Github user jerryshao closed the pull request at:

https://github.com/apache/spark/pull/11205


---




[GitHub] spark issue #11205: [SPARK-11334][Core] Handle maximum task failure situatio...

2017-10-25 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/11205
  
Verified again; it looks like the 2nd bullet is not valid anymore. I cannot 
reproduce it on the latest master branch, so this might have already been fixed by 
SPARK-13054. 

So only the first issue still exists, and I think @sitalkedia's PR is enough to 
handle it. I'm going to close this one. @sitalkedia, would you 
please reopen your PR? Sorry for the noise.


---




[GitHub] spark issue #19577: [SPARK-22355][SQL] Dataset.collect is not threadsafe

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19577
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83063/
Test PASSed.


---




[GitHub] spark issue #19577: [SPARK-22355][SQL] Dataset.collect is not threadsafe

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19577
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19577: [SPARK-22355][SQL] Dataset.collect is not threadsafe

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19577
  
**[Test build #83063 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83063/testReport)**
 for PR 19577 at commit 
[`cecea8c`](https://github.com/apache/spark/commit/cecea8cdb36f3c5e65abd08643bd0d181d72008d).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19559: [SPARK-22333][SQL]timeFunctionCall(CURRENT_DATE, CURRENT...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19559
  
**[Test build #83071 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83071/testReport)**
 for PR 19559 at commit 
[`d485e25`](https://github.com/apache/spark/commit/d485e25649ca90558b7639629be920274876e27c).


---




[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...

2017-10-25 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19556
  
@cloud-fan The two remaining do-while loops are updated.


---




[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19527
  
**[Test build #83070 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83070/testReport)**
 for PR 19527 at commit 
[`ae2ac82`](https://github.com/apache/spark/commit/ae2ac82b10e457b8beede9dc4a33ce0a578f007d).


---




[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...

2017-10-25 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19527
  
@huaxingao Good catch! Thanks.


---




[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19556
  
**[Test build #83069 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83069/testReport)**
 for PR 19556 at commit 
[`e26d093`](https://github.com/apache/spark/commit/e26d093bd14c79f26903206104da6aa57a32d613).


---




[GitHub] spark issue #19077: [SPARK-21860][core]Improve memory reuse for heap memory ...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19077
  
**[Test build #83068 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83068/testReport)**
 for PR 19077 at commit 
[`c312fde`](https://github.com/apache/spark/commit/c312fdecbd33eb038c56ce47c3b34753df551c8f).


---




[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

2017-10-25 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19531#discussion_r147027199
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -157,64 +154,100 @@ case class InnerOuterEstimation(join: Join) extends 
Logging {
   // scalastyle:off
   /**
* The number of rows of A inner join B on A.k1 = B.k1 is estimated by 
this basic formula:
-   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the 
number of distinct values of
-   * that column. The underlying assumption for this formula is: each 
value of the smaller domain
-   * is included in the larger domain.
-   * Generally, inner join with multiple join keys can also be estimated 
based on the above
-   * formula:
+   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
+   * where V is the number of distinct values (ndv) of that column. The 
underlying assumption for
+   * this formula is: each value of the smaller domain is included in the 
larger domain.
+   *
+   * Generally, inner join with multiple join keys can be estimated based 
on the above formula:
* T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), 
V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
* However, the denominator can become very large and excessively reduce 
the result, so we use a
* conservative strategy to take only the largest max(V(A.ki), V(B.ki)) 
as the denominator.
+   *
+   * That is, join estimation is based on the most selective join keys. We 
follow this strategy
+   * when different types of column statistics are available. E.g., if 
card1 is the cardinality
+   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality 
estimated by histograms
+   * of join key A.k2 and B.k2, then the result cardinality would be 
min(card1, card2).
*/
   // scalastyle:on
-  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, 
AttributeReference)]): BigDecimal = {
-var ndvDenom: BigInt = -1
+  private def joinCardinality(joinKeyPairs: Seq[(AttributeReference, 
AttributeReference)])
+: BigInt = {
+// If there's no column stats available for join keys, estimate as 
cartesian product.
+var minCard: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
--- End diff --

It's equivalent. Previously, if `ndvDenom` was -1 (no qualified join keys), 
the method returned 1 as the selectivity, so the join cardinality computed outside 
the method was also `leftStats.rowCount.get * rightStats.rowCount.get`. Here I 
changed the method's return value from selectivity to cardinality directly.
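
As a concrete worked example of the basic formula in the diff above (made-up numbers):

```scala
// T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1))
val rowCountA = BigInt(1000)   // T(A)
val rowCountB = BigInt(20000)  // T(B)
val ndvA = BigInt(100)         // V(A.k1)
val ndvB = BigInt(500)         // V(B.k1)
val joinCardinality = rowCountA * rowCountB / ndvA.max(ndvB)
// = 1000 * 20000 / 500 = 40000 rows; with no qualified join keys at all,
// the estimate falls back to the cartesian product 1000 * 20000.
```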


---




[GitHub] spark issue #11205: [SPARK-11334][Core] Handle maximum task failure situatio...

2017-10-25 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/11205
  
@vanzin, in the current code `stageIdToTaskIndices` cannot be used to 
track the number of running tasks, because this structure doesn't remove a task's index 
when the task finishes successfully.

Yes, `isExecutorIdle` is meant to take care of executor idleness, but the way it 
identifies whether an executor is idle is not robust enough. In this scenario, when 
a stage is aborted because of max task failures, some task-end events are 
missing, so counting tasks per executor leaves residual data and 
makes the executor appear busy forever.
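
For illustration, a minimal standalone sketch (invented names, not ExecutorAllocationManager code) of the kind of bookkeeping being discussed: per-executor running-task sets that are also cleaned up when a stage completes or aborts, so that missing task-end events cannot leave an executor marked busy forever.

```scala
import scala.collection.mutable

val runningTasksPerExecutor = mutable.HashMap.empty[String, mutable.HashSet[Long]]

def onTaskStart(execId: String, taskId: Long): Unit =
  runningTasksPerExecutor.getOrElseUpdate(execId, mutable.HashSet.empty[Long]) += taskId

def onTaskEnd(execId: String, taskId: Long): Unit =
  runningTasksPerExecutor.get(execId).foreach(_ -= taskId)

// On stage completion or abort, drop any task ids of that stage which never got
// a task-end event (e.g. when the stage is aborted after max task failures).
def onStageCompleted(taskIdsOfStage: Set[Long]): Unit =
  runningTasksPerExecutor.values.foreach(_ --= taskIdsOfStage)

def isExecutorIdle(execId: String): Boolean =
  runningTasksPerExecutor.get(execId).forall(_.isEmpty)
```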




---




[GitHub] spark issue #11205: [SPARK-11334][Core] Handle maximum task failure situatio...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/11205
  
**[Test build #83067 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83067/testReport)**
 for PR 11205 at commit 
[`59f9c15`](https://github.com/apache/spark/commit/59f9c156c3ad746f84f385bcf277685c9c329286).


---




[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-25 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r147025026
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.util.Utils
+
+/**
+ * Configures executor pods. Construct one of these with a SparkConf to 
set up properties that are
+ * common across all executors. Then, pass in dynamic parameters into 
createExecutorPod.
+ */
+private[spark] trait ExecutorPodFactory {
+  def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+  private val executorJarsDownloadDir = 
sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
+
+  private val executorLabels = 
ConfigurationUtils.parsePrefixedKeyValuePairs(
+sparkConf,
+KUBERNETES_EXECUTOR_LABEL_PREFIX,
+"executor label")
+  require(
+!executorLabels.contains(SPARK_APP_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
+  require(
+!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
+  s" Spark.")
+
+  private val executorAnnotations =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
+  "executor annotation")
+  private val nodeSelector =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_NODE_SELECTOR_PREFIX,
+  "node selector")
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = 
sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", 
DEFAULT_STATIC_PORT)
+  private val blockmanagerPort = sparkConf
+.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+  private val kubernetesDriverPodName = sparkConf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  private val executorMemoryMiB = 
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = sparkConf
+.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
+
+  private val executorCores = sparkConf.getDouble("spark.executor.cores", 
1d)
+  private val executorLimitCores = 
sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
+
+  override def createExecutorPod(
+  executorId: String,

[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestam...

2017-10-25 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18664#discussion_r147024180
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1619,11 +1619,38 @@ def to_arrow_type(dt):
 arrow_type = pa.decimal(dt.precision, dt.scale)
 elif type(dt) == StringType:
 arrow_type = pa.string()
+elif type(dt) == DateType:
+arrow_type = pa.date32()
+elif type(dt) == TimestampType:
+# Timestamps should be in UTC, JVM Arrow timestamps require a 
timezone to be read
+arrow_type = pa.timestamp('us', tz='UTC')
 else:
 raise TypeError("Unsupported type in conversion to Arrow: " + 
str(dt))
 return arrow_type
 
 
+def _check_dataframe_localize_timestamps(df):
+""" Convert timezone aware timestamps to timezone-naive in local time
+"""
+from pandas.api.types import is_datetime64tz_dtype
+for column, series in df.iteritems():
+# TODO: handle nested timestamps, such as 
ArrayType(TimestampType())?
+if is_datetime64tz_dtype(series.dtype):
+df[column] = 
series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+return df
+
+
+def _check_series_convert_timestamps_internal(s):
+""" Convert a tz-naive timestamp in local tz to UTC normalized for 
Spark internal storage
+"""
+from pandas.api.types import is_datetime64_dtype
+# TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+if is_datetime64_dtype(s.dtype):
+return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
+else:
+return s
--- End diff --

Oh, got it. Yeah, I think you're right... I'll check on that a little later.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-25 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r147024086
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.util.Utils
+
+/**
+ * Configures executor pods. Construct one of these with a SparkConf to 
set up properties that are
+ * common across all executors. Then, pass in dynamic parameters into 
createExecutorPod.
+ */
+private[spark] trait ExecutorPodFactory {
+  def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+  private val executorJarsDownloadDir = 
sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
+
+  private val executorLabels = 
ConfigurationUtils.parsePrefixedKeyValuePairs(
+sparkConf,
+KUBERNETES_EXECUTOR_LABEL_PREFIX,
+"executor label")
+  require(
+!executorLabels.contains(SPARK_APP_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
+  require(
+!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
+  s" Spark.")
+
+  private val executorAnnotations =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
+  "executor annotation")
+  private val nodeSelector =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_NODE_SELECTOR_PREFIX,
+  "node selector")
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = 
sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", 
DEFAULT_STATIC_PORT)
+  private val blockmanagerPort = sparkConf
+.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+  private val kubernetesDriverPodName = sparkConf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  private val executorMemoryMiB = 
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = sparkConf
+.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
+
+  private val executorCores = sparkConf.getDouble("spark.executor.cores", 
1d)
+  private val executorLimitCores = 
sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
+
+  override def createExecutorPod(
+  executorId: String,

[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestam...

2017-10-25 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18664#discussion_r147023750
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1619,11 +1619,38 @@ def to_arrow_type(dt):
 arrow_type = pa.decimal(dt.precision, dt.scale)
 elif type(dt) == StringType:
 arrow_type = pa.string()
+elif type(dt) == DateType:
+arrow_type = pa.date32()
+elif type(dt) == TimestampType:
+# Timestamps should be in UTC, JVM Arrow timestamps require a 
timezone to be read
+arrow_type = pa.timestamp('us', tz='UTC')
 else:
 raise TypeError("Unsupported type in conversion to Arrow: " + 
str(dt))
 return arrow_type
 
 
+def _check_dataframe_localize_timestamps(df):
+""" Convert timezone aware timestamps to timezone-naive in local time
+"""
+from pandas.api.types import is_datetime64tz_dtype
+for column, series in df.iteritems():
+# TODO: handle nested timestamps, such as 
ArrayType(TimestampType())?
+if is_datetime64tz_dtype(series.dtype):
+df[column] = 
series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+return df
+
+
+def _check_series_convert_timestamps_internal(s):
+""" Convert a tz-naive timestamp in local tz to UTC normalized for 
Spark internal storage
+"""
+from pandas.api.types import is_datetime64_dtype
+# TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+if is_datetime64_dtype(s.dtype):
+return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
+else:
+return s
--- End diff --

I meant: if `is_datetime64tz_dtype(s.dtype)` is true (note the **tz**) but the series carries an unusual timezone like `tzlocal()`, I thought we would still need `s.dt.tz_convert('UTC')`.
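
For what it's worth, a tiny sketch of the case I mean (example data only, not code from this PR):

```python
import pandas as pd
from pandas.api.types import is_datetime64tz_dtype

# A series that is already tz-aware, but in the local timezone rather than UTC.
s = pd.Series(pd.date_range('2017-10-25', periods=2, freq='H')).dt.tz_localize('tzlocal()')

if is_datetime64tz_dtype(s.dtype):
    # Whatever timezone the series carries, normalize it to UTC before handing it to Arrow.
    s = s.dt.tz_convert('UTC')
```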


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-25 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r147022826
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.util.Utils
+
+/**
+ * Configures executor pods. Construct one of these with a SparkConf to 
set up properties that are
+ * common across all executors. Then, pass in dynamic parameters into 
createExecutorPod.
+ */
+private[spark] trait ExecutorPodFactory {
+  def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+  private val executorJarsDownloadDir = 
sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
+
+  private val executorLabels = 
ConfigurationUtils.parsePrefixedKeyValuePairs(
+sparkConf,
+KUBERNETES_EXECUTOR_LABEL_PREFIX,
+"executor label")
+  require(
+!executorLabels.contains(SPARK_APP_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
+  require(
+!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
+  s" Spark.")
+
+  private val executorAnnotations =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
+  "executor annotation")
+  private val nodeSelector =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_NODE_SELECTOR_PREFIX,
+  "node selector")
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = 
sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", 
DEFAULT_STATIC_PORT)
+  private val blockmanagerPort = sparkConf
+.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+  private val kubernetesDriverPodName = sparkConf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  private val executorMemoryMiB = 
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = sparkConf
+.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
+
+  private val executorCores = sparkConf.getDouble("spark.executor.cores", 
1d)
+  private val executorLimitCores = 
sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
+
+  override def createExecutorPod(
+  executorId: String,

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-25 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r147022413
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.util.Utils
+
+/**
+ * Configures executor pods. Construct one of these with a SparkConf to 
set up properties that are
+ * common across all executors. Then, pass in dynamic parameters into 
createExecutorPod.
+ */
+private[spark] trait ExecutorPodFactory {
+  def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+  private val executorJarsDownloadDir = 
sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
+
+  private val executorLabels = 
ConfigurationUtils.parsePrefixedKeyValuePairs(
+sparkConf,
+KUBERNETES_EXECUTOR_LABEL_PREFIX,
+"executor label")
+  require(
+!executorLabels.contains(SPARK_APP_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
+  require(
+!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
+  s" Spark.")
+
+  private val executorAnnotations =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
+  "executor annotation")
+  private val nodeSelector =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_NODE_SELECTOR_PREFIX,
+  "node selector")
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = 
sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", 
DEFAULT_STATIC_PORT)
+  private val blockmanagerPort = sparkConf
+.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+  private val kubernetesDriverPodName = sparkConf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  private val executorMemoryMiB = 
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = sparkConf
+.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
+
+  private val executorCores = sparkConf.getDouble("spark.executor.cores", 
1d)
+  private val executorLimitCores = 
sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
+
+  override def createExecutorPod(
+  executorId: String,

[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestam...

2017-10-25 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18664#discussion_r147021983
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1619,11 +1619,38 @@ def to_arrow_type(dt):
 arrow_type = pa.decimal(dt.precision, dt.scale)
 elif type(dt) == StringType:
 arrow_type = pa.string()
+elif type(dt) == DateType:
+arrow_type = pa.date32()
+elif type(dt) == TimestampType:
+# Timestamps should be in UTC, JVM Arrow timestamps require a 
timezone to be read
+arrow_type = pa.timestamp('us', tz='UTC')
 else:
 raise TypeError("Unsupported type in conversion to Arrow: " + 
str(dt))
 return arrow_type
 
 
+def _check_dataframe_localize_timestamps(df):
+""" Convert timezone aware timestamps to timezone-naive in local time
+"""
+from pandas.api.types import is_datetime64tz_dtype
+for column, series in df.iteritems():
+# TODO: handle nested timestamps, such as 
ArrayType(TimestampType())?
+if is_datetime64tz_dtype(series.dtype):
+df[column] = 
series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+return df
+
+
+def _check_series_convert_timestamps_internal(s):
+""" Convert a tz-naive timestamp in local tz to UTC normalized for 
Spark internal storage
+"""
+from pandas.api.types import is_datetime64_dtype
+# TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+if is_datetime64_dtype(s.dtype):
+return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
+else:
+return s
--- End diff --

This is to ensure that the values are offsets from the Unix epoch, which is what Spark expects to store internally. It does the same job as `TimestampType.toInternal` [here](https://github.com/BryanCutler/spark/blob/4d4089330d451bf6a145c28a6f34407ce3138b4d/python/pyspark/sql/types.py#L190), only this conversion is vectorized with Pandas.
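
Roughly, the vectorized path looks like this (an illustrative sketch with made-up sample data, not the exact code in the patch):

```python
import pandas as pd

# A tz-naive series holding local wall-clock times.
s = pd.Series(pd.to_datetime(['2017-10-25 12:00:00', '2017-10-25 13:00:00']))

# Localize to the session's local timezone, convert to UTC, then view the values
# as microseconds. The resulting int64 offsets are relative to the Unix epoch,
# which is what Spark stores internally for TimestampType.
s_utc = s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
epoch_us = s_utc.values.astype('datetime64[us]')
```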


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...

2017-10-25 Thread akopich
Github user akopich commented on a diff in the pull request:

https://github.com/apache/spark/pull/19565#discussion_r147021726
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -446,14 +445,14 @@ final class OnlineLDAOptimizer extends LDAOptimizer 
with Logging {
   override private[clustering] def next(): OnlineLDAOptimizer = {
 val batch = docs.sample(withReplacement = sampleWithReplacement, 
miniBatchFraction,
   randomGenerator.nextLong())
-if (batch.isEmpty()) return this
--- End diff --

I believe it's redundant now. In any case, `submitMiniBatch` counts the documents itself.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19471: [SPARK-22245][SQL] partitioned data set should always pu...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19471
  
**[Test build #83066 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83066/testReport)**
 for PR 19471 at commit 
[`d21ebaa`](https://github.com/apache/spark/commit/d21ebaab6d6e2d7d6d10933d72360cef49194a90).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...

2017-10-25 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/19565#discussion_r147020853
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -446,14 +445,14 @@ final class OnlineLDAOptimizer extends LDAOptimizer 
with Logging {
   override private[clustering] def next(): OnlineLDAOptimizer = {
 val batch = docs.sample(withReplacement = sampleWithReplacement, 
miniBatchFraction,
   randomGenerator.nextLong())
-if (batch.isEmpty()) return this
--- End diff --

We still need this, right?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should fi...

2017-10-25 Thread hhbyyh
Github user hhbyyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/19565#discussion_r147021004
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -446,14 +445,14 @@ final class OnlineLDAOptimizer extends LDAOptimizer 
with Logging {
   override private[clustering] def next(): OnlineLDAOptimizer = {
 val batch = docs.sample(withReplacement = sampleWithReplacement, 
miniBatchFraction,
   randomGenerator.nextLong())
-if (batch.isEmpty()) return this
 submitMiniBatch(batch)
   }
 
   /**
* Submit a subset (like 1%, decide by the miniBatchFraction) of the 
corpus to the Online LDA
* model, and it will update the topic distribution adaptively for the 
terms appearing in the
* subset.
+   * The methods assumes no empty documents are submitted.
--- End diff --

Maybe add a `require` to enforce that assumption.
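
Something along these lines, just as a sketch (assuming the argument is the `RDD[(Long, Vector)]` called `batch`; note it costs an extra pass over the data):

```scala
// Hypothetical guard at the top of submitMiniBatch; not code from this PR.
require(
  batch.filter { case (_, termCounts) => termCounts.numNonzeros == 0 }.isEmpty(),
  "submitMiniBatch must not be called with empty documents")
```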


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-25 Thread ash211
Github user ash211 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r147021138
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.util.Utils
+
+/**
+ * Configures executor pods. Construct one of these with a SparkConf to 
set up properties that are
+ * common across all executors. Then, pass in dynamic parameters into 
createExecutorPod.
+ */
+private[spark] trait ExecutorPodFactory {
+  def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+  private val executorJarsDownloadDir = 
sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
+
+  private val executorLabels = 
ConfigurationUtils.parsePrefixedKeyValuePairs(
+sparkConf,
+KUBERNETES_EXECUTOR_LABEL_PREFIX,
+"executor label")
+  require(
+!executorLabels.contains(SPARK_APP_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
+  require(
+!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
+  s" Spark.")
+
+  private val executorAnnotations =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
+  "executor annotation")
+  private val nodeSelector =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_NODE_SELECTOR_PREFIX,
+  "node selector")
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = 
sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", 
DEFAULT_STATIC_PORT)
+  private val blockmanagerPort = sparkConf
+.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+  private val kubernetesDriverPodName = sparkConf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  private val executorMemoryMiB = 
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = sparkConf
+.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
+
+  private val executorCores = sparkConf.getDouble("spark.executor.cores", 
1d)
+  private val executorLimitCores = 
sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
+
+  override def createExecutorPod(
+  executorId: String,
 

[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestam...

2017-10-25 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18664#discussion_r147021068
  
--- Diff: python/pyspark/serializers.py ---
@@ -224,7 +225,13 @@ def _create_batch(series):
 # If a nullable integer series has been promoted to floating point 
with NaNs, need to cast
 # NOTE: this is not necessary with Arrow >= 0.7
 def cast_series(s, t):
-if t is None or s.dtype == t.to_pandas_dtype():
+if type(t) == pa.TimestampType:
+# NOTE: convert to 'us' with astype here, unit ignored in 
`from_pandas` see ARROW-1680
+return 
_series_convert_timestamps_internal(s).values.astype('datetime64[us]')
--- End diff --

`apply()` invokes the given function on each individual value of the series, so I think it iterates over the series in Python, whereas `s.dt.tz_localize()` does a single vectorized operation and should be faster.
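
To illustrate the difference with a toy example (sample data assumed, not from the patch):

```python
import pandas as pd

s = pd.Series(pd.date_range('2017-10-25', periods=3, freq='H'))

# Per-element: the lambda is invoked once for every Timestamp in the series.
per_element = s.apply(lambda ts: ts.tz_localize('tzlocal()'))

# Vectorized: a single call on the whole series via the .dt accessor.
vectorized = s.dt.tz_localize('tzlocal()')
```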


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19468
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83062/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19468
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19468
  
**[Test build #83062 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83062/testReport)**
 for PR 19468 at commit 
[`a4f9797`](https://github.com/apache/spark/commit/a4f97976afff452d7d953b83b722da61dfb40c3b).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19565: [SPARK-22111][MLLIB] OnlineLDAOptimizer should filter ou...

2017-10-25 Thread hhbyyh
Github user hhbyyh commented on the issue:

https://github.com/apache/spark/pull/19565
  
I wonder if we should add `cache()` for the LDA training data, even independently of this feature.

@srowen I'm not sure where we stand on caching the training data for the different algorithms. I'd appreciate your advice.
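
Just to make the idea concrete, a rough sketch of what I have in mind on the caller side (the wrapper itself is hypothetical; only the `LDA`/`OnlineLDAOptimizer` calls are from mllib):

```scala
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Cache the corpus before iterative training so each sampled mini-batch does not
// recompute the upstream lineage on every iteration.
def trainOnlineLDA(corpus: RDD[(Long, Vector)], k: Int) = {
  val cached = corpus.persist(StorageLevel.MEMORY_AND_DISK)
  try {
    new LDA()
      .setK(k)
      .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(0.05))
      .run(cached)
  } finally {
    cached.unpersist()
  }
}
```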


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestam...

2017-10-25 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18664#discussion_r147020254
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1619,11 +1619,38 @@ def to_arrow_type(dt):
 arrow_type = pa.decimal(dt.precision, dt.scale)
 elif type(dt) == StringType:
 arrow_type = pa.string()
+elif type(dt) == DateType:
+arrow_type = pa.date32()
+elif type(dt) == TimestampType:
+# Timestamps should be in UTC, JVM Arrow timestamps require a 
timezone to be read
+arrow_type = pa.timestamp('us', tz='UTC')
 else:
 raise TypeError("Unsupported type in conversion to Arrow: " + 
str(dt))
 return arrow_type
 
 
+def _check_dataframe_localize_timestamps(df):
+""" Convert timezone aware timestamps to timezone-naive in local time
+"""
+from pandas.api.types import is_datetime64tz_dtype
+for column, series in df.iteritems():
+# TODO: handle nested timestamps, such as 
ArrayType(TimestampType())?
+if is_datetime64tz_dtype(series.dtype):
+df[column] = 
series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+return df
+
+
+def _check_series_convert_timestamps_internal(s):
+""" Convert a tz-naive timestamp in local tz to UTC normalized for 
Spark internal storage
+"""
+from pandas.api.types import is_datetime64_dtype
+# TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+if is_datetime64_dtype(s.dtype):
+return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
+else:
+return s
--- End diff --

Don't we need `s.dt.tz_convert('UTC')` for the case where the timezone is something unusual like `tzlocal()`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-25 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r147019793
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.util.Utils
+
+/**
+ * Configures executor pods. Construct one of these with a SparkConf to 
set up properties that are
+ * common across all executors. Then, pass in dynamic parameters into 
createExecutorPod.
+ */
+private[spark] trait ExecutorPodFactory {
+  def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+  private val executorJarsDownloadDir = 
sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
+
+  private val executorLabels = 
ConfigurationUtils.parsePrefixedKeyValuePairs(
+sparkConf,
+KUBERNETES_EXECUTOR_LABEL_PREFIX,
+"executor label")
+  require(
+!executorLabels.contains(SPARK_APP_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
+  require(
+!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
+  s" Spark.")
+
+  private val executorAnnotations =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
+  "executor annotation")
+  private val nodeSelector =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_NODE_SELECTOR_PREFIX,
+  "node selector")
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = 
sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", 
DEFAULT_STATIC_PORT)
+  private val blockmanagerPort = sparkConf
+.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+  private val kubernetesDriverPodName = sparkConf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  private val executorMemoryMiB = 
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = sparkConf
+.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
+
+  private val executorCores = sparkConf.getDouble("spark.executor.cores", 
1d)
+  private val executorLimitCores = 
sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
+
+  override def createExecutorPod(
+  executorId: String,

[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestam...

2017-10-25 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18664#discussion_r147019262
  
--- Diff: python/pyspark/serializers.py ---
@@ -224,7 +225,13 @@ def _create_batch(series):
 # If a nullable integer series has been promoted to floating point 
with NaNs, need to cast
 # NOTE: this is not necessary with Arrow >= 0.7
 def cast_series(s, t):
-if t is None or s.dtype == t.to_pandas_dtype():
+if type(t) == pa.TimestampType:
+# NOTE: convert to 'us' with astype here, unit ignored in 
`from_pandas` see ARROW-1680
+return 
_series_convert_timestamps_internal(s).values.astype('datetime64[us]')
--- End diff --

@BryanCutler It seems `s.apply(lambda ts: ts.tz_localize('tzlocal()'))` 
works without `s.fillna(0)`. Do you know the difference between this and 
`s.dt.tz_localize('tzlocal()')`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-25 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r147019065
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+conf.get("spark.driver.host"),
+conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+  

[GitHub] spark issue #19576: [SPARK-19727][SQL][followup] Fix for round function that...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19576
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19576: [SPARK-19727][SQL][followup] Fix for round function that...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19576
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83059/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19576: [SPARK-19727][SQL][followup] Fix for round function that...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19576
  
**[Test build #83059 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83059/testReport)**
 for PR 19576 at commit 
[`cba65d1`](https://github.com/apache/spark/commit/cba65d1c72427c9e36eb097dcdf9c2420f66d5db).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-25 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r147018411
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.util.Utils
+
+/**
+ * Configures executor pods. Construct one of these with a SparkConf to 
set up properties that are
+ * common across all executors. Then, pass in dynamic parameters into 
createExecutorPod.
+ */
+private[spark] trait ExecutorPodFactory {
+  def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+  private val executorJarsDownloadDir = 
sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
+
+  private val executorLabels = 
ConfigurationUtils.parsePrefixedKeyValuePairs(
+sparkConf,
+KUBERNETES_EXECUTOR_LABEL_PREFIX,
+"executor label")
+  require(
+!executorLabels.contains(SPARK_APP_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
+  require(
+!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
+  s" Spark.")
+
+  private val executorAnnotations =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
+  "executor annotation")
+  private val nodeSelector =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_NODE_SELECTOR_PREFIX,
+  "node selector")
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = 
sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", 
DEFAULT_STATIC_PORT)
+  private val blockmanagerPort = sparkConf
+.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+  private val kubernetesDriverPodName = sparkConf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  private val executorMemoryMiB = 
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = sparkConf
+.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
+
+  private val executorCores = sparkConf.getDouble("spark.executor.cores", 
1d)
+  private val executorLimitCores = 
sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
+
+  override def createExecutorPod(
+  executorId: String,

[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15770
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...

2017-10-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15770
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83064/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15770: [SPARK-15784][ML]:Add Power Iteration Clustering to spar...

2017-10-25 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15770
  
**[Test build #83064 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83064/testReport)**
 for PR 15770 at commit 
[`752b685`](https://github.com/apache/spark/commit/752b685892c1dbdf69811504985640e59756f679).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-25 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r147017517
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.util.Utils
+
+/**
+ * Configures executor pods. Construct one of these with a SparkConf to 
set up properties that are
+ * common across all executors. Then, pass in dynamic parameters into 
createExecutorPod.
+ */
+private[spark] trait ExecutorPodFactory {
+  def createExecutorPod(
+  executorId: String,
+  applicationId: String,
+  driverUrl: String,
+  executorEnvs: Seq[(String, String)],
+  driverPod: Pod,
+  nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+  extends ExecutorPodFactory {
+
+  import ExecutorPodFactoryImpl._
+
+  private val executorExtraClasspath = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+  private val executorJarsDownloadDir = 
sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
+
+  private val executorLabels = 
ConfigurationUtils.parsePrefixedKeyValuePairs(
+sparkConf,
+KUBERNETES_EXECUTOR_LABEL_PREFIX,
+"executor label")
+  require(
+!executorLabels.contains(SPARK_APP_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is 
reserved for Spark.")
+  require(
+!executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it 
is reserved for" +
+  s" Spark.")
+
+  private val executorAnnotations =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
+  "executor annotation")
+  private val nodeSelector =
+ConfigurationUtils.parsePrefixedKeyValuePairs(
+  sparkConf,
+  KUBERNETES_NODE_SELECTOR_PREFIX,
+  "node selector")
+
+  private val executorDockerImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = 
sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val executorPort = sparkConf.getInt("spark.executor.port", 
DEFAULT_STATIC_PORT)
+  private val blockmanagerPort = sparkConf
+.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+  private val kubernetesDriverPodName = sparkConf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val executorPodNamePrefix = 
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+  private val executorMemoryMiB = 
sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = sparkConf.get(
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+  private val memoryOverheadMiB = sparkConf
+.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val executorMemoryWithOverhead = executorMemoryMiB + 
memoryOverheadMiB
+
+  private val executorCores = sparkConf.getDouble("spark.executor.cores", 
1d)
+  private val executorLimitCores = 
sparkConf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
+
+  override def createExecutorPod(
+  executorId: String,

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-25 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r147017011
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+conf.get("spark.driver.host"),
+conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+  

  1   2   3   4   5   >