[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153402807
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to 
spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Docker image pull policy when pulling any docker image in 
Kubernetes integration")
+  .stringConf
+  .createWithDefault("IfNotPresent")
+
+
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The 
driver pod uses " +
+"this service account when requesting executor pods from the API 
server. If specific " +
+"credentials are given for the driver pod to use, the driver will 
favor " +
+"using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated 
for the driver and the " +
+"driver submission server. This is memory that accounts for things 
like VM overheads, " +
+"interned strings, other native overheads, etc. This tends to grow 
with the driver's " +
+"memory size (typically 6-10%).")
+  .bytesConf(ByteUnit.MiB)
+  .createOptional
+
+  // Note that while we set a default for this when we start up the
+  // scheduler, the specific default value is 

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153414219
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile ---
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile .
+
+COPY examples /opt/spark/examples
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
+if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS
--- End diff --

nit: could you add a line break at the end of the file?


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153403573
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
--- End diff --

nit: `foreach` instead of `collect`?
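
For illustration, a minimal sketch of that suggestion, reusing the match arms from the quoted diff (the value produced by `collect` is discarded anyway, so a total `foreach` expresses the intent more directly):

```scala
// Sketch only: same arms as the quoted diff, driven by foreach since the
// collected result is never used.
args.sliding(2, 2).toList.foreach {
  case Array("--primary-java-resource", primaryJavaResource: String) =>
    mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
  case Array("--main-class", clazz: String) =>
    mainClass = Some(clazz)
  case Array("--arg", arg: String) =>
    driverArgs += arg
  case other =>
    throw new RuntimeException(s"Unknown arguments: ${other.mkString(" ")}")
}
```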


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153412770
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ServiceBuilder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Allows the driver to be reachable by executor pods through a headless 
service. The service's
+ * ports should correspond to the ports that the executor will reach the 
pod at for RPC.
+ */
+private[spark] class DriverServiceBootstrapStep(
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+submissionSparkConf: SparkConf,
+clock: Clock) extends DriverConfigurationStep with Logging {
+  import DriverServiceBootstrapStep._
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+  s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as 
the driver's bind" +
+  " address is managed and set to the driver pod's IP address.")
+require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+  s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the 
driver's hostname will be" +
+  " managed via a Kubernetes service.")
--- End diff --

nit: ditto about concat strings.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153414331
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/entrypoint.sh
 ---
@@ -0,0 +1,37 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# echo commands to the terminal output
+set -x
+
+# Check whether there is a passwd entry for the container UID
+myuid=$(id -u)
+mygid=$(id -g)
+uidentry=$(getent passwd $myuid)
+
+# If there is no passwd entry for the container UID, attempt to create one
+if [ -z "$uidentry" ] ; then
+if [ -w /etc/passwd ] ; then
+echo "$myuid:x:$myuid:$mygid:anonymous uid:$SPARK_HOME:/bin/false" 
>> /etc/passwd
+else
+echo "Container ENTRYPOINT failed to add passwd entry for 
anonymous UID"
+fi
+fi
+
+# Execute the container CMD under tini for better hygiene
+/sbin/tini -s -- "$@"
--- End diff --

nit: ditto.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153406057
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by --primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after 
each
+* DriverConfigurationStep in the sequence that is passed in. The final 

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153403697
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
--- End diff --

nit: can we use `ArrayBuffer` explicitly?
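
A minimal sketch of that change, assuming the import is adjusted accordingly:

```scala
import scala.collection.mutable.ArrayBuffer

// Explicit concrete collection type instead of the generic mutable.Buffer factory.
val driverArgs = ArrayBuffer.empty[String]
```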


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153405181
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by --primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after 
each
+* DriverConfigurationStep in the sequence that is passed in. The final 

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153408117
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by --primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after 
each
+* DriverConfigurationStep in the sequence that is passed in. The final 

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153414260
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile 
---
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8-alpine
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile .
+
+RUN apk upgrade --no-cache && \
+apk add --no-cache bash tini && \
+mkdir -p /opt/spark && \
+mkdir -p /opt/spark/work-dir \
+touch /opt/spark/RELEASE && \
+rm /bin/sh && \
+ln -sv /bin/bash /bin/sh && \
+chgrp root /etc/passwd && chmod ug+rw /etc/passwd
+
+COPY jars /opt/spark/jars
+COPY bin /opt/spark/bin
+COPY sbin /opt/spark/sbin
+COPY conf /opt/spark/conf
+COPY dockerfiles/spark-base/entrypoint.sh /opt/
+
+ENV SPARK_HOME /opt/spark
+
+WORKDIR /opt/spark/work-dir
+
+ENTRYPOINT [ "/opt/entrypoint.sh" ]
--- End diff --

nit: ditto.


---




[GitHub] spark pull request #19793: [SPARK-22574] [Mesos] [Submit] Check submission r...

2017-11-27 Thread Gschiavon
Github user Gschiavon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19793#discussion_r153415074
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala 
---
@@ -86,6 +86,8 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
 message.clientSparkVersion = "1.2.3"
 message.appResource = "honey-walnut-cherry.jar"
 message.mainClass = "org.apache.spark.examples.SparkPie"
+message.appArgs = Array("hdfs://tmp/auth")
+message.environmentVariables = Map("SPARK_HOME" -> "/test")
--- End diff --

@susanxhuynh I've checked what you said but those variables are overwritten 
below, so I can't actually check it, right?


---




[GitHub] spark pull request #19793: [SPARK-22574] [Mesos] [Submit] Check submission r...

2017-11-27 Thread Gschiavon
Github user Gschiavon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19793#discussion_r153414519
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala 
---
@@ -86,6 +86,8 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
 message.clientSparkVersion = "1.2.3"
 message.appResource = "honey-walnut-cherry.jar"
 message.mainClass = "org.apache.spark.examples.SparkPie"
+message.appArgs = Array("hdfs://tmp/auth")
+message.environmentVariables = Map("SPARK_HOME" -> "/test")
--- End diff --

@felixcheung okay done :)


---




[GitHub] spark pull request #19793: [SPARK-22574] [Mesos] [Submit] Check submission r...

2017-11-27 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19793#discussion_r153413536
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala 
---
@@ -86,6 +86,8 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
 message.clientSparkVersion = "1.2.3"
 message.appResource = "honey-walnut-cherry.jar"
 message.mainClass = "org.apache.spark.examples.SparkPie"
+message.appArgs = Array("hdfs://tmp/auth")
+message.environmentVariables = Map("SPARK_HOME" -> "/test")
--- End diff --

and probably don't want to set SPARK_HOME either? :)
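
Purely as an illustration of that point, the fixture could use a variable name Spark doesn't control, e.g. (hypothetical name):

```scala
// Hypothetical fixture value; any non-Spark-controlled variable name would do.
message.environmentVariables = Map("TEST_ENV_VAR" -> "/test")
```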



---




[GitHub] spark issue #19827: [SPARK-22617][SQL] make splitExpressions extract current...

2017-11-27 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19827
  
LGTM


---




[GitHub] spark issue #19828: [SPARK-22614] Dataset API: repartitionByRange(...)

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19828
  
LGTM. 

As @hvanhovell suggested, we can fix it in a follow-up PR. Thanks!


---




[GitHub] spark pull request #19827: [SPARK-22617][SQL] make splitExpressions extract ...

2017-11-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19827#discussion_r153412241
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 ---
@@ -167,9 +160,20 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
 }
 }
 
+val writeFieldsCode = if (isTopLevel && row == null) {
--- End diff --

Ah, I see. Then it makes sense.


---




[GitHub] spark issue #19828: [SPARK-22614] Dataset API: repartitionByRange(...)

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19828
  
**[Test build #84247 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84247/testReport)**
 for PR 19828 at commit 
[`fe690fc`](https://github.com/apache/spark/commit/fe690fc8b2d64127b5505ce6719d6c56d61714f9).


---




[GitHub] spark issue #19810: [SPARK-22599][SQL] In-Memory Table Pruning without Extra...

2017-11-27 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19810
  
When all the data is in memory, what do you mean by reading less data? Starting fewer tasks makes sense.


---




[GitHub] spark pull request #19827: [SPARK-22617][SQL] make splitExpressions extract ...

2017-11-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19827#discussion_r153411636
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 ---
@@ -167,9 +160,20 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
 }
 }
 
+val writeFieldsCode = if (isTopLevel && row == null) {
--- End diff --

`GenerateUnsafeProjection.createCode` can be used in whole stage codegen


---




[GitHub] spark pull request #19827: [SPARK-22617][SQL] make splitExpressions extract ...

2017-11-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19827#discussion_r153410825
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 ---
@@ -167,9 +160,20 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
 }
 }
 
+val writeFieldsCode = if (isTopLevel && row == null) {
--- End diff --

`GenerateUnsafeProjection` is a compilation unit by itself. The code generated from it doesn't mix with whole-stage codegen, and it has its own codegen context. That is why I think this change doesn't make much sense. It also doesn't make sense to support whole-stage codegen for it, IIUC.




---




[GitHub] spark pull request #19793: [SPARK-22574] [Mesos] [Submit] Check submission r...

2017-11-27 Thread Gschiavon
Github user Gschiavon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19793#discussion_r153410771
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
 ---
@@ -82,6 +82,12 @@ private[mesos] class MesosSubmitRequestServlet(
 val mainClass = Option(request.mainClass).getOrElse {
   throw new SubmitRestMissingFieldException("Main class is missing.")
 }
+val appArgs = Option(request.appArgs).getOrElse {
+  throw new SubmitRestMissingFieldException("Application arguments are 
missing.")
--- End diff --

Done @ArtRand 


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153408574
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to 
spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Docker image pull policy when pulling any docker image in 
Kubernetes integration")
+  .stringConf
+  .createWithDefault("IfNotPresent")
+
+
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The 
driver pod uses " +
+"this service account when requesting executor pods from the API 
server. If specific " +
+"credentials are given for the driver pod to use, the driver will 
favor " +
+"using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated 
for the driver and the " +
+"driver submission server. This is memory that accounts for things 
like VM overheads, " +
+"interned strings, other native overheads, etc. This tends to grow 
with the driver's " +
+"memory size (typically 6-10%).")
+  .bytesConf(ByteUnit.MiB)
+  .createOptional
+
+  // Note that while we set a default for this when we start up the
+  // scheduler, the specific default value is 

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153410482
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = 
submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = 
submissionSparkConf.get(DRIVER_DOCKER_IMAGE)
+
+  // CPU settings
+  private val driverCpuCores = 
submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = 
submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + 
memoryOverheadMiB
+
+  override def configureDriver(
+  driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+  new EnvVarBuilder()
+.withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+.withValue(classPath)
+.build()
+}
--- End diff --

Did you add support for the "spark.driver.userClassPathFirst" configuration? Sorry, I cannot find it.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153407637
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to 
spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
--- End diff --

This configuration takes one of a fixed set of string values, so we should validate against all the legal options, as this SQL conf does:

```scala
  val CATALOG_IMPLEMENTATION = 
buildStaticConf("spark.sql.catalogImplementation")
.internal()
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")
```

Besides, we should add `checkValue` to the `ConfigEntry` wherever possible.
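
For illustration, a sketch of how the quoted `DOCKER_IMAGE_PULL_POLICY` entry could adopt the same pattern, assuming the three pull policies Kubernetes accepts (`Always`, `Never`, `IfNotPresent`):

```scala
val DOCKER_IMAGE_PULL_POLICY =
  ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
    .doc("Docker image pull policy used when pulling any Docker image in the Kubernetes integration.")
    .stringConf
    // Reject anything Kubernetes itself would not accept.
    .checkValues(Set("Always", "Never", "IfNotPresent"))
    .createWithDefault("IfNotPresent")
```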


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153407820
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -702,6 +715,19 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+if (isKubernetesCluster) {
+  childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
--- End diff --

Here the style should be unified after #19631 is merged.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153408859
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
 |  the node running the Application 
Master via the Secure
 |  Distributed Cache, for renewing the 
login tickets and the
 |  delegation tokens periodically.
+|
+| Kubernetes only:
+|  --kubernetes-namespace NS   The namespace in the Kubernetes 
cluster within which the
+|  application must be launched. The 
namespace must already
+|  exist in the cluster. (Default: 
default).
--- End diff --

We should also add checks to `validateKillArguments` and `validateStatusRequestArguments`.


---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153410340
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2747,9 +2755,41 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
   @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-RepartitionByExpression(
-  partitionExprs.map(_.expr), logicalPlan, 
sparkSession.sessionState.conf.numShufflePartitions)
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+repartition(sparkSession.sessionState.conf.numShufflePartitions, 
partitionExprs: _*)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning 
expressions into
+   * `numPartitions`. The resulting Dataset is range partitioned.
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): 
Dataset[T] = withTypedPlan {
--- End diff --

Good call! Raised 
[SPARK-22624](https://issues.apache.org/jira/browse/SPARK-22624).


---




[GitHub] spark pull request #19793: [SPARK-22574] [Mesos] [Submit] Check submission r...

2017-11-27 Thread Gschiavon
Github user Gschiavon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19793#discussion_r153410217
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
 ---
@@ -46,6 +46,8 @@ private[rest] class CreateSubmissionRequest extends 
SubmitRestProtocolRequest {
 super.doValidate()
 assert(sparkProperties != null, "No Spark properties set!")
 assertFieldIsSet(appResource, "appResource")
+assertFieldIsSet(appArgs, "appArgs")
+assertFieldIsSet(environmentVariables, "environmentVariables")
--- End diff --

Actually, if the caller didn't set "appArgs" or "environmentVariables", it caused a NullPointerException and left the Dispatcher inactive. So now I think the caller should pass an empty array; I could add a test for that case, @susanxhuynh.
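
Purely as a sketch of such a test, following the fixture style of the quoted suite (helper names and the exact exception wrapped by `validate()` may differ):

```scala
test("create submission request requires appArgs and environmentVariables") {
  val message = new CreateSubmissionRequest
  message.clientSparkVersion = "1.2.3"
  message.appResource = "honey-walnut-cherry.jar"
  message.mainClass = "org.apache.spark.examples.SparkPie"
  message.sparkProperties = Map("spark.app.name" -> "SparkPie")
  // appArgs and environmentVariables deliberately left unset:
  // validation should now fail instead of surfacing a NullPointerException later.
  intercept[SubmitRestProtocolException] {
    message.validate()
  }
}
```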


---




[GitHub] spark issue #19793: [SPARK-22574] [Mesos] [Submit] Check submission request ...

2017-11-27 Thread Gschiavon
Github user Gschiavon commented on the issue:

https://github.com/apache/spark/pull/19793
  
Yes @ArtRand! I don't think it's documented at all; ping me when it's done and I will review it :)




---




[GitHub] spark pull request #19828: [SPARK-22614] Dataset API: repartitionByRange(......

2017-11-27 Thread adrian-ionescu
Github user adrian-ionescu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19828#discussion_r153409247
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -838,6 +839,27 @@ case class RepartitionByExpression(
 
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must 
be positive.")
 
+  require(partitionExpressions.nonEmpty, "At least one partition-by 
expression must be specified.")
+
+  val partitioning: Partitioning = {
+val (sortOrder, nonSortOrder) = 
partitionExpressions.partition(_.isInstanceOf[SortOrder])
--- End diff --

It's going to follow the `HashPartitioning` path and eventually lead to a "Cannot evaluate expression" exception, just like it presently would if you tried running `df.repartition($"col".asc + 1)` or `df.sort($"col".asc + 1)`.
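
To make that dispatch concrete, here is a simplified sketch of the behavior described above (it reuses the `partitionExpressions`/`numPartitions` names from the diff; `choosePartitioning` is a made-up helper, not the patch code):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}

def choosePartitioning(partitionExprs: Seq[Expression], numPartitions: Int): Partitioning = {
  val (sortOrder, nonSortOrder) = partitionExprs.partition(_.isInstanceOf[SortOrder])
  if (nonSortOrder.isEmpty) {
    // every expression is a SortOrder => range partition on those orderings
    RangePartitioning(sortOrder.map(_.asInstanceOf[SortOrder]), numPartitions)
  } else {
    // anything else, including a mix, falls back to hash partitioning,
    // which is why an unevaluable expression only blows up later
    HashPartitioning(partitionExprs, numPartitions)
  }
}
```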


---




[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19222
  
**[Test build #84246 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84246/testReport)**
 for PR 19222 at commit 
[`4af0449`](https://github.com/apache/spark/commit/4af044906dc79d202c7fce9e92c603206691aeb8).


---




[GitHub] spark issue #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp...

2017-11-27 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/19607
  
I guess we should look at R to see if it should behave similarly? WDYT 
@HyukjinKwon ?


---




[GitHub] spark issue #19810: [SPARK-22599][SQL] In-Memory Table Pruning without Extra...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19810
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19810: [SPARK-22599][SQL] In-Memory Table Pruning without Extra...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19810
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84240/
Test PASSed.


---




[GitHub] spark issue #19810: [SPARK-22599][SQL] In-Memory Table Pruning without Extra...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19810
  
**[Test build #84240 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84240/testReport)**
 for PR 19810 at commit 
[`9d450ad`](https://github.com/apache/spark/commit/9d450addd095e2ac78770bdd03c3bf77817379e8).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19827: [SPARK-22617][SQL] make splitExpressions extract current...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19827
  
**[Test build #84245 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84245/testReport)**
 for PR 19827 at commit 
[`70d7d7c`](https://github.com/apache/spark/commit/70d7d7c910bf61323f61b8809675a92744419914).


---




[GitHub] spark pull request #19827: [SPARK-22617][SQL] make splitExpressions extract ...

2017-11-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19827#discussion_r153401812
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 ---
@@ -49,25 +49,18 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
   input: String,
   fieldTypes: Seq[DataType],
   bufferHolder: String): String = {
+// Puts `input` in a local variable to avoid to re-evaluate it if it's 
a statement.
+val tmpInput = ctx.freshName("tmpInput")
--- End diff --

Actually I added it back as it fixes a bug which is exposed by 
https://github.com/apache/spark/pull/19827/files#diff-90b107e5c61791e17d5b4b25021b89fdR168 .

Previously we never did method splitting here in the whole-stage codegen path. Now I've relaxed that constraint and need this fix, to avoid putting a code statement in a method parameter list.
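
A self-contained toy of the problem being described (the `writeField*`/`tmpInput_0`/`bufferHolder` names are invented, and this is plain string building, not the real codegen):

```scala
// If `input` is itself a statement, splicing it into every split-out method
// call re-evaluates it; binding it to a fresh local first (what
// ctx.freshName("tmpInput") is used for in the patch) evaluates it once.
object TmpInputSketch extends App {
  val input = "row.getStruct(0, 2)"            // a statement, not a simple variable
  val fieldWriters = Seq("writeField0", "writeField1")

  val inlined = fieldWriters.map(m => s"$m($input, bufferHolder);").mkString("\n")

  val tmpInput = "tmpInput_0"
  val hoisted = (s"final InternalRow $tmpInput = $input;" +:
    fieldWriters.map(m => s"$m($tmpInput, bufferHolder);")).mkString("\n")

  println(s"-- statement in parameter list --\n$inlined\n-- hoisted into a local --\n$hoisted")
}
```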


---




[GitHub] spark issue #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19607
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84242/
Test PASSed.


---




[GitHub] spark issue #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19607
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19607
  
**[Test build #84242 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84242/testReport)**
 for PR 19607 at commit 
[`9200f38`](https://github.com/apache/spark/commit/9200f38b6414255a5c60127aeeae517086ba108b).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-27 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153401411
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using" +
+" spark-submit in cluster mode, this can also be passed to 
spark-submit via the" +
+" --kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag" +
+" format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$SPARK_VERSION")
--- End diff --

IMO we should, as part of releasing Spark with this feature, have the Docker images in the same release. Granted, it is not a guarantee, but given what we know about building Docker images and the Docker image publishing officially supported by the ASF / ASF Infra, this shouldn't be a problem.

On the flip side, if we couldn't have "official" or default images, then we should consider building some validation into the startup path so that "wrong" or mismatched images can be detected quickly / fail fast.



---




[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-27 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153401195
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -153,6 +152,27 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
   // --- BroadcastHashJoin 

 
+  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right)
--- End diff --

If users ask to broadcast both join sides in the hint, it will pick the smaller side to broadcast according to the stats.
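
A hedged illustration of that case (spark-shell style; the temp views mirror the ones created in the test diff in this PR, and the hint placement is just an example):

```scala
spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value").createTempView("table1")
spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value").createTempView("table2")

// Both sides are hinted for broadcast; per the comment above, the planner then
// broadcasts the smaller side according to its statistics.
val q = spark.sql(
  """SELECT /*+ BROADCAST(table1, table2) */ *
    |FROM table1 JOIN table2 ON table1.key = table2.key""".stripMargin)
q.explain()  // shows which side became the broadcast build side
```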


---




[GitHub] spark pull request #19714: [SPARK-22489][SQL] Shouldn't change broadcast joi...

2017-11-27 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/19714#discussion_r153401036
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 ---
@@ -223,4 +223,36 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
 assert(HashJoin.rewriteKeyExpr(l :: ss :: Nil) === l :: ss :: Nil)
 assert(HashJoin.rewriteKeyExpr(i :: ss :: Nil) === i :: ss :: Nil)
   }
+
+  test("Shouldn't change broadcast join buildSide if user clearly 
specified") {
+spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", 
"value").createTempView("table1")
+spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", 
"value").createTempView("table2")
+
+def assertJoinBuildSide(pair: (String, BuildSide)): Any = {
+  val (sqlString, s) = pair
+  val df = sql(sqlString)
+  val physical = df.queryExecution.executedPlan
+  physical match {
--- End diff --

Just checking the results would also pass without this patch; that's why the test matches on the physical plan to verify the build side.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153401030
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using" +
--- End diff --

Sure, will update #19468.


---




[GitHub] spark issue #19714: [SPARK-22489][SQL] Shouldn't change broadcast join build...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19714
  
**[Test build #84244 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84244/testReport)**
 for PR 19714 at commit 
[`793a9fc`](https://github.com/apache/spark/commit/793a9fcb3eac80f92fe145f1879aac970bfbee45).


---




[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...

2017-11-27 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/19468
  
I see the latest changes are backported to the fork - thanks! It's useful to do since I think we get more comprehensive test coverage there, for now.


---




[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153400419
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using" +
+" spark-submit in cluster mode, this can also be passed to 
spark-submit via the" +
+" --kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag" +
+" format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$SPARK_VERSION")
--- End diff --

The concern is the availability of this default image. If we can make a 
strong guarantee by making tagging and publication of the images part of the 
release process, this will not be an issue. @mridulm @foxish @mccheah thoughts?


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153400379
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using" +
--- End diff --

Maybe we should do it in #19468 as well.


---




[GitHub] spark pull request #19827: [SPARK-22617][SQL] make splitExpressions extract ...

2017-11-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19827#discussion_r153399528
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 ---
@@ -167,9 +160,20 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
 }
 }
 
+val writeFieldsCode = if (isTopLevel && row == null) {
--- End diff --

maybe I should add one more check: `ctx.currentVars != null`


---




[GitHub] spark pull request #19827: [SPARK-22617][SQL] make splitExpressions extract ...

2017-11-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19827#discussion_r153399462
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 ---
@@ -167,9 +160,20 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
 }
 }
 
+val writeFieldsCode = if (isTopLevel && row == null) {
--- End diff --

I'm not sure I follow. If it's null, I give up the splitting here and just return `writeFields.mkString("\n")`. This is exactly what the previous code did.


---




[GitHub] spark pull request #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - B...

2017-11-27 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19468#discussion_r153399402
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using" +
+" spark-submit in cluster mode, this can also be passed to 
spark-submit via the" +
+" --kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag" +
+" format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$SPARK_VERSION")
--- End diff --

I'm somewhat concerned about this though.

IMO one of the key values we deliver with this feature is running on k8s out of the box, and the Docker images we are going to release with Spark are a key aspect of that (again, as discussed, we should highlight that this SPIP/feature is going to deliver and release them).

Secondly, it is going to be very easy to make a mistake and set the "wrong" Docker images (there are many, many ways this could go wrong, including mismatched versions of the submission client vs the driver and/or executor), and that would be very hard to diagnose (by no means will it fail fast - worst case you get subtle correctness issues in the data output). And IMO having this version number set consistently by default is really going to help with that.

What's the concern with having the default value?
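
Whatever the default ends up being, pinning both images explicitly avoids the mismatch scenario above. A hedged sketch using the image config keys from these PRs (the namespace and tags are placeholders, and the same values could equally be passed as `--conf` flags to spark-submit):

```scala
import org.apache.spark.SparkConf

// Pin driver and executor images to the same, explicit tag so a stale or
// mismatched default can never be picked up silently.
val conf = new SparkConf()
  .set("spark.kubernetes.namespace", "spark-jobs")
  .set("spark.kubernetes.driver.docker.image", "spark-driver:2.3.0")
  .set("spark.kubernetes.executor.docker.image", "spark-executor:2.3.0")
```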




---




[GitHub] spark issue #19813: [WIP][SPARK-22600][SQL] Fix 64kb limit for deeply nested...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19813
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19813: [WIP][SPARK-22600][SQL] Fix 64kb limit for deeply nested...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19813
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84241/
Test FAILed.


---




[GitHub] spark issue #19813: [WIP][SPARK-22600][SQL] Fix 64kb limit for deeply nested...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19813
  
**[Test build #84241 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84241/testReport)**
 for PR 19813 at commit 
[`6368702`](https://github.com/apache/spark/commit/6368702e66948e26c41300da7136dffc5b963cb6).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19717: [SPARK-18278] [Submission] Spark on Kubernetes - basic s...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19717
  
**[Test build #84243 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84243/testReport)**
 for PR 19717 at commit 
[`4164ba5`](https://github.com/apache/spark/commit/4164ba575e58aa4e5756d5d82d319c2ed6196d5a).


---




[GitHub] spark issue #19717: [SPARK-18278] [Submission] Spark on Kubernetes - basic s...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on the issue:

https://github.com/apache/spark/pull/19717
  
@jerryshao Made `Client` implement `SparkApplication` in 
https://github.com/apache/spark/pull/19717/commits/4164ba575e58aa4e5756d5d82d319c2ed6196d5a.


---




[GitHub] spark issue #19747: [Spark-22431][SQL] Ensure that the datatype in the schem...

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19747
  
LGTM except a few minor comments.


---




[GitHub] spark pull request #19747: [Spark-22431][SQL] Ensure that the datatype in th...

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19747#discussion_r153396436
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
---
@@ -117,6 +117,21 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with 
SharedSQLContext with Befo
 }
   }
 
+  test("SPARK-22431: table with nested type col with special char") {
+withTable("t") {
+  spark.sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) 
USING PARQUET")
+  assert(spark.sql("SELECT * FROM t").count() == 0L)
+}
+  }
+
+  test("SPARK-22431: view with nested type") {
+withView("t", "v") {
+  spark.sql("CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+  checkAnswer(sql("SELECT * FROM t"), Row(Row("a", 1)) :: Nil)
+  spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+  checkAnswer(sql("SELECT * FROM t"), Row(Row("a", 1)) :: Nil)
--- End diff --

The same issues in these two test cases


---




[GitHub] spark pull request #19747: [Spark-22431][SQL] Ensure that the datatype in th...

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19747#discussion_r153396272
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -174,6 +174,88 @@ class HiveCatalogedDDLSuite extends DDLSuite with 
TestHiveSingleton with BeforeA
   test("alter datasource table add columns - partitioned - orc") {
 testAddColumnPartitioned("orc")
   }
+
+  test("SPARK-22431: illegal nested type") {
+val queries = Seq(
+  "CREATE TABLE t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q",
+  "CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT)",
+  "CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+
+queries.foreach(query => {
+  val err = intercept[SparkException] {
+spark.sql(query)
+  }.getMessage
+  assert(err.contains("Cannot recognize hive type string"))
+})
+
+withView("v") {
+  spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+  checkAnswer(sql("SELECT q.`a`, q.b FROM v"), Row("a", 1) :: Nil)
+
+  val err = intercept[SparkException] {
+spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+  }.getMessage
+  assert(err.contains("Cannot recognize hive type string"))
+}
+  }
+
+  test("SPARK-22431: table with nested type") {
+withTable("t", "x") {
+  spark.sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) 
USING PARQUET")
+  checkAnswer(sql("SELECT * FROM t"), Nil)
+  spark.sql("CREATE TABLE x (q STRUCT, i1 INT)")
+  checkAnswer(sql("SELECT * FROM x"), Nil)
+}
+  }
+
+  test("SPARK-22431: view with nested type") {
+withView("v") {
+  spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+  checkAnswer(spark.sql("SELECT * FROM v"), Row(Row("a", 1)) :: Nil)
--- End diff --

The same here


---




[GitHub] spark pull request #19747: [Spark-22431][SQL] Ensure that the datatype in th...

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19747#discussion_r153396233
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -174,6 +174,88 @@ class HiveCatalogedDDLSuite extends DDLSuite with 
TestHiveSingleton with BeforeA
   test("alter datasource table add columns - partitioned - orc") {
 testAddColumnPartitioned("orc")
   }
+
+  test("SPARK-22431: illegal nested type") {
+val queries = Seq(
+  "CREATE TABLE t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q",
+  "CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT)",
+  "CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+
+queries.foreach(query => {
+  val err = intercept[SparkException] {
+spark.sql(query)
+  }.getMessage
+  assert(err.contains("Cannot recognize hive type string"))
+})
+
+withView("v") {
+  spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+  checkAnswer(sql("SELECT q.`a`, q.b FROM v"), Row("a", 1) :: Nil)
+
+  val err = intercept[SparkException] {
+spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+  }.getMessage
+  assert(err.contains("Cannot recognize hive type string"))
+}
+  }
+
+  test("SPARK-22431: table with nested type") {
+withTable("t", "x") {
+  spark.sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) 
USING PARQUET")
+  checkAnswer(sql("SELECT * FROM t"), Nil)
--- End diff --

The same here


---




[GitHub] spark pull request #19747: [Spark-22431][SQL] Ensure that the datatype in th...

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19747#discussion_r153396188
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -174,6 +174,88 @@ class HiveCatalogedDDLSuite extends DDLSuite with 
TestHiveSingleton with BeforeA
   test("alter datasource table add columns - partitioned - orc") {
 testAddColumnPartitioned("orc")
   }
+
+  test("SPARK-22431: illegal nested type") {
+val queries = Seq(
+  "CREATE TABLE t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q",
+  "CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT)",
+  "CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+
+queries.foreach(query => {
+  val err = intercept[SparkException] {
+spark.sql(query)
+  }.getMessage
+  assert(err.contains("Cannot recognize hive type string"))
+})
+
+withView("v") {
+  spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+  checkAnswer(sql("SELECT q.`a`, q.b FROM v"), Row("a", 1) :: Nil)
+
+  val err = intercept[SparkException] {
+spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+  }.getMessage
+  assert(err.contains("Cannot recognize hive type string"))
+}
+  }
+
+  test("SPARK-22431: table with nested type") {
+withTable("t", "x") {
+  spark.sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) 
USING PARQUET")
+  checkAnswer(sql("SELECT * FROM t"), Nil)
+  spark.sql("CREATE TABLE x (q STRUCT, i1 INT)")
+  checkAnswer(sql("SELECT * FROM x"), Nil)
+}
+  }
+
+  test("SPARK-22431: view with nested type") {
+withView("v") {
+  spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+  checkAnswer(spark.sql("SELECT * FROM v"), Row(Row("a", 1)) :: Nil)
+
+  spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `b`, 1 AS b) q1")
+  val df = spark.sql("SELECT * FROM v")
+  assert("q1".equals(df.schema.fields(0).name))
+  checkAnswer(df, Row(Row("a", 1)) :: Nil)
+}
+  }
+
+  test("SPARK-22431: alter table tests with nested types") {
+withTable("t1", "t2", "t3") {
+  spark.sql("CREATE TABLE t1 (q STRUCT, i1 
INT)")
+  spark.sql("ALTER TABLE t1 ADD COLUMNS (newcol1 STRUCT<`col1`:STRING, 
col2:Int>)")
+  val newcol = spark.sql("SELECT * FROM t1").schema.fields(2).name
+  assert("newcol1".equals(newcol))
+
+  spark.sql("CREATE TABLE t2(q STRUCT<`a`:INT, col2:STRING>, i1 INT) 
USING PARQUET")
+  spark.sql("ALTER TABLE t2 ADD COLUMNS (newcol1 
STRUCT<`$col1`:STRING, col2:Int>)")
+  spark.sql("ALTER TABLE t2 ADD COLUMNS (newcol2 STRUCT<`col1`:STRING, 
col2:Int>)")
+
+  val df2 = spark.sql("SELECT * FROM t2")
+  checkAnswer(df2, Nil)
--- End diff --

The same here


---




[GitHub] spark pull request #19747: [Spark-22431][SQL] Ensure that the datatype in th...

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19747#discussion_r153396208
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -174,6 +174,88 @@ class HiveCatalogedDDLSuite extends DDLSuite with 
TestHiveSingleton with BeforeA
   test("alter datasource table add columns - partitioned - orc") {
 testAddColumnPartitioned("orc")
   }
+
+  test("SPARK-22431: illegal nested type") {
+val queries = Seq(
+  "CREATE TABLE t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q",
+  "CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT)",
+  "CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+
+queries.foreach(query => {
+  val err = intercept[SparkException] {
+spark.sql(query)
+  }.getMessage
+  assert(err.contains("Cannot recognize hive type string"))
+})
+
+withView("v") {
+  spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+  checkAnswer(sql("SELECT q.`a`, q.b FROM v"), Row("a", 1) :: Nil)
+
+  val err = intercept[SparkException] {
+spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+  }.getMessage
+  assert(err.contains("Cannot recognize hive type string"))
+}
+  }
+
+  test("SPARK-22431: table with nested type") {
+withTable("t", "x") {
+  spark.sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) 
USING PARQUET")
+  checkAnswer(sql("SELECT * FROM t"), Nil)
+  spark.sql("CREATE TABLE x (q STRUCT, i1 INT)")
+  checkAnswer(sql("SELECT * FROM x"), Nil)
+}
+  }
+
+  test("SPARK-22431: view with nested type") {
+withView("v") {
+  spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+  checkAnswer(spark.sql("SELECT * FROM v"), Row(Row("a", 1)) :: Nil)
+
+  spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `b`, 1 AS b) q1")
+  val df = spark.sql("SELECT * FROM v")
--- End diff --

The same here


---




[GitHub] spark pull request #19747: [Spark-22431][SQL] Ensure that the datatype in th...

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19747#discussion_r153396224
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -174,6 +174,88 @@ class HiveCatalogedDDLSuite extends DDLSuite with 
TestHiveSingleton with BeforeA
   test("alter datasource table add columns - partitioned - orc") {
 testAddColumnPartitioned("orc")
   }
+
+  test("SPARK-22431: illegal nested type") {
+val queries = Seq(
+  "CREATE TABLE t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q",
+  "CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT)",
+  "CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+
+queries.foreach(query => {
+  val err = intercept[SparkException] {
+spark.sql(query)
+  }.getMessage
+  assert(err.contains("Cannot recognize hive type string"))
+})
+
+withView("v") {
+  spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+  checkAnswer(sql("SELECT q.`a`, q.b FROM v"), Row("a", 1) :: Nil)
+
+  val err = intercept[SparkException] {
+spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+  }.getMessage
+  assert(err.contains("Cannot recognize hive type string"))
+}
+  }
+
+  test("SPARK-22431: table with nested type") {
+withTable("t", "x") {
+  spark.sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) 
USING PARQUET")
+  checkAnswer(sql("SELECT * FROM t"), Nil)
+  spark.sql("CREATE TABLE x (q STRUCT, i1 INT)")
+  checkAnswer(sql("SELECT * FROM x"), Nil)
--- End diff --

The same here


---




[GitHub] spark pull request #19747: [Spark-22431][SQL] Ensure that the datatype in th...

2017-11-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19747#discussion_r153396174
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -174,6 +174,88 @@ class HiveCatalogedDDLSuite extends DDLSuite with 
TestHiveSingleton with BeforeA
   test("alter datasource table add columns - partitioned - orc") {
 testAddColumnPartitioned("orc")
   }
+
+  test("SPARK-22431: illegal nested type") {
+val queries = Seq(
+  "CREATE TABLE t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q",
+  "CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT)",
+  "CREATE VIEW t AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+
+queries.foreach(query => {
+  val err = intercept[SparkException] {
+spark.sql(query)
+  }.getMessage
+  assert(err.contains("Cannot recognize hive type string"))
+})
+
+withView("v") {
+  spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+  checkAnswer(sql("SELECT q.`a`, q.b FROM v"), Row("a", 1) :: Nil)
+
+  val err = intercept[SparkException] {
+spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `$a`, 1 AS b) q")
+  }.getMessage
+  assert(err.contains("Cannot recognize hive type string"))
+}
+  }
+
+  test("SPARK-22431: table with nested type") {
+withTable("t", "x") {
+  spark.sql("CREATE TABLE t(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) 
USING PARQUET")
+  checkAnswer(sql("SELECT * FROM t"), Nil)
+  spark.sql("CREATE TABLE x (q STRUCT, i1 INT)")
+  checkAnswer(sql("SELECT * FROM x"), Nil)
+}
+  }
+
+  test("SPARK-22431: view with nested type") {
+withView("v") {
+  spark.sql("CREATE VIEW v AS SELECT STRUCT('a' AS `a`, 1 AS b) q")
+  checkAnswer(spark.sql("SELECT * FROM v"), Row(Row("a", 1)) :: Nil)
+
+  spark.sql("ALTER VIEW v AS SELECT STRUCT('a' AS `b`, 1 AS b) q1")
+  val df = spark.sql("SELECT * FROM v")
+  assert("q1".equals(df.schema.fields(0).name))
+  checkAnswer(df, Row(Row("a", 1)) :: Nil)
+}
+  }
+
+  test("SPARK-22431: alter table tests with nested types") {
+withTable("t1", "t2", "t3") {
+  spark.sql("CREATE TABLE t1 (q STRUCT, i1 
INT)")
+  spark.sql("ALTER TABLE t1 ADD COLUMNS (newcol1 STRUCT<`col1`:STRING, 
col2:Int>)")
+  val newcol = spark.sql("SELECT * FROM t1").schema.fields(2).name
+  assert("newcol1".equals(newcol))
+
+  spark.sql("CREATE TABLE t2(q STRUCT<`a`:INT, col2:STRING>, i1 INT) 
USING PARQUET")
+  spark.sql("ALTER TABLE t2 ADD COLUMNS (newcol1 
STRUCT<`$col1`:STRING, col2:Int>)")
+  spark.sql("ALTER TABLE t2 ADD COLUMNS (newcol2 STRUCT<`col1`:STRING, 
col2:Int>)")
+
+  val df2 = spark.sql("SELECT * FROM t2")
+  checkAnswer(df2, Nil)
+  assert("newcol1".equals(df2.schema.fields(2).name))
+  assert("newcol2".equals(df2.schema.fields(3).name))
+
+  spark.sql("CREATE TABLE t3(q STRUCT<`$a`:INT, col2:STRING>, i1 INT) 
USING PARQUET")
+  spark.sql("ALTER TABLE t3 ADD COLUMNS (newcol1 
STRUCT<`$col1`:STRING, col2:Int>)")
+  spark.sql("ALTER TABLE t3 ADD COLUMNS (newcol2 STRUCT<`col1`:STRING, 
col2:Int>)")
+
+  val df3 = spark.sql("SELECT * FROM t3")
+  checkAnswer(df3, Nil)
--- End diff --

`checkAnswer(spark.table("t3"), Nil)`


---




[GitHub] spark issue #19813: [WIP][SPARK-22600][SQL] Fix 64kb limit for deeply nested...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19813
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19813: [WIP][SPARK-22600][SQL] Fix 64kb limit for deeply nested...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19813
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84239/
Test FAILed.


---




[GitHub] spark issue #19813: [WIP][SPARK-22600][SQL] Fix 64kb limit for deeply nested...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19813
  
**[Test build #84239 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84239/testReport)**
 for PR 19813 at commit 
[`d051f9e`](https://github.com/apache/spark/commit/d051f9eef4d03f9027571419857f690c866dbd98).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19747: [Spark-22431][SQL] Ensure that the datatype in the schem...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19747
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84238/
Test PASSed.


---




[GitHub] spark issue #19747: [Spark-22431][SQL] Ensure that the datatype in the schem...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19747
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19747: [Spark-22431][SQL] Ensure that the datatype in the schem...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19747
  
**[Test build #84238 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84238/testReport)**
 for PR 19747 at commit 
[`a1c8a6d`](https://github.com/apache/spark/commit/a1c8a6d308b62f3439f07dbf3257b51855cb09d8).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19792: [SPARK-22566][PYTHON] Better error message for `_...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19792#discussion_r153394082
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1722,6 +1723,83 @@ def test_infer_long_type(self):
 self.assertEqual(_infer_type(2**61), LongType())
 self.assertEqual(_infer_type(2**71), LongType())
 
+def test_merge_type(self):
+self.assertEqual(_merge_type(LongType(), NullType()), LongType())
+self.assertEqual(_merge_type(NullType(), LongType()), LongType())
+
+self.assertEqual(_merge_type(LongType(), LongType()), LongType())
+
+self.assertEqual(_merge_type(
+ArrayType(LongType()),
+ArrayType(LongType())
+), ArrayType(LongType()))
+with self.assertRaisesRegexp(TypeError, 'arrayElement'):
+_merge_type(ArrayType(LongType()), ArrayType(DoubleType()))
+
+self.assertEqual(_merge_type(
+MapType(StringType(), LongType()),
+MapType(StringType(), LongType())
+), MapType(StringType(), LongType()))
+with self.assertRaisesRegexp(TypeError, 'mapKey'):
+_merge_type(
+MapType(StringType(), LongType()),
+MapType(DoubleType(), LongType()))
+with self.assertRaisesRegexp(TypeError, 'mapValue'):
+_merge_type(
+MapType(StringType(), LongType()),
+MapType(StringType(), DoubleType()))
+
+self.assertEqual(_merge_type(
+StructType([StructField("f1", LongType()), StructField("f2", 
StringType())]),
+StructType([StructField("f1", LongType()), StructField("f2", 
StringType())])
+), StructType([StructField("f1", LongType()), StructField("f2", 
StringType())]))
+with self.assertRaisesRegexp(TypeError, r'structField\("f1"\)'):
--- End diff --

Hmm, I don't think we need double backslashes here.


---




[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19783
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84236/
Test PASSed.


---




[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19783
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19783
  
**[Test build #84236 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84236/testReport)**
 for PR 19783 at commit 
[`052d111`](https://github.com/apache/spark/commit/052d11159478da563bd9b514b4267b28ba3347f9).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19810: [SPARK-22599][SQL] In-Memory Table Pruning without Extra...

2017-11-27 Thread CodingCat
Github user CodingCat commented on the issue:

https://github.com/apache/spark/pull/19810
  
Hi, @cloud-fan, this PR is not only for the case where the data size is larger than the memory size. Even when all data is in memory, I observed up to a 10-40% speedup because the implementation here

(1) reads less data

(2) starts fewer tasks

You can think of this PR as implementing the functionality of Parquet's footer for the in-memory table.
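
A hedged spark-shell sketch of the kind of query those numbers come from (made-up data; the actual gain obviously depends on data layout and predicate selectivity):

```scala
// Cache a table, then filter on a selective range predicate. With per-batch
// stats, cached batches whose min/max range excludes the predicate can be
// skipped, so less data is read and fewer tasks do real work.
val df = spark.range(0, 50000000L)
df.cache().count()                          // materialize the in-memory table
spark.time(df.filter("id < 1000").count())  // candidate for in-memory pruning
```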




---




[GitHub] spark issue #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp...

2017-11-27 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19607
  
LGTM


---




[GitHub] spark issue #19827: [SPARK-22617][SQL] make splitExpressions extract current...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19827
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19827: [SPARK-22617][SQL] make splitExpressions extract current...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19827
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84237/
Test FAILed.


---




[GitHub] spark issue #19827: [SPARK-22617][SQL] make splitExpressions extract current...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19827
  
**[Test build #84237 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84237/testReport)**
 for PR 19827 at commit 
[`af7fb8d`](https://github.com/apache/spark/commit/af7fb8d5f3ce91e49ef26289a7f40bc71c0b17ec).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19717: [SPARK-18278] [Submission] Spark on Kubernetes - basic s...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19717
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84235/
Test FAILed.


---




[GitHub] spark issue #19717: [SPARK-18278] [Submission] Spark on Kubernetes - basic s...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19717
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19717: [SPARK-18278] [Submission] Spark on Kubernetes - basic s...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19717
  
**[Test build #84235 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84235/testReport)**
 for PR 19717 at commit 
[`0f90746`](https://github.com/apache/spark/commit/0f90746447b73148544fae02b69855d367bbf0e9).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19468
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84234/
Test PASSed.


---




[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...

2017-11-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19468
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19468: [SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Sc...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19468
  
**[Test build #84234 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84234/testReport)**
 for PR 19468 at commit 
[`c386186`](https://github.com/apache/spark/commit/c3861864efbde824beb722d0cdfe861277086c48).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of timestamp...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19607
  
**[Test build #84242 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84242/testReport)**
 for PR 19607 at commit 
[`9200f38`](https://github.com/apache/spark/commit/9200f38b6414255a5c60127aeeae517086ba108b).


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153386064
  
--- Diff: python/pyspark/sql/session.py ---
@@ -444,11 +445,30 @@ def _get_numpy_record_dtype(self, rec):
 record_type_list.append((str(col_names[i]), curr_type))
 return np.dtype(record_type_list) if has_rec_fix else None
 
-def _convert_from_pandas(self, pdf):
+def _convert_from_pandas(self, pdf, schema, timezone):
--- End diff --

Thanks, I agree with it, but maybe I'll leave those as they are in this PR.


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153386019
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3683,6 +3808,47 @@ def check_records_per_batch(x):
             else:
                 self.spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", orig_value)
 
+    def test_vectorized_udf_timestamps_respect_session_timezone(self):
+        from pyspark.sql.functions import pandas_udf, col
+        from datetime import datetime
+        import pandas as pd
+        schema = StructType([
+            StructField("idx", LongType(), True),
+            StructField("timestamp", TimestampType(), True)])
+        data = [(1, datetime(1969, 1, 1, 1, 1, 1)),
+                (2, datetime(2012, 2, 2, 2, 2, 2)),
+                (3, None),
+                (4, datetime(2100, 3, 3, 3, 3, 3))]
+        df = self.spark.createDataFrame(data, schema=schema)
+
+        f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType())
+        internal_value = pandas_udf(
+            lambda ts: ts.apply(lambda ts: ts.value if ts is not pd.NaT else None), LongType())
+
+        orig_tz = self.spark.conf.get("spark.sql.session.timeZone")
+        try:
+            timezone = "America/New_York"
+            self.spark.conf.set("spark.sql.session.timeZone", timezone)
+            self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")
+            try:
+                df_la = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \
+                    .withColumn("internal_value", internal_value(col("timestamp")))
+                result_la = df_la.select(col("idx"), col("internal_value")).collect()
+                diff = 3 * 60 * 60 * 1000 * 1000 * 1000
--- End diff --

Yes, I'll add some comments.
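
As a quick sanity check of the constant in the diff above: pandas exposes timestamps as nanoseconds since the Unix epoch (`Timestamp.value` / `Timedelta.value`), so a 3 hour offset is `3 * 60 * 60 * 1000 * 1000 * 1000` nanoseconds. A minimal sketch, assuming any pandas version that provides `Timedelta`:

```python
# The `.value` accessor is in nanoseconds, so three hours is 10800000000000 ns.
import pandas as pd

three_hours_ns = 3 * 60 * 60 * 1000 * 1000 * 1000
assert pd.Timedelta(hours=3).value == three_hours_ns
```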


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153386047
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1678,37 +1679,105 @@ def from_arrow_schema(arrow_schema):
          for field in arrow_schema])
 
 
-def _check_dataframe_localize_timestamps(pdf):
+def _old_pandas_exception_message(e):
+    """ Create an error message for importing old Pandas.
     """
-    Convert timezone aware timestamps to timezone-naive in local time
+    msg = "note: Pandas (>=0.19.2) must be installed and available on calling Python process"
+    return "%s\n%s" % (_exception_message(e), msg)
+
+
+def _check_dataframe_localize_timestamps(pdf, timezone):
+    """
+    Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone
 
     :param pdf: pandas.DataFrame
-    :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive
+    :param timezone: the timezone to convert. if None then use local timezone
+    :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive
     """
-    from pandas.api.types import is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
+    tz = timezone or 'tzlocal()'
     for column, series in pdf.iteritems():
         # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
         if is_datetime64tz_dtype(series.dtype):
-            pdf[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+            pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None)
     return pdf
 
 
-def _check_series_convert_timestamps_internal(s):
+def _check_series_convert_timestamps_internal(s, timezone):
     """
-    Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage
+    Convert a tz-naive timestamp in the specified timezone or local timezone to UTC normalized for
+    Spark internal storage
+
     :param s: a pandas.Series
+    :param timezone: the timezone to convert. if None then use local timezone
     :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone
     """
-    from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
     # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
     if is_datetime64_dtype(s.dtype):
-        return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
+        tz = timezone or 'tzlocal()'
+        return s.dt.tz_localize(tz).dt.tz_convert('UTC')
     elif is_datetime64tz_dtype(s.dtype):
         return s.dt.tz_convert('UTC')
     else:
         return s
 
 
+def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone):
+    """
+    Convert timestamp to timezone-naive in the specified timezone or local timezone
+
+    :param s: a pandas.Series
+    :param fromTimezone: the timezone to convert from. if None then use local timezone
+    :param toTimezone: the timezone to convert to. if None then use local timezone
+    :return pandas.Series where if it is a timestamp, has been converted to tz-naive
+    """
+    try:
+        import pandas as pd
+        from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
+    fromTz = fromTimezone or 'tzlocal()'
--- End diff --

I'll update it.
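
For reference, a minimal pandas sketch of the two conversions these helpers build on, assuming pandas >= 0.19.2: `tz_localize` interprets naive values in a given timezone, while `tz_convert` moves already-aware values between timezones.

```python
# Round-trip a naive series through UTC and back; the timezone name is illustrative.
import pandas as pd

s = pd.Series(pd.to_datetime(["1969-01-01 01:01:01", "2012-02-02 02:02:02"]))
utc = s.dt.tz_localize("America/New_York").dt.tz_convert("UTC")     # naive -> aware UTC
back = utc.dt.tz_convert("America/New_York").dt.tz_localize(None)   # aware -> naive again
assert back.equals(s)
```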


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153386033
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1678,37 +1679,105 @@ def from_arrow_schema(arrow_schema):
          for field in arrow_schema])
 
 
-def _check_dataframe_localize_timestamps(pdf):
+def _old_pandas_exception_message(e):
+    """ Create an error message for importing old Pandas.
     """
-    Convert timezone aware timestamps to timezone-naive in local time
+    msg = "note: Pandas (>=0.19.2) must be installed and available on calling Python process"
+    return "%s\n%s" % (_exception_message(e), msg)
+
+
+def _check_dataframe_localize_timestamps(pdf, timezone):
+    """
+    Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone
 
     :param pdf: pandas.DataFrame
-    :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive
+    :param timezone: the timezone to convert. if None then use local timezone
+    :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive
     """
-    from pandas.api.types import is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
+    tz = timezone or 'tzlocal()'
     for column, series in pdf.iteritems():
         # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
         if is_datetime64tz_dtype(series.dtype):
-            pdf[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None)
+            pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None)
     return pdf
 
 
-def _check_series_convert_timestamps_internal(s):
+def _check_series_convert_timestamps_internal(s, timezone):
     """
-    Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage
+    Convert a tz-naive timestamp in the specified timezone or local timezone to UTC normalized for
+    Spark internal storage
+
     :param s: a pandas.Series
+    :param timezone: the timezone to convert. if None then use local timezone
     :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone
     """
-    from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    try:
+        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+    except ImportError as e:
+        raise ImportError(_old_pandas_exception_message(e))
     # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
     if is_datetime64_dtype(s.dtype):
-        return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC')
+        tz = timezone or 'tzlocal()'
+        return s.dt.tz_localize(tz).dt.tz_convert('UTC')
     elif is_datetime64tz_dtype(s.dtype):
         return s.dt.tz_convert('UTC')
     else:
         return s
 
 
+def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone):
--- End diff --

Thanks, I'll update it. Maybe `toTimezone` -> `to_timezone` as well.


---




[GitHub] spark issue #19813: [WIP][SPARK-22600][SQL] Fix 64kb limit for deeply nested...

2017-11-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19813
  
**[Test build #84241 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84241/testReport)** for PR 19813 at commit [`6368702`](https://github.com/apache/spark/commit/6368702e66948e26c41300da7136dffc5b963cb6).


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153386007
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3192,16 +3255,49 @@ def test_filtered_frame(self):
         self.assertEqual(pdf.columns[0], "i")
         self.assertTrue(pdf.empty)
 
-    def test_createDataFrame_toggle(self):
-        pdf = self.create_pandas_data_frame()
+    def _createDataFrame_toggle(self, pdf, schema=None):
         self.spark.conf.set("spark.sql.execution.arrow.enabled", "false")
         try:
-            df_no_arrow = self.spark.createDataFrame(pdf)
+            df_no_arrow = self.spark.createDataFrame(pdf, schema=schema)
         finally:
             self.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
-        df_arrow = self.spark.createDataFrame(pdf)
+        df_arrow = self.spark.createDataFrame(pdf, schema=schema)
+        return df_no_arrow, df_arrow
+
+    def test_createDataFrame_toggle(self):
+        pdf = self.create_pandas_data_frame()
+        df_no_arrow, df_arrow = self._createDataFrame_toggle(pdf, schema=self.schema)
         self.assertEquals(df_no_arrow.collect(), df_arrow.collect())
 
+    def test_createDataFrame_respect_session_timezone(self):
+        from datetime import timedelta
+        pdf = self.create_pandas_data_frame()
+        orig_tz = self.spark.conf.get("spark.sql.session.timeZone")
+        try:
+            timezone = "America/New_York"
+            self.spark.conf.set("spark.sql.session.timeZone", timezone)
+            self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")
+            try:
+                df_no_arrow_la, df_arrow_la = self._createDataFrame_toggle(pdf, schema=self.schema)
+                result_la = df_no_arrow_la.collect()
+                result_arrow_la = df_arrow_la.collect()
+                self.assertEqual(result_la, result_arrow_la)
+            finally:
+                self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true")
+            df_no_arrow_ny, df_arrow_ny = self._createDataFrame_toggle(pdf, schema=self.schema)
+            result_ny = df_no_arrow_ny.collect()
+            result_arrow_ny = df_arrow_ny.collect()
+            self.assertEqual(result_ny, result_arrow_ny)
+
+            self.assertNotEqual(result_ny, result_la)
+
+            result_la_corrected = [Row(**{k: v - timedelta(hours=3) if k == '7_timestamp_t' else v
--- End diff --

Yes, the 3-hour timedelta is the time difference between the two timezones used in the test.
I'll add some comments.
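
A small illustration of where the 3 hours comes from, assuming the test machine's local timezone is America/Los_Angeles (as the `_la`/`_ny` variable names suggest): the same instant rendered in the two zones differs by three hours of wall-clock time.

```python
# Illustration only; the chosen timestamp avoids any DST edge cases.
import pandas as pd

la = pd.Timestamp("2012-02-02 02:02:02", tz="America/Los_Angeles")
ny = la.tz_convert("America/New_York")  # same instant, different wall clock
assert ny.tz_localize(None) - la.tz_localize(None) == pd.Timedelta(hours=3)
```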


---




[GitHub] spark pull request #19607: [SPARK-22395][SQL][PYTHON] Fix the behavior of ti...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19607#discussion_r153385979
  
--- Diff: python/pyspark/sql/session.py ---
@@ -444,11 +445,30 @@ def _get_numpy_record_dtype(self, rec):
             record_type_list.append((str(col_names[i]), curr_type))
         return np.dtype(record_type_list) if has_rec_fix else None
 
-    def _convert_from_pandas(self, pdf):
+    def _convert_from_pandas(self, pdf, schema, timezone):
         """
         Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
         :return list of records
         """
+        if timezone is not None:
+            from pyspark.sql.types import _check_series_convert_timestamps_tz_local
+            copied = False
+            if isinstance(schema, StructType):
+                for field in schema:
+                    # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
+                    if isinstance(field.dataType, TimestampType):
+                        s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone)
+                        if not copied and s is not pdf[field.name]:
+                            pdf = pdf.copy()
+                            copied = True
--- End diff --

Yes, it's to prevent the caller's original DataFrame from being modified.
I'll add some comments.
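
A minimal sketch of that copy-on-write pattern, using illustrative names rather than the actual pyspark helpers: copy the incoming DataFrame only when a column actually has to be rewritten, so the caller's object is never mutated.

```python
# `convert` stands in for a per-column conversion that returns the same Series
# object when nothing needs to change.
import pandas as pd

def convert_columns_without_mutating(pdf, convert):
    copied = False
    for column in pdf.columns:
        converted = convert(pdf[column])
        if converted is not pdf[column]:
            if not copied:
                # Copy lazily, and only once, so the caller's DataFrame stays untouched.
                pdf = pdf.copy()
                copied = True
            pdf[column] = converted
    return pdf
```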


---




[GitHub] spark pull request #19827: [SPARK-22617][SQL] make splitExpressions extract ...

2017-11-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19827#discussion_r153385444
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
@@ -167,9 +160,20 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
       }
     }
 
+    val writeFieldsCode = if (isTopLevel && row == null) {
--- End diff --

The `INPUT_ROW` of this local codegen context is not changed in `GenerateUnsafeProjection`. If it is `null`, the generated code will become:

```scala
class SpecificUnsafeProjection extends ${classOf[UnsafeProjection].getName} {
  ...
  public UnsafeRow apply(InternalRow null) {  // InternalRow ${ctx.INPUT_ROW} here.
    ${eval.code.trim}
    return ${eval.value};
  }
  ...
}
```


---




[GitHub] spark pull request #19827: [SPARK-22617][SQL] make splitExpressions extract ...

2017-11-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19827#discussion_r153384793
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
@@ -167,9 +160,20 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
       }
     }
 
+    val writeFieldsCode = if (isTopLevel && row == null) {
--- End diff --

This codegen context isn't the same one used in whole-stage codegen.


---




[GitHub] spark pull request #19827: [SPARK-22617][SQL] make splitExpressions extract ...

2017-11-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19827#discussion_r153384273
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
@@ -167,9 +160,20 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
       }
     }
 
+    val writeFieldsCode = if (isTopLevel && row == null) {
--- End diff --

Also, this codegen context in `GenerateUnsafeProjection` is newly created and used locally. I think the `ctx.INPUT_ROW` here can't be null.

```scala
private def create(
    expressions: Seq[Expression],
    subexpressionEliminationEnabled: Boolean): UnsafeProjection = {
  val ctx = newCodeGenContext()
  ...
```


---




[GitHub] spark pull request #19827: [SPARK-22617][SQL] make splitExpressions extract ...

2017-11-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19827#discussion_r153384334
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
@@ -167,9 +160,20 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
       }
     }
 
+    val writeFieldsCode = if (isTopLevel && row == null) {
--- End diff --

Ideally the row can't be null; the top level is a special case where we may pass `ctx.INPUT_ROW` as the input row.


---




[GitHub] spark issue #19810: [SPARK-22599][SQL] In-Memory Table Pruning without Extra...

2017-11-27 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19810
  
Are you trying to optimize the case where the data is too large to fit in memory? Spark's RDD cache doesn't work well in that case.
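
If that is the scenario, the usual workaround is an explicit storage level so cached partitions can spill to disk instead of being dropped and recomputed; a minimal PySpark sketch, not tied to this PR:

```python
# MEMORY_AND_DISK keeps partitions in memory when possible and spills the rest to disk.
from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cache-example").getOrCreate()
df = spark.range(100 * 1000 * 1000)
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()  # materializes the cached data
```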


---




[GitHub] spark pull request #19827: [SPARK-22617][SQL] make splitExpressions extract ...

2017-11-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19827#discussion_r153383752
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
@@ -167,9 +160,20 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
       }
     }
 
+    val writeFieldsCode = if (isTopLevel && row == null) {
--- End diff --

Hmm, can the row be null here? I think `GenerateUnsafeProjection` is meant to be used to project against an input row.

Actually, I think the naming is confusing. `ctx.INPUT_ROW` is just used as the name of the input row; the generated code doesn't actually mean to refer to `ctx.INPUT_ROW` here.

```scala
public UnsafeRow apply(InternalRow ${ctx.INPUT_ROW}) {
  ${eval.code.trim}
  return ${eval.value};
}
```


---




[GitHub] spark pull request #19797: [SPARK-22570][SQL] Avoid to create a lot of globa...

2017-11-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19797#discussion_r153383615
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala ---
@@ -164,6 +165,28 @@ class ComplexTypesSuite extends PlanTest{
     comparePlans(Optimizer execute query, expected)
   }
 
+  test("SPARK-22570: should not create a lot of instance variables") {
+    val ctx = new CodegenContext
--- End diff --

Something like this: we create a codegen context and run codegen. After that, instead of compiling the code to make sure it doesn't fail, we can just check the size of `ctx.mutableStates` to confirm that we don't add global variables during codegen.


---



