[GitHub] spark pull request #18477: [SPARK-21261][DOCS]SQL Regex document fix

2017-10-23 Thread visaxin
Github user visaxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18477#discussion_r146457833
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala
 ---
@@ -268,7 +268,7 @@ case class StringSplit(str: Expression, pattern: 
Expression)
   usage = "_FUNC_(str, regexp, rep) - Replaces all substrings of `str` 
that match `regexp` with `rep`.",
   extended = """
 Examples:
-  > SELECT _FUNC_('100-200', '(\d+)', 'num');
+  > SELECT _FUNC_('100-200', '(\\d+)', 'num');
--- End diff --

I added spark-sql and scala examples to make it clear.
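
For context, the distinction the comment refers to is the extra level of escaping the pattern needs depending on where it is written. A minimal sketch, assuming a SparkSession named `spark` and the default SQL string-literal escaping:

```scala
// In the spark-sql shell, SQL string literals unescape `\\` to `\`, so the
// pattern is written with a doubled backslash (as in the fixed example):
//   spark-sql> SELECT regexp_replace('100-200', '(\\d+)', 'num');
//   num-num

// In Scala source, the string literal adds one more level of escaping:
val viaSql = spark.sql("SELECT regexp_replace('100-200', '(\\\\d+)', 'num')")

// With the DataFrame API, only the Scala-level escaping applies:
import org.apache.spark.sql.functions.{lit, regexp_replace}
val viaApi = spark.range(1).select(regexp_replace(lit("100-200"), "(\\d+)", "num"))
```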


---




[GitHub] spark issue #18477: [SPARK-21261][DOCS]SQL Regex document fix

2017-10-23 Thread visaxin
Github user visaxin commented on the issue:

https://github.com/apache/spark/pull/18477
  
@gatorsmile Done


---




[GitHub] spark issue #18527: [SPARK-21101][SQL] Catch IllegalStateException when CREA...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18527
  
**[Test build #83007 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83007/testReport)**
 for PR 18527 at commit 
[`47071b5`](https://github.com/apache/spark/commit/47071b5ec2d1e285221918ad075878500aa7d7bb).


---




[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19557
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19557
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83005/
Test PASSed.


---




[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19557
  
**[Test build #83005 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83005/testReport)**
 for PR 19557 at commit 
[`442711d`](https://github.com/apache/spark/commit/442711dd3f4fb5ce6f6d622cccd0298395aca88b).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19557
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19557
  
**[Test build #83004 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83004/testReport)**
 for PR 19557 at commit 
[`a9b74de`](https://github.com/apache/spark/commit/a9b74de3af21c823de28431671512b143273f468).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19557
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83004/
Test PASSed.


---




[GitHub] spark issue #19529: [SPARK-22308] Support alternative unit testing styles in...

2017-10-23 Thread nkronenfeld
Github user nkronenfeld commented on the issue:

https://github.com/apache/spark/pull/19529
  
Yeah, as predicted, that made PlanTest very easy to review, but it didn't do as well with SQLTestUtils. I suspect I reordered functions and whatnot when I was moving stuff around.

If this is still too confusing to deal with, just let me know. Even if I can't make the final diff of the entire PR trivial, I could certainly re-implement this in a way where each individual commit would be trivial; then it would just be a question of following along commit-by-commit, which shouldn't be too bad.


---




[GitHub] spark issue #19529: [SPARK-22308] Support alternative unit testing styles in...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19529
  
**[Test build #83006 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83006/testReport)**
 for PR 19529 at commit 
[`2d927e9`](https://github.com/apache/spark/commit/2d927e94f627919ac1546b47072276b23d3e8da2).


---




[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19557
  
**[Test build #83005 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83005/testReport)**
 for PR 19557 at commit 
[`442711d`](https://github.com/apache/spark/commit/442711dd3f4fb5ce6f6d622cccd0298395aca88b).


---




[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19557
  
**[Test build #83004 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83004/testReport)**
 for PR 19557 at commit 
[`a9b74de`](https://github.com/apache/spark/commit/a9b74de3af21c823de28431671512b143273f468).


---




[GitHub] spark pull request #19560: [SPARK-22334][SQL] Check table size from filesyst...

2017-10-23 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19560#discussion_r146449741
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -120,22 +120,41 @@ class DetermineTableStats(session: SparkSession) 
extends Rule[LogicalPlan] {
 if DDLUtils.isHiveTable(relation.tableMeta) && 
relation.tableMeta.stats.isEmpty =>
   val table = relation.tableMeta
   val sizeInBytes = if 
(session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-try {
-  val hadoopConf = session.sessionState.newHadoopConf()
-  val tablePath = new Path(table.location)
-  val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
-  fs.getContentSummary(tablePath).getLength
-} catch {
-  case e: IOException =>
-logWarning("Failed to get table size from hdfs.", e)
-session.sessionState.conf.defaultSizeInBytes
-}
+getSizeFromHdfs(table.location)
   } else {
 session.sessionState.conf.defaultSizeInBytes
   }
 
   val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
   relation.copy(tableMeta = withStats)
+
+case relation: HiveTableRelation
+if DDLUtils.isHiveTable(relation.tableMeta) && 
relation.tableMeta.stats.nonEmpty &&
+  
session.sessionState.conf.verifyStatsFromFileSystemWhenBroadcastJoin &&
+  relation.tableMeta.stats.get.sizeInBytes <
+session.sessionState.conf.autoBroadcastJoinThreshold =>
+  val table = relation.tableMeta
+  val sizeInBytes = getSizeFromHdfs(table.location)
--- End diff --

Yes, I think it's a good idea.
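
For reference, the extracted helper being discussed would presumably look roughly like the try/catch block it replaces in the diff above. A hedged sketch, reconstructed from the removed lines rather than taken from the PR, where `session` and `logWarning` come from the enclosing `DetermineTableStats` rule:

```scala
import java.io.IOException
import java.net.URI

import org.apache.hadoop.fs.{FileSystem, Path}

// Reconstructed from the removed try/catch in the diff; falls back to the
// configured default size if the filesystem cannot be read.
private def getSizeFromHdfs(location: URI): Long = {
  try {
    val hadoopConf = session.sessionState.newHadoopConf()
    val tablePath = new Path(location)
    val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    fs.getContentSummary(tablePath).getLength
  } catch {
    case e: IOException =>
      logWarning("Failed to get table size from hdfs.", e)
      session.sessionState.conf.defaultSizeInBytes
  }
}
```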


---




[GitHub] spark issue #19557: [SPARK-22281][SPARKR][WIP] Handle R method breaking sign...

2017-10-23 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/19557
  
Unfortunately, the `attach` part didn't work on r-devel.
I think you are right; I'm going to try to apply a similar approach for `attach` - I started that way before I found out the method signature was not the cause for `glm`. So now I'm going to backtrack and re-do `attach` too.


---




[GitHub] spark pull request #19560: [SPARK-22334][SQL] Check table size from filesyst...

2017-10-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19560#discussion_r146448976
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -120,22 +120,41 @@ class DetermineTableStats(session: SparkSession) 
extends Rule[LogicalPlan] {
 if DDLUtils.isHiveTable(relation.tableMeta) && 
relation.tableMeta.stats.isEmpty =>
   val table = relation.tableMeta
   val sizeInBytes = if 
(session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-try {
-  val hadoopConf = session.sessionState.newHadoopConf()
-  val tablePath = new Path(table.location)
-  val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
-  fs.getContentSummary(tablePath).getLength
-} catch {
-  case e: IOException =>
-logWarning("Failed to get table size from hdfs.", e)
-session.sessionState.conf.defaultSizeInBytes
-}
+getSizeFromHdfs(table.location)
   } else {
 session.sessionState.conf.defaultSizeInBytes
   }
 
   val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
   relation.copy(tableMeta = withStats)
+
+case relation: HiveTableRelation
+if DDLUtils.isHiveTable(relation.tableMeta) && 
relation.tableMeta.stats.nonEmpty &&
+  
session.sessionState.conf.verifyStatsFromFileSystemWhenBroadcastJoin &&
+  relation.tableMeta.stats.get.sizeInBytes <
+session.sessionState.conf.autoBroadcastJoinThreshold =>
+  val table = relation.tableMeta
+  val sizeInBytes = getSizeFromHdfs(table.location)
--- End diff --

If the metadata statistics are wrong, getting the size from the files every time seems like a burden. Can we show a message to users suggesting they update the table statistics?
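
For reference, users can refresh the statistics the comment mentions with `ANALYZE TABLE`. A minimal sketch, assuming a SparkSession named `spark` and a hypothetical Hive table `t`:

```scala
// Recompute the table-level statistics stored in the metastore so that
// totalSize matches what is actually on the filesystem.
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")

// The refreshed statistics can be inspected afterwards:
spark.sql("DESCRIBE EXTENDED t").show(truncate = false)
```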


---




[GitHub] spark issue #18747: [SPARK-20822][SQL] Generate code to directly get value f...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18747
  
**[Test build #83003 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83003/testReport)**
 for PR 18747 at commit 
[`db61b41`](https://github.com/apache/spark/commit/db61b41a61da5d484742ee8f0dfa53e1486b0456).


---




[GitHub] spark pull request #19560: [SPARK-22334][SQL] Check table size from filesyst...

2017-10-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19560#discussion_r146448519
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -187,6 +187,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(false)
 
+  val VERIFY_STATS_FROM_FILESYSTEM_WHEN_BROADCASTJOIN =
--- End diff --

This config name implies it only does verification for broadcast joins. However, it seems to verify the statistics regardless of whether a broadcast join is performed.


---




[GitHub] spark pull request #17589: [SPARK-16544][SQL] Support for conversion from nu...

2017-10-23 Thread HyukjinKwon
Github user HyukjinKwon closed the pull request at:

https://github.com/apache/spark/pull/17589


---




[GitHub] spark issue #17589: [SPARK-16544][SQL] Support for conversion from numeric c...

2017-10-23 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/17589
  
Closing this. Will take another look and make a cleaner fix next time, or reopen if I see more interest in this.


---




[GitHub] spark pull request #19535: [SPARK-22313][PYTHON] Mark/print deprecation warn...

2017-10-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19535


---




[GitHub] spark issue #19535: [SPARK-22313][PYTHON] Mark/print deprecation warnings as...

2017-10-23 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19535
  
Merged to master


---




[GitHub] spark issue #19535: [SPARK-22313][PYTHON] Mark/print deprecation warnings as...

2017-10-23 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19535
  
Thanks @srowen, @rxin and @felixcheung.


---




[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19556
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82999/
Test PASSed.


---




[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19556
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19556
  
**[Test build #82999 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82999/testReport)**
 for PR 19556 at commit 
[`5d7efd1`](https://github.com/apache/spark/commit/5d7efd14c0baba3e3f41258fcf6dc44f2976450a).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19459
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19459
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83001/
Test PASSed.


---




[GitHub] spark issue #19560: [SPARK-22334][SQL] Check table size from filesystem in c...

2017-10-23 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/19560
  
@gatorsmile @dongjoon-hyun 

Thanks a lot for looking into this.
This PR aims to avoid OOM when the metastore fails to update table properties after the data has already been produced. With the config in this PR enabled, we check the size on the filesystem only when `totalSize` is below `spark.sql.autoBroadcastJoinThreshold`, so I think the cost is acceptable.

Yes, the storage can be other filesystems; I refined the name. Please take another look when you have time.
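
A hedged sketch of the guard described above, reconstructed from the diff quoted earlier in this thread rather than copied from the PR; only tables whose recorded size already qualifies them for a broadcast join pay the extra filesystem call:

```scala
val conf = session.sessionState.conf
val stats = relation.tableMeta.stats

val shouldVerify =
  conf.verifyStatsFromFileSystemWhenBroadcastJoin &&
    stats.nonEmpty &&
    stats.get.sizeInBytes < conf.autoBroadcastJoinThreshold

// Verify against the filesystem only when the table looks small enough to
// broadcast; otherwise keep the metastore statistics as-is.
val sizeInBytes: BigInt =
  if (shouldVerify) BigInt(getSizeFromHdfs(relation.tableMeta.location))
  else stats.get.sizeInBytes
```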


---




[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19459
  
**[Test build #83001 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83001/testReport)**
 for PR 19459 at commit 
[`f421e2d`](https://github.com/apache/spark/commit/f421e2da1e97dfbc7c80b7ae724b6ea9a472b220).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #10466: [SPARK-12375] [ML] add handleinvalid for vectorindexer

2017-10-23 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/10466
  
@hhbyyh Do you have time to continue this PR? Thanks!


---




[GitHub] spark issue #19560: [SPARK-22334][SQL] Check table size from HDFS in case th...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19560
  
**[Test build #83002 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83002/testReport)**
 for PR 19560 at commit 
[`bf59c27`](https://github.com/apache/spark/commit/bf59c27d0a8a01dc0572cf148f40b6337799f241).


---




[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146439253
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146439099
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark issue #19559: [SPARK-22333][SQL]ColumnReference should get higher prio...

2017-10-23 Thread DonnyZone
Github user DonnyZone commented on the issue:

https://github.com/apache/spark/pull/19559
  
@hvanhovell Yes! I got something wrong. The `timeFunctionCall` rule conflicts with `columnReference`, so this fix would break every use of CURRENT_DATE/CURRENT_TIMESTAMP.

For [SPARK-16836](https://github.com/apache/spark/pull/14442), I think this feature should be implemented in the analysis phase rather than in the parser phase. When there are no such columns, the identifiers can be resolved as functions. Another approach is to define a configuration.
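
A hypothetical illustration of the ambiguity (the table and column names are made up, and the outcome depends on which resolution strategy is chosen): if a table has a column literally named `current_date`, a bare identifier could mean either that column or the no-parentheses CURRENT_DATE function.

```scala
// Assumes a SparkSession named `spark`; names are illustrative only.
spark.sql("CREATE TABLE events (current_date STRING, id INT) USING parquet")
spark.sql("SELECT current_date FROM events")  // the column, or today's date?

// Resolving the identifier during analysis (as suggested above) lets Spark
// prefer an existing column and fall back to the function only when no such
// column exists.
```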


---




[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146437952
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146437447
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146435148
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -159,6 +159,12 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
   killExecutors(exec.toSeq, replace = true, force = true)
 }
+
+  case UpdateDelegationTokens(tokens) =>
+// Update the driver's delegation tokens in case new executors are 
added later.
+currentHadoopDelegationTokens = Some(tokens)
+executorDataMap.values.foreach { ed =>
+  ed.executorEndpoint.send(UpdateDelegationTokens(tokens)) }
--- End diff --

The `}` goes on the next line.
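
That is, the closing brace of the `foreach` block should sit on its own line. Reconstructed from the quoted diff, the snippet would read:

```scala
case UpdateDelegationTokens(tokens) =>
  // Update the driver's delegation tokens in case new executors are added later.
  currentHadoopDelegationTokens = Some(tokens)
  executorDataMap.values.foreach { ed =>
    ed.executorEndpoint.send(UpdateDelegationTokens(tokens))
  }
```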


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146436646
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
 ---
@@ -59,8 +59,7 @@ private[yarn] class AMCredentialRenewer(
   private var lastCredentialsFileSuffix = 0
 
   private val credentialRenewer =
-Executors.newSingleThreadScheduledExecutor(
-  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh 
Thread")
--- End diff --

Normally you should avoid making changes that are not related to your PR.


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146434999
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala
 ---
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkContext
--- End diff --

Change not needed?


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146436571
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosCredentialRenewer will update the Hadoop credentials for Spark 
drivers accessing
+ * secured services using Kerberos authentication. It is modeled after the 
YARN AMCredential
+ * renewer, and similarly will renew the Credentials when 75% of the 
renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+driverEndpoint: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab = conf.get(config.KEYTAB).orNull
+val tgt = conf.getenv("KRB5CCNAME")
+require(keytab != null || tgt != null, "A keytab or TGT required.")
+// if both Keytab and TGT are detected we use the Keytab.
+val (secretFile, mode) = if (keytab != null && tgt != null) {
+  logWarning(s"Keytab and TGT were detected, using keytab, unset 
$keytab to use TGT")
+  (keytab, "keytab")
+} else {
+  val m = if (keytab != null) "keytab" else "tgt"
+  val sf = if (keytab != null) keytab else tgt
+  (sf, m)
+}
+logInfo(s"Logging in as $principal with mode $mode to retrieve Hadoop 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
+  

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146436883
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosCredentialRenewer will update the Hadoop credentials for Spark 
drivers accessing
+ * secured services using Kerberos authentication. It is modeled after the 
YARN AMCredential
+ * renewer, and similarly will renew the Credentials when 75% of the 
renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+driverEndpoint: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab = conf.get(config.KEYTAB).orNull
+val tgt = conf.getenv("KRB5CCNAME")
+require(keytab != null || tgt != null, "A keytab or TGT required.")
+// if both Keytab and TGT are detected we use the Keytab.
+val (secretFile, mode) = if (keytab != null && tgt != null) {
+  logWarning(s"Keytab and TGT were detected, using keytab, unset 
$keytab to use TGT")
--- End diff --

`${KEYTAB.key}`?
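
That is, the warning should print the name of the configuration key rather than the keytab path itself. A sketch of the adjusted line, keeping the surrounding code from the quoted diff:

```scala
logWarning(s"Keytab and TGT were detected, using keytab, unset ${config.KEYTAB.key} to use TGT")
```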


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146437027
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * The MesosCredentialRenewer will update the Hadoop credentials for Spark 
drivers accessing
+ * secured services using Kerberos authentication. It is modeled after the 
YARN AMCredential
+ * renewer, and similarly will renew the Credentials when 75% of the 
renewal interval has passed.
+ * The principal difference is that instead of writing the new credentials 
to HDFS and
+ * incrementing the timestamp of the file, the new credentials (called 
Tokens when they are
+ * serialized) are broadcast to all running executors. On the executor 
side, when new Tokens are
+ * received they overwrite the current credentials.
+ */
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+driverEndpoint: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get(config.PRINCIPAL).orNull
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab = conf.get(config.KEYTAB).orNull
+val tgt = conf.getenv("KRB5CCNAME")
+require(keytab != null || tgt != null, "A keytab or TGT required.")
+// if both Keytab and TGT are detected we use the Keytab.
+val (secretFile, mode) = if (keytab != null && tgt != null) {
+  logWarning(s"Keytab and TGT were detected, using keytab, unset 
$keytab to use TGT")
+  (keytab, "keytab")
+} else {
+  val m = if (keytab != null) "keytab" else "tgt"
+  val sf = if (keytab != null) keytab else tgt
+  (sf, m)
+}
+logInfo(s"Logging in as $principal with mode $mode to retrieve Hadoop 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and try to write new tokens back in an hour
+  

[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146436327
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -213,6 +216,24 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && currentHadoopDelegationTokens.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  // The renewal time is ignored when getting the initial delegation 
tokens
+  // (CoarseGrainedSchedulerBackend.scala:getHadoopDelegationCreds), 
so we get the renewal
+  // time here and schedule a thread to renew them.
+  val renewalTime =
--- End diff --

I still don't like this. You should not need to implement this separate 
method of getting the renewal time just because the current code is throwing 
out that information. Instead you should fix the code so that the information 
is preserved.

`getHadoopDelegationCreds` is called in only one place, so my suggestion 
would be to encapsulate initializing the token manager and getting the initial 
set of tokens into a single method (instead of the current two).

Then in that method's implementation you can get the initial set of tokens, 
initialize the renewer thread with the correct renewal period, and return the 
data needed by the scheduler.
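
A hedged sketch of the shape being suggested; the helper names below are illustrative only and not actual Spark APIs, and `conf`, `tokenManager`, and `driverEndpoint` are assumed to be in scope in the scheduler backend:

```scala
// One method obtains the initial tokens, keeps the renewal time the token
// manager reports, starts the renewer with it, and returns what the scheduler
// backend needs.
private def initializeDelegationTokens(): Option[Array[Byte]] = {
  // hypothetical helper returning (serialized tokens, next renewal time)
  val (initialTokens, nextRenewalTime) = obtainInitialTokens()

  val renewer = new MesosCredentialRenewer(conf, tokenManager, nextRenewalTime, driverEndpoint)
  renewer.scheduleTokenRenewal()

  Some(initialTokens)
}
```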


---




[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

2017-10-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19459#discussion_r146436616
  
--- Diff: python/pyspark/sql/session.py ---
@@ -414,6 +415,52 @@ def _createFromLocal(self, data, schema):
 data = [schema.toInternal(row) for row in data]
 return self._sc.parallelize(data), schema
 
+def _createFromPandasWithArrow(self, pdf, schema):
+"""
+Create a DataFrame from a given pandas.DataFrame by slicing it 
into partitions, converting
+to Arrow data, then sending to the JVM to parallelize. If a schema 
is passed in, the
+data types will be used to coerce the data in Pandas to Arrow 
conversion.
+"""
+from pyspark.serializers import ArrowSerializer, _create_batch
+from pyspark.sql.types import from_arrow_schema, to_arrow_type
+import pyarrow as pa
+
+# Slice the DataFrame into batches
+step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # 
round int up
+pdf_slices = (pdf[start:start + step] for start in xrange(0, 
len(pdf), step))
+
+if schema is None or isinstance(schema, list):
+batches = [pa.RecordBatch.from_pandas(pdf_slice, 
preserve_index=False)
+   for pdf_slice in pdf_slices]
+
+# There will be at least 1 batch after slicing the 
pandas.DataFrame
+schema_from_arrow = from_arrow_schema(batches[0].schema)
+
+# If passed schema as a list of names then rename fields
+if isinstance(schema, list):
+fields = []
+for i, field in enumerate(schema_from_arrow):
+field.name = schema[i]
+fields.append(field)
+schema = StructType(fields)
+else:
+schema = schema_from_arrow
+else:
+if not isinstance(schema, StructType) and isinstance(schema, 
DataType):
+schema = StructType().add("value", schema)
--- End diff --

BTW, I think we should not support this case:

```python
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> spark.createDataFrame(pd.DataFrame([{"a": 1}]), "int").show()
```

```
+-----+
|value|
+-----+
|    1|
+-----+
```

```python
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "false")
>>> spark.createDataFrame(pd.DataFrame([{"a": 1}]), "int").show()
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 595, in 
createDataFrame
rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 399, in 
_createFromLocal
data = list(data)
  File "/.../spark/python/pyspark/sql/session.py", line 585, in prepare
verify_func(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify
verify_value(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1337, in 
verify_integer
verify_acceptable_types(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1300, in 
verify_acceptable_types
% (dataType, obj, type(obj
TypeError: field value: IntegerType can not accept object (1,) in type 

```

I thought disallowing it is actually more consistent with normal Python 
lists:


```python
>>> spark.createDataFrame([1], "int").show()
```

```
+-----+
|value|
+-----+
|    1|
+-----+
```

```python
>>> spark.createDataFrame([[1]], "int").show()
```

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/U.../spark/python/pyspark/sql/session.py", line 595, in 
createDataFrame
rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 399, in 
_createFromLocal
data = list(data)
  File "/.../spark/python/pyspark/sql/session.py", line 585, in prepare
verify_func(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify
verify_value(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1337, in 
verify_integer
verify_acceptable_types(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1300, in 
verify_acceptable_types
% (dataType, obj, type(obj
TypeError: field value: IntegerType can not accept object [1] in type 
```

If we need to support this, I think it should print as below:

```python
>>> spark.createDataFrame([[1]], "string").show()
```

```
+-----+
|value|
+-----+
|  [1]|
+-----+
```

  

[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19527
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83000/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19527
  
**[Test build #83000 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83000/testReport)**
 for PR 19527 at commit 
[`adc4107`](https://github.com/apache/spark/commit/adc410770528c6c95a3c35de64548362c1b46643).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19527
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19529: [SPARK-22308] Support alternative unit testing styles in...

2017-10-23 Thread nkronenfeld
Github user nkronenfeld commented on the issue:

https://github.com/apache/spark/pull/19529
  
@gatorsmile sounds good, giving that a try now...  assuming tests pass, 
I'll check it in and see if it's any better.

I've so far done this for PlanTest and SQLTestUtils.
For PlanTest I suspect it will make things much cleaner.
For SQLTestUtils I suspect it won't help as much, since that one was more of a 
pick-and-choose split (this function goes in the base trait, this one doesn't).

I haven't done it at all for SharedSQLContext/SharedSparkSession... that 
one seems more deserving of a first-level place to me, so I'm more hesitant to 
change it, but if you want, I'll do that one too.

I suspect the correct answer is going to be redoing the PR, with careful 
commits that are clearer about what each does... but I'll try this first anyway 
:-)
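
A rough sketch of that kind of base-trait split (hypothetical names and helpers, not the actual PlanTest code) could look like:

```scala
// Framework-agnostic base trait: normalization helpers with no test-framework dependency,
// e.g. stripping auto-generated expression ids like "#12".
trait PlanTestBase {
  protected def normalizePlan(plan: String): String =
    plan.trim.replaceAll("#\\d+", "#x")

  protected def sameResult(a: String, b: String): Boolean =
    normalizePlan(a) == normalizePlan(b)
}

// The test-framework-facing trait only adds the assertion style on top.
trait PlanTest extends PlanTestBase {
  protected def comparePlans(actual: String, expected: String): Unit =
    assert(sameResult(actual, expected), s"Plans differ:\n$actual\n$expected")
}

object PlanTestSketch extends PlanTest {
  def main(args: Array[String]): Unit =
    comparePlans("Project [a#12]", "Project [a#34]")  // ids normalized away, so this passes
}
```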


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r146434552
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -194,6 +198,27 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
   sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
 )
 
+// check that the credentials are defined, even though it's likely 
that auth would have failed
+// already if you've made it this far
+if (principal != null && hadoopDelegationCreds.isDefined) {
+  logDebug(s"Principal found ($principal) starting token renewer")
+  val credentialRenewerThread = new Thread {
+setName("MesosCredentialRenewer")
+override def run(): Unit = {
+  val rt = 
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
+  val credentialRenewer =
+new MesosCredentialRenewer(
+  conf,
+  hadoopDelegationTokenManager.get,
+  MesosCredentialRenewer.getNextRenewalTime(rt),
+  driverEndpoint)
+  credentialRenewer.scheduleTokenRenewal()
+}
+  }
+
+  credentialRenewerThread.start()
+  credentialRenewerThread.join()
--- End diff --

That's the gist of it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146434438
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark issue #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table should n...

2017-10-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/19562
  
Thank you!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

2017-10-23 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/19459
  
Merged in the PR from @ueshin and added a case for when the schema is a single 
datatype string. In addition to using a `StructType`, this now handles specifying 
the schema in the following ways:

```
spark.createDataFrame(pdf, ['name', 'age'])
spark.createDataFrame(pdf, "a: string, b: int")
spark.createDataFrame(pdf, "int")
spark.createDataFrame(pdf, "struct")
```

@viirya brought up a good point here 
https://github.com/apache/spark/pull/19459#discussion_r145862488  (linking 
because it's outdated and hidden) - which shows another good reason to upgrade 
Arrow, I think


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

2017-10-23 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19459#discussion_r146431078
  
--- Diff: python/pyspark/sql/session.py ---
@@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema):
 data = [schema.toInternal(row) for row in data]
 return self._sc.parallelize(data), schema
 
+def _createFromPandasWithArrow(self, pdf, schema):
+"""
+Create a DataFrame from a given pandas.DataFrame by slicing it 
into partitions, converting
+to Arrow data, then sending to the JVM to parallelize. If a schema 
is passed in, the
+data types will be used to coerce the data in Pandas to Arrow 
conversion.
+"""
+from pyspark.serializers import ArrowSerializer
+from pyspark.sql.types import from_arrow_schema, to_arrow_type, 
_cast_pandas_series_type
+import pyarrow as pa
+
+# Slice the DataFrame into batches
+step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # 
round int up
+pdf_slices = (pdf[start:start + step] for start in xrange(0, 
len(pdf), step))
+
+if schema is None or isinstance(schema, list):
+batches = [pa.RecordBatch.from_pandas(pdf_slice, 
preserve_index=False)
+   for pdf_slice in pdf_slices]
+
+# There will be at least 1 batch after slicing the 
pandas.DataFrame
+schema_from_arrow = from_arrow_schema(batches[0].schema)
+
+# If passed schema as a list of names then rename fields
+if isinstance(schema, list):
+fields = []
+for i, field in enumerate(schema_from_arrow):
+field.name = schema[i]
+fields.append(field)
+schema = StructType(fields)
+else:
+schema = schema_from_arrow
+else:
+batches = []
+for i, pdf_slice in enumerate(pdf_slices):
+
+# convert to series to pyarrow.Arrays to use mask when 
creating Arrow batches
+arrs = []
+names = []
+for c, (_, series) in enumerate(pdf_slice.iteritems()):
+field = schema[c]
+names.append(field.name)
+t = to_arrow_type(field.dataType)
+try:
+# NOTE: casting is not necessary with Arrow >= 0.7
+
arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t),
+ 
mask=series.isnull(), type=t))
+except ValueError as e:
--- End diff --

yeah, there doesn't seem to be a way to guard against overflow with `astype`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFra...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19459
  
**[Test build #83001 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83001/testReport)**
 for PR 19459 at commit 
[`f421e2d`](https://github.com/apache/spark/commit/f421e2da1e97dfbc7c80b7ae724b6ea9a472b220).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146430483
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark issue #18125: [SPARK-20891][SQL] Reduce duplicate code typedaggregator...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18125
  
**[Test build #82998 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82998/testReport)**
 for PR 18125 at commit 
[`a33ea0d`](https://github.com/apache/spark/commit/a33ea0d7601d7b14e50536e3c457847145e799ae).
 * This patch **fails MiMa tests**.
 * This patch **does not merge cleanly**.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18125: [SPARK-20891][SQL] Reduce duplicate code typedaggregator...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18125
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82998/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18125: [SPARK-20891][SQL] Reduce duplicate code typedaggregator...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18125
  
Build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146429823
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark issue #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator for OneH...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19527
  
**[Test build #83000 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83000/testReport)**
 for PR 19527 at commit 
[`adc4107`](https://github.com/apache/spark/commit/adc410770528c6c95a3c35de64548362c1b46643).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...

2017-10-23 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19556
  
cc @cloud-fan for review too.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146429113
  
--- Diff: pom.xml ---
@@ -2649,6 +2649,13 @@
 
 
 
+  kubernetes
+  
+resource-managers/kubernetes/core
--- End diff --

That (keeping them separate) is actually pretty useful for SBT.
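
For reference, a hedged build.sbt sketch of what "keeping them separate" can look like on the sbt side — module names and paths below are illustrative assumptions, not the actual Spark build definition:

```scala
// Illustrative build.sbt fragment; names and paths are assumptions.
lazy val kubernetes = (project in file("resource-managers/kubernetes/core"))
  .settings(name := "spark-kubernetes")

lazy val kubernetesIntegrationTests =
  (project in file("resource-managers/kubernetes/integration-tests"))
    .dependsOn(kubernetes % "compile->compile;test->test")
    .settings(name := "spark-kubernetes-integration-tests")
```

With that split, `kubernetes/test` stays a cheap unit-test run, while the expensive suites only run when `kubernetesIntegrationTests/test` is invoked explicitly.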



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146428556
  
--- Diff: pom.xml ---
@@ -2649,6 +2649,13 @@
 
 
 
+  kubernetes
+  
+resource-managers/kubernetes/core
--- End diff --

It's not absolutely necessary to have integration tests in a specific 
separate module. However, there are some nice organizational benefits we can 
get. For example, integration tests in the same module as the core code will 
need a specific package namespace that is omitted from the `test` phase and 
only executed in the `integrationTest` phase. Having a separate module means 
that the integration test pom can just make the `test` phase a no-op and 
integrationTest runs all tests in the `test` folder. (I don't know if Maven has 
a concept of a difference between `src/test/scala` and 
`src/integrationTest/scala`, which would help a lot.)

It's also IMO easier to read the `pom.xml` of the integration test 
separately from the `pom.xml` of the Kubernetes core implementation. FWIW this 
is what we have in the integration test POM at the moment: 
https://github.com/apache-spark-on-k8s/spark/blob/branch-2.2-kubernetes/resource-managers/kubernetes/integration-tests/pom.xml.
 (The minikube related things are going away with 
https://github.com/apache-spark-on-k8s/spark/pull/521).

> And that's assuming that you really don't want to run them during unit 
tests.

We definitely don't want to run these during unit tests - they are 
relatively expensive, require building Docker images, and require Minikube to 
be pre-installed on the given machine. Having them in at least the separate 
integration test phase makes these differences clear.
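
If the tests did stay in the same module, one hedged alternative to a package-namespace split would be ScalaTest tags; the tag and suite names below are invented, and only the general mechanism is real:

```scala
import org.scalatest.{FunSuite, Tag}

// Invented tag/suite names; shows the tagging mechanism, not Spark's actual test layout.
object RequiresMinikube extends Tag("org.apache.spark.tags.RequiresMinikube")

class ExecutorPodLifecycleSketchSuite extends FunSuite {

  test("executor pods eventually reach Running state", RequiresMinikube) {
    // would need a real Minikube cluster and pre-built Docker images
    assert(true)
  }

  test("executor pod names are prefixed deterministically") {
    // cheap check that belongs in the ordinary `test` phase
    assert("exec-" + 1 == "exec-1")
  }
}
```

The expensive tests are then excluded from the normal run with ScalaTest's `-l org.apache.spark.tags.RequiresMinikube` runner argument and included only in the integration-test invocation.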


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator f...

2017-10-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19527#discussion_r146428519
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala 
---
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.Since
+import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.attribute._
+import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCols, 
HasOutputCols}
+import org.apache.spark.ml.util._
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.spark.sql.functions.{col, lit, udf}
+import org.apache.spark.sql.types.{DoubleType, NumericType, StructField, 
StructType}
+
+/** Private trait for params and common methods for OneHotEncoderEstimator 
and OneHotEncoderModel */
+private[ml] trait OneHotEncoderBase extends Params with HasHandleInvalid
+with HasInputCols with HasOutputCols {
+
+  /**
+   * Param for how to handle invalid data.
+   * Options are 'keep' (invalid data are ignored) or 'error' (throw an 
error).
+   * Default: "error"
+   * @group param
+   */
+  @Since("2.3.0")
+  override val handleInvalid: Param[String] = new Param[String](this, 
"handleInvalid",
+"How to handle invalid data " +
+"Options are 'keep' (invalid data are ignored) or error (throw an 
error).",
--- End diff --

I will change the wording. Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19556: [SPARK-22328][Core] ClosureCleaner should not miss refer...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19556
  
**[Test build #82999 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82999/testReport)**
 for PR 19556 at commit 
[`5d7efd1`](https://github.com/apache/spark/commit/5d7efd14c0baba3e3f41258fcf6dc44f2976450a).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table s...

2017-10-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19562


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146427690
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark issue #18125: [SPARK-20891][SQL] Reduce duplicate code typedaggregator...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18125
  
**[Test build #82998 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82998/testReport)**
 for PR 18125 at commit 
[`a33ea0d`](https://github.com/apache/spark/commit/a33ea0d7601d7b14e50536e3c457847145e799ae).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table should n...

2017-10-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19562
  
Thanks! Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18125: [SPARK-20891][SQL] Reduce duplicate code typedaggregator...

2017-10-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/18125
  
@setjet Could you address the conflicts?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18125: [SPARK-20891][SQL] Reduce duplicate code typedaggregator...

2017-10-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/18125
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146426881
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146426810
  
--- Diff: pom.xml ---
@@ -2649,6 +2649,13 @@
 
 
 
+  kubernetes
+  
+resource-managers/kubernetes/core
--- End diff --

Why are integration tests in a separate module? e.g. maven has an 
`integration-test` phase which is separate from the usual `test` phase used for 
unit tests. And that's assuming that you really don't want to run them during 
unit tests. Then all code could potentially live in the same module.

> in the interest of keeping test code together

That would mean keeping the test code in the same module as the core code, 
not in a separate module.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestamp suppo...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18664
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestamp suppo...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18664
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82997/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL] Add Date and Timestamp suppo...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18664
  
**[Test build #82997 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82997/testReport)**
 for PR 18664 at commit 
[`79bb93f`](https://github.com/apache/spark/commit/79bb93f36ad6a0096f59072c54097015e2099a73).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18477: [SPARK-21261][DOCS]SQL Regex document fix

2017-10-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/18477
  
@visaxin Could you address the comment?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146426271
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146426317
  
--- Diff: pom.xml ---
@@ -2649,6 +2649,13 @@
 
 
 
+  kubernetes
+  
+resource-managers/kubernetes/core
--- End diff --

@vanzin this isn't a multi-module project in the sense that the Kubernetes 
cluster manager and spark-submit implementation are split across multiple 
projects - but rather that there is a module for said cluster manager + 
spark-submit implementation, and then there are modules for integration testing 
said code.

@foxish The Dockerfiles feel more like application code than static 
configuration, but that might just be a matter of implementation. The structure 
of the `CMD` in the Dockerfiles is particular to what `spark-submit` expects, 
for example.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18527: [SPARK-21101][SQL] Catch IllegalStateException when CREA...

2017-10-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/18527
  
ping @wangyum This sounds a reasonable fix. Could you resolve the conflicts?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread foxish
Github user foxish commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146426033
  
--- Diff: pom.xml ---
@@ -2649,6 +2649,13 @@
 
 
 
+  kubernetes
+  
+resource-managers/kubernetes/core
--- End diff --

Based on a discussion in last week's meeting with Shane Knapp from RISELab, 
we want to keep the integration tests as a sub-module here - in the interest of 
keeping test code together. We should have the additional parent pom to 
facilitate that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19556: [SPARK-22328][Core] ClosureCleaner should not mis...

2017-10-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19556#discussion_r146426012
  
--- Diff: core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---
@@ -91,6 +91,52 @@ private[spark] object ClosureCleaner extends Logging {
 (seen - obj.getClass).toList
   }
 
+  /** Initializes the accessed fields for outer classes and their super 
classes. */
+  private def initAccessedFields(
+  accessedFields: Map[Class[_], Set[String]],
+  outerClasses: Seq[Class[_]]): Unit = {
+for (cls <- outerClasses) {
+  accessedFields(cls) = Set.empty[String]
+
+  var superClass = cls.getSuperclass()
+  while (superClass != null) {
+accessedFields(superClass) = Set.empty[String]
+superClass = superClass.getSuperclass()
+  }
+}
+  }
+
+  /** Sets accessed fields for given class in clone object based on given 
object. */
+  private def setAccessedFields(
+  outerClass: Class[_],
+  clone: AnyRef,
+  obj: AnyRef,
+  accessedFields: Map[Class[_], Set[String]]): Unit = {
+for (fieldName <- accessedFields(outerClass)) {
+  val field = outerClass.getDeclaredField(fieldName)
+  field.setAccessible(true)
+  val value = field.get(obj)
+  field.set(clone, value)
+}
+  }
+
+  /** Clones a given object and sets accessed fields in cloned object. */
+  private def cloneAndSetFields(
+  parent: AnyRef,
+  obj: AnyRef,
+  outerClass: Class[_],
+  accessedFields: Map[Class[_], Set[String]]): AnyRef = {
+val clone = instantiateClass(outerClass, parent)
+setAccessedFields(outerClass, clone, obj, accessedFields)
+
+var superClass = outerClass.getSuperclass()
+while (superClass != null) {
--- End diff --

Thanks. Looks good.
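
For readers skimming the archive, here is a minimal standalone sketch of the 
pattern the (truncated) diff implements: walking an outer class and its 
superclasses and copying the recorded accessed fields from the original object 
into a clone via reflection. The names and structure below are illustrative 
only, not Spark's actual `ClosureCleaner` code.

```scala
// Minimal sketch only -- not Spark's ClosureCleaner. Copies the fields recorded
// as "accessed" for `outerClass` and each of its superclasses from `obj` into
// `clone` using Java reflection.
object FieldCopySketch {
  def copyAccessedFields(
      outerClass: Class[_],
      clone: AnyRef,
      obj: AnyRef,
      accessedFields: Map[Class[_], Set[String]]): Unit = {
    var cls: Class[_] = outerClass
    while (cls != null) {
      // Only copy fields that were recorded as accessed for this class, if any.
      for (fieldName <- accessedFields.getOrElse(cls, Set.empty[String])) {
        val field = cls.getDeclaredField(fieldName)
        field.setAccessible(true)
        field.set(clone, field.get(obj))
      }
      // getSuperclass returns null once we pass java.lang.Object, ending the loop.
      cls = cls.getSuperclass
    }
  }
}
```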


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146425629
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146425503
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146425133
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146425100
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import io.fabric8.kubernetes.client.{Config, ConfigBuilder, 
DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.utils.HttpClientUtils
+import okhttp3.Dispatcher
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus 
common suffixes to
+ * parse configuration keys, similar to the manner in which Spark's 
SecurityManager parses SSL
+ * options for different components.
+ */
+private[spark] object SparkKubernetesClientFactory {
+
+  def createKubernetesClient(
+  master: String,
+  namespace: Option[String],
+  kubernetesAuthConfPrefix: String,
+  sparkConf: SparkConf,
+  maybeServiceAccountToken: Option[File],
+  maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
+val oauthTokenFileConf = 
s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
+val oauthTokenConf = 
s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
+val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
--- End diff --

This lacks context from the `spark-submit` implementation that is not in 
this PR.

We intend to have two different sets of authentication options for the 
Kubernetes API. The first is the credentials for creating a driver pod and all 
the Kubernetes resources that the application requires outside of executor 
pods. The second is a set of credentials that the driver can use to create 
executor pods. These options will have shared suffixes in the configuration 
keys but different prefixes.

The reasoning for two sets of credentials is twofold:

- The driver needs strictly fewer privileges than `spark-submit`, because the 
driver only creates and deletes pods, while `spark-submit` needs to create pods 
and other Kubernetes resources. Two sets of credentials allow the driver to 
have an appropriately limited scope of API access.
- Part of the credentials includes TLS certificates for accessing the 
Kubernetes API over HTTPS. A common environment exposes the Kubernetes API 
server through a proxy so it can be reached from outside the cluster, while the 
driver accesses the API server from inside the cluster. The front door for the 
API server typically asks for a different certificate than the one presented 
when accessing the API server internally (a sketch of the key-resolution idea 
follows below).

---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table should n...

2017-10-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/19562
  
Thank you for the review and approval, @gatorsmile!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17100: [SPARK-13947][SQL] The error message from using an inval...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17100
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82994/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17100: [SPARK-13947][SQL] The error message from using an inval...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17100
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17100: [SPARK-13947][SQL] The error message from using an inval...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17100
  
**[Test build #82994 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82994/testReport)**
 for PR 17100 at commit 
[`34753b5`](https://github.com/apache/spark/commit/34753b513323f5076edd4c5006983a9c9d3d97d7).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18268: [SPARK-21054] [SQL] Reset Command support reset specific...

2017-10-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/18268
  
ping @ericsahit 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18833: [SPARK-21625][SQL] sqrt(negative number) should be null.

2017-10-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/18833
  
Can we document this difference in 
https://spark.apache.org/docs/latest/sql-programming-guide.html#compatibility-with-apache-hive?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146423831
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146423757
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-10-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r146423117
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 ---
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, 
AtomicReference}
+
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.config._
+import org.apache.spark.deploy.k8s.constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, 
TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+scheduler: TaskSchedulerImpl,
+rpcEnv: RpcEnv,
+executorPodFactory: ExecutorPodFactory,
+kubernetesClient: KubernetesClient,
+allocatorExecutor: ScheduledExecutorService,
+requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+  import KubernetesClusterSchedulerBackend._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by 
RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+  private val podsWithKnownExitReasons = new ConcurrentHashMap[String, 
ExecutorExited]()
+  private val disconnectedPodsByExecutorIdPendingRemoval = new 
ConcurrentHashMap[String, Pod]()
+
+  private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(
+  throw new SparkException("Must specify the driver pod name"))
+  private implicit val requestExecutorContext = 
ExecutionContext.fromExecutorService(
+  requestExecutorsService)
+
+  private val driverPod = try {
+kubernetesClient.pods()
+  .inNamespace(kubernetesNamespace)
+  .withName(kubernetesDriverPodName)
+  .get()
+  } catch {
+case throwable: Throwable =>
+  logError(s"Executor cannot find driver pod.", throwable)
+  throw new SparkException(s"Executor cannot find driver pod", 
throwable)
+  }
+
+  override val minRegisteredRatio =
+if 
(conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+  0.8
+} else {
+  super.minRegisteredRatio
+}
+
+  private val executorWatchResource = new AtomicReference[Closeable]
+  protected var totalExpectedExecutors = new AtomicInteger(0)
+
+  private val driverUrl = RpcEndpointAddress(
+  conf.get("spark.driver.host"),
+  conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+  private val initialExecutors = getInitialTargetExecutorNumber()
+
+  private val podAllocationInterval = 

[GitHub] spark issue #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table should n...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19562
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table should n...

2017-10-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19562
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82996/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19562: [SPARK-21912][SQL][FOLLOW-UP] ORC/Parquet table should n...

2017-10-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19562
  
**[Test build #82996 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82996/testReport)**
 for PR 19562 at commit 
[`430af34`](https://github.com/apache/spark/commit/430af34d6e6b27f4dd90fa758b9d54b2e8d7eb1f).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19459: [SPARK-20791][PYSPARK] Use Arrow to create Spark ...

2017-10-23 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19459#discussion_r146421804
  
--- Diff: python/pyspark/sql/session.py ---
@@ -414,6 +415,73 @@ def _createFromLocal(self, data, schema):
 data = [schema.toInternal(row) for row in data]
 return self._sc.parallelize(data), schema
 
+def _createFromPandasWithArrow(self, pdf, schema):
+"""
+Create a DataFrame from a given pandas.DataFrame by slicing it 
into partitions, converting
+to Arrow data, then sending to the JVM to parallelize. If a schema 
is passed in, the
+data types will be used to coerce the data in Pandas to Arrow 
conversion.
+"""
+from pyspark.serializers import ArrowSerializer
+from pyspark.sql.types import from_arrow_schema, to_arrow_type, 
_cast_pandas_series_type
+import pyarrow as pa
+
+# Slice the DataFrame into batches
+step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # 
round int up
+pdf_slices = (pdf[start:start + step] for start in xrange(0, 
len(pdf), step))
+
+if schema is None or isinstance(schema, list):
+batches = [pa.RecordBatch.from_pandas(pdf_slice, 
preserve_index=False)
+   for pdf_slice in pdf_slices]
+
+# There will be at least 1 batch after slicing the 
pandas.DataFrame
+schema_from_arrow = from_arrow_schema(batches[0].schema)
+
+# If passed schema as a list of names then rename fields
+if isinstance(schema, list):
+fields = []
+for i, field in enumerate(schema_from_arrow):
+field.name = schema[i]
+fields.append(field)
+schema = StructType(fields)
+else:
+schema = schema_from_arrow
+else:
+batches = []
+for i, pdf_slice in enumerate(pdf_slices):
+
+# convert to series to pyarrow.Arrays to use mask when 
creating Arrow batches
+arrs = []
+names = []
+for c, (_, series) in enumerate(pdf_slice.iteritems()):
+field = schema[c]
+names.append(field.name)
+t = to_arrow_type(field.dataType)
+try:
+# NOTE: casting is not necessary with Arrow >= 0.7
+
arrs.append(pa.Array.from_pandas(_cast_pandas_series_type(series, t),
--- End diff --

I think that is a problem with using `astype`, which doesn't provide any 
checks AFAIK. This casting is better done in Arrow, but since we are currently 
stuck on 0.4.1 we need this workaround. Trying this out with the latest Arrow 
gives the user a nice error:
```
>>> pa.Array.from_pandas(s, type=pa.int16())
<pyarrow.lib.Int16Array object at 0x...>
[
  1,
  2,
  10001
]
>>> pa.Array.from_pandas(s, type=pa.int8())
Traceback (most recent call last):
  File "", line 1, in 
  File "pyarrow/array.pxi", line 279, in pyarrow.lib.Array.from_pandas 
(/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:25865)
  File "pyarrow/array.pxi", line 169, in pyarrow.lib.array 
(/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:24833)
  File "pyarrow/array.pxi", line 70, in pyarrow.lib._ndarray_to_array 
(/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:24083)
  File "pyarrow/error.pxi", line 77, in pyarrow.lib.check_status 
(/home/bryan/git/arrow/python/build/temp.linux-x86_64-2.7/lib.cxx:7876)
pyarrow.lib.ArrowInvalid: Integer value out of bounds
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


