[GitHub] spark pull request #21282: [SPARK-23934][SQL] Adding map_from_entries functi...
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21282#discussion_r193958595

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -308,6 +309,234 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp
   override def prettyName: String = "map_entries"
 }

+/**
+ * Returns a map created from the given array of entries.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(arrayOfEntries) - Returns a map created from the given array of entries.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));
+       {1:"a",2:"b"}
+  """,
+  since = "2.4.0")
+case class MapFromEntries(child: Expression) extends UnaryExpression {
+
+  @transient
+  private lazy val dataTypeDetails: Option[(MapType, Boolean, Boolean)] = child.dataType match {
+    case ArrayType(
+      StructType(Array(
+        StructField(_, keyType, keyNullable, _),
+        StructField(_, valueType, valueNullable, _))),
+      containsNull) => Some((MapType(keyType, valueType, valueNullable), keyNullable, containsNull))
+    case _ => None
+  }
+
+  private def nullEntries: Boolean = dataTypeDetails.get._3
+
+  override def dataType: MapType = dataTypeDetails.get._1
+
+  override def checkInputDataTypes(): TypeCheckResult = dataTypeDetails match {
+    case Some(_) => TypeCheckResult.TypeCheckSuccess
+    case None => TypeCheckResult.TypeCheckFailure(s"'${child.sql}' is of " +
+      s"${child.dataType.simpleString} type. $prettyName accepts only arrays of pair structs.")
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+    val arrayData = input.asInstanceOf[ArrayData]
+    val length = arrayData.numElements()
+    val numEntries = if (nullEntries) (0 until length).count(!arrayData.isNullAt(_)) else length
+    val keyArray = new Array[AnyRef](numEntries)
+    val valueArray = new Array[AnyRef](numEntries)
+    var i = 0
+    var j = 0
+    while (i < length) {
+      if (!arrayData.isNullAt(i)) {
--- End diff --

Yeah, that sounds reasonable. Thanks.
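The quoted diff cuts off inside the copy loop. For context, here is a hedged sketch of how such a loop can finish. `getStruct`, `get`, and `GenericArrayData` are existing catalyst APIs, but the shape and error handling below are assumptions, not the PR's actual code:

```scala
// Copy the key/value of each non-null entry into the pre-sized arrays;
// j only advances for entries that are actually copied.
while (i < length) {
  if (!arrayData.isNullAt(i)) {
    val entry = arrayData.getStruct(i, 2)
    if (entry.isNullAt(0)) {
      throw new RuntimeException("The first field from a struct (key) can't be null.")
    }
    keyArray(j) = entry.get(0, dataType.keyType)
    valueArray(j) = entry.get(1, dataType.valueType)
    j += 1
  }
  i += 1
}
new ArrayBasedMapData(new GenericArrayData(keyArray), new GenericArrayData(valueArray))
```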
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366 Merged build finished. Test PASSed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91544/ Test PASSed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366

**[Test build #91544 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91544/testReport)** for PR 21366 at commit [`3b85ab5`](https://github.com/apache/spark/commit/3b85ab52b523dc182227b1ece517765903e64109).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #20697: [SPARK-23010][k8s] Initial checkin of k8s integra...
Github user ssuchter commented on a diff in the pull request:

https://github.com/apache/spark/pull/20697#discussion_r193957378

--- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.File
+import java.nio.file.{Path, Paths}
+import java.util.UUID
+import java.util.regex.Pattern
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.PatternFilenameFilter
+import io.fabric8.kubernetes.api.model.{Container, Pod}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Seconds, Span}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
+import org.apache.spark.deploy.k8s.integrationtest.config._
+
+private[spark] class KubernetesSuite extends SparkFunSuite
+  with BeforeAndAfterAll with BeforeAndAfter {
+
+  import KubernetesSuite._
+
+  private var testBackend: IntegrationTestBackend = _
+  private var sparkHomeDir: Path = _
+  private var kubernetesTestComponents: KubernetesTestComponents = _
+  private var sparkAppConf: SparkAppConf = _
+  private var image: String = _
+  private var containerLocalSparkDistroExamplesJar: String = _
+  private var appLocator: String = _
+  private var driverPodName: String = _
+
+  override def beforeAll(): Unit = {
+    // The scalatest-maven-plugin gives system properties that are referenced but not set null
+    // values. We need to remove the null-value properties before initializing the test backend.
+    val nullValueProperties = System.getProperties.asScala
+      .filter(entry => entry._2.equals("null"))
+      .map(entry => entry._1.toString)
+    nullValueProperties.foreach { key =>
+      System.clearProperty(key)
+    }
+
+    val sparkDirProp = System.getProperty("spark.kubernetes.test.unpackSparkDir")
+    require(sparkDirProp != null, "Spark home directory must be provided in system properties.")
+    sparkHomeDir = Paths.get(sparkDirProp)
+    require(sparkHomeDir.toFile.isDirectory,
+      s"No directory found for spark home specified at $sparkHomeDir.")
+    val imageTag = getTestImageTag
+    val imageRepo = getTestImageRepo
+    image = s"$imageRepo/spark:$imageTag"
+
+    val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
+      .toFile
+      .listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0)
+    containerLocalSparkDistroExamplesJar = s"local:///opt/spark/examples/jars/" +
+      s"${sparkDistroExamplesJarFile.getName}"
+    testBackend = IntegrationTestBackendFactory.getTestBackend
+    testBackend.initialize()
+    kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
+  }
+
+  override def afterAll(): Unit = {
+    testBackend.cleanUp()
+  }
+
+  before {
+    appLocator = UUID.randomUUID().toString.replaceAll("-", "")
+    driverPodName = "spark-test-app-" + UUID.randomUUID().toString.replaceAll("-", "")
+    sparkAppConf = kubernetesTestComponents.newSparkAppConf()
+      .set("spark.kubernetes.container.image", image)
+      .set("spark.kubernetes.driver.pod.name", driverPodName)
+      .set("spark.kubernetes.driver.label.spark-app-locator", appLocator)
+      .set("spark.kubernetes.executor.label.spark-app-locator", appLocator)
+    if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
+      kubernetesTestComponents.createNamespace()
+    }
+  }
+
+  after {
+    if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
[GitHub] spark issue #20697: [SPARK-23010][k8s] Initial checkin of k8s integration te...
Github user ssuchter commented on the issue: https://github.com/apache/spark/pull/20697 Hi folks - I don't think there is any more work to do on this PR. Is there something you're waiting for before merging it?
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91543/ Test FAILed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366 Merged build finished. Test FAILed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366

**[Test build #91543 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91543/testReport)** for PR 21366 at commit [`0a205f6`](https://github.com/apache/spark/commit/0a205f6bdcb3ec1e730037508bc393fe13ce9b0c).

 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21501: [SPARK-15064][ML] Locale support in StopWordsRemover
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21501

**[Test build #91553 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91553/testReport)** for PR 21501 at commit [`fc0fe20`](https://github.com/apache/spark/commit/fc0fe20a37e2e40a5675c972352ca3dcc259b32d).

 * This patch **fails to generate documentation**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21501: [SPARK-15064][ML] Locale support in StopWordsRemover
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21501 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91553/ Test FAILed.
[GitHub] spark issue #21501: [SPARK-15064][ML] Locale support in StopWordsRemover
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21501 Merged build finished. Test FAILed.
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21495 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91541/ Test PASSed.
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21495 Merged build finished. Test PASSed.
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21495

**[Test build #91541 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91541/testReport)** for PR 21495 at commit [`f91d75a`](https://github.com/apache/spark/commit/f91d75ae6b77bef6bf7eb8db98a345e6eb822393).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #21501: [SPARK-15064][ML] Locale support in StopWordsRemo...
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21501#discussion_r193954052

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala ---
@@ -95,8 +121,7 @@ class StopWordsRemover @Since("1.5.0") (@Since("1.5.0") override val uid: String
         terms.filter(s => !stopWordsSet.contains(s))
       }
     } else {
-      // TODO: support user locale (SPARK-15064)
-      val toLower = (s: String) => if (s != null) s.toLowerCase else s
+      val toLower = (s: String) => if (s != null) s.toLowerCase(new Locale($(locale))) else s
--- End diff --

Maybe move `new Locale($(locale))` out of `toLower`, so we don't need to create it every time.
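A minimal sketch of the suggested hoisting; the variable name is illustrative, not necessarily what the PR ends up using:

```scala
// Construct the Locale once, outside the closure, so it is not
// re-allocated for every string being lowercased.
val localeObj = new Locale($(locale))
val toLower = (s: String) => if (s != null) s.toLowerCase(localeObj) else s
```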
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21258 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3848/ Test PASSed.
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21258 Merged build finished. Test PASSed.
[GitHub] spark issue #21501: [SPARK-15064][ML] Locale support in StopWordsRemover
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21501 **[Test build #91553 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91553/testReport)** for PR 21501 at commit [`fc0fe20`](https://github.com/apache/spark/commit/fc0fe20a37e2e40a5675c972352ca3dcc259b32d).
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21258 **[Test build #91552 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91552/testReport)** for PR 21258 at commit [`6d53a96`](https://github.com/apache/spark/commit/6d53a96a0f7ec122fc44a05213256e92856eb3cf).
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21258 retest this please
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/21498

Benchmarking on a Spark cluster with 5 nodes on EC2 too.

```scala
def benchmark(func: () => Unit): Unit = {
  val t0 = System.nanoTime()
  func()
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns")
}

val N = 1
val data = (0 until N).map { i =>
  (i, i % 2, i % 3, Array.fill(10)(i), Array.fill(10)(i.toString),
    Array.fill(10)(i.toDouble), (i, i.toString, i.toDouble))
}

val df1 = sc.parallelize(data).toDF("key", "t1", "t2", "t3", "t4", "t5", "t6").repartition($"key")
val df2 = sc.parallelize(data).toDF("key", "t1", "t2", "t3", "t4", "t5", "t6").repartition($"key")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.unionInSamePartition", "true")

val df3 = df1.union(df2).groupBy("key").agg(count("*"))
val df4 = df1.union(df2)
val df5 = df3.sample(0.8).filter($"key" > 100).sample(0.4)
val df6 = df4.sample(0.8).filter($"key" > 100).sample(0.4)

benchmark(() => df3.collect)
benchmark(() => df4.collect)
benchmark(() => df5.collect)
benchmark(() => df6.collect)
```

Before:

```scala
scala> benchmark(() => df3.collect)
Elapsed time: 663668585ns
scala> benchmark(() => df4.collect)
Elapsed time: 547487953ns
scala> benchmark(() => df5.collect)
Elapsed time: 712634187ns
scala> benchmark(() => df6.collect)
Elapsed time: 491917400ns
```

After:

```scala
scala> benchmark(() => df3.collect)
Elapsed time: 516797788ns
scala> benchmark(() => df4.collect)
Elapsed time: 557499803ns
scala> benchmark(() => df5.collect)
Elapsed time: 611327782ns
scala> benchmark(() => df6.collect)
Elapsed time: 495387557ns
```
[GitHub] spark pull request #21260: [SPARK-23529][K8s] Support mounting volumes
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21260#discussion_r193950938

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+
+private[spark] object KubernetesVolumeUtils {
+
+  /**
+   * Extract Spark volume configuration properties with a given name prefix.
+   *
+   * @param sparkConf Spark configuration
+   * @param prefix the given property name prefix
+   * @return a Map storing with volume name as key and spec as value
+   */
+  def parseVolumesWithPrefix(
+      sparkConf: SparkConf,
+      prefix: String): Iterable[KubernetesVolumeSpec] = {
+    val properties = sparkConf.getAllWithPrefix(prefix)
+
+    val propsByTypeName: Map[(String, String), Array[(String, String)]] =
+      properties.flatMap { case (k, v) =>
+        k.split('.').toList match {
+          case tpe :: name :: rest => Some(((tpe, name), (rest.mkString("."), v)))
+          case _ => None
+        }
+      }.groupBy(_._1).mapValues(_.map(_._2))
+
+    propsByTypeName.map { case ((tpe, name), props) =>
+      val propMap = props.toMap
+      val mountProps = getAllWithPrefix(propMap, s"$KUBERNETES_VOLUMES_MOUNT_KEY.")
+      val options = getAllWithPrefix(propMap, s"$KUBERNETES_VOLUMES_OPTIONS_KEY.")
+
+      KubernetesVolumeSpec(
+        volumeName = name,
+        volumeType = tpe,
+        mountPath = mountProps(KUBERNETES_VOLUMES_PATH_KEY),
--- End diff --

Yes, that is good to have. Basically it's good to have a check on the submission client side to make sure the config keys are properly formatted and fail fast if they are invalid.
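A sketch of the fail-fast check being agreed on here. `mountProps`, `name`, `tpe`, and the key constants come from the quoted diff; the `getOrElse` shape and error message are assumptions, not the PR's code:

```scala
// Instead of mountProps(KUBERNETES_VOLUMES_PATH_KEY), which would surface as a
// bare NoSuchElementException, fail fast with an error naming the missing key.
val mountPath = mountProps.getOrElse(KUBERNETES_VOLUMES_PATH_KEY,
  throw new IllegalArgumentException(
    s"Volume '$name' of type '$tpe' must declare a mount path via " +
      s"'$prefix$tpe.$name.$KUBERNETES_VOLUMES_MOUNT_KEY.$KUBERNETES_VOLUMES_PATH_KEY'"))
```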
[GitHub] spark issue #21501: [SPARK-15064][ML] Locale support in StopWordsRemover
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21501 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91551/ Test FAILed.
[GitHub] spark issue #21501: [SPARK-15064][ML] Locale support in StopWordsRemover
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21501 Merged build finished. Test FAILed.
[GitHub] spark issue #21501: [SPARK-15064][ML] Locale support in StopWordsRemover
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21501

**[Test build #91551 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91551/testReport)** for PR 21501 at commit [`f63cb26`](https://github.com/apache/spark/commit/f63cb26ee6242ce7f7eb975e6d4ce10496993526).

 * This patch **fails Python style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21501: [SPARK-15064][ML] Locale support in StopWordsRemover
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21501 **[Test build #91551 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91551/testReport)** for PR 21501 at commit [`f63cb26`](https://github.com/apache/spark/commit/f63cb26ee6242ce7f7eb975e6d4ce10496993526).
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21504 Merged build finished. Test PASSed.
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21504 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91540/ Test PASSed.
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21504

**[Test build #91540 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91540/testReport)** for PR 21504 at commit [`02b2973`](https://github.com/apache/spark/commit/02b29731161a3e6ad4841c09a3bf02004e0a87e1).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21495 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91538/ Test PASSed.
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21495 Merged build finished. Test PASSed.
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21495

**[Test build #91538 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91538/testReport)** for PR 21495 at commit [`de790fd`](https://github.com/apache/spark/commit/de790fd251ba3727bba23ceb1ca07559d25b7e87).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, initializeSpark: () => Unit)`
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21258 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91542/ Test FAILed.
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21258 Merged build finished. Test FAILed.
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21258

**[Test build #91542 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91542/testReport)** for PR 21258 at commit [`228fcc6`](https://github.com/apache/spark/commit/228fcc66e2b85b957833da739a20229867d51cbc).

 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `case class MapFromArrays(left: Expression, right: Expression)`
[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21511#discussion_r193945410

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -150,6 +152,16 @@ private[spark] class BasicExecutorFeatureStep(
         .endResources()
         .build()
     }.getOrElse(executorContainer)
+    val containerWithLimitGpus = executorLimitGpus.map { limitGpus =>
+      val executorGpuLimitQuantity = new QuantityBuilder(false)
+        .withAmount(limitGpus)
+        .build()
+      new ContainerBuilder(containerWithLimitCores)
+        .editResources()
+          .addToLimits(gpuProvider+"/gpu", executorGpuLimitQuantity)
--- End diff --

Style: whitespace around the "+". More importantly, you're assuming that the name of the resource is always going to be "gpu". I'm not sure this is a great idea.
[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21511#discussion_r193944223

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -104,6 +104,20 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional

+  val KUBERNETES_EXECUTOR_LIMIT_GPUS =
+    ConfigBuilder("spark.kubernetes.executor.limit.gpus")
+      .doc("Specify the gpu request for each executor pod")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_GPU_PROVIDER =
+    ConfigBuilder("spark.kubernetes.executor.gpu.provider")
+      .doc("Specify the gpu provider for each executor pod")
+      .stringConf
+      .createWithDefault("nvidia.com")
+
+
+
--- End diff --

Style: remove the two extra blank lines here.
[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21511#discussion_r193945389

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala ---
@@ -172,7 +184,7 @@ private[spark] class BasicExecutorFeatureStep(
         .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
         .endSpec()
         .build()
-    SparkPod(executorPod, containerWithLimitCores)
+    SparkPod(executorPod, containerWithLimitGpus)
--- End diff --

What if you want to request both GPUs and another resource (e.g., FPGAs) for your executors? You'd have to hard-code and chain another optional map like you did in this pull request. Your current code does not pave a good path forward to requesting other types of resources.
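One way to avoid hard-coding the gpu resource name, per the comments above, is to key limits by fully qualified Kubernetes resource names. The helper below is a hypothetical sketch, not part of this PR; `editOrNewResources`/`addToLimits` are standard fabric8 builder methods:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, QuantityBuilder}

// Hypothetical: limits keyed by fully qualified resource name, e.g.
// "nvidia.com/gpu" -> "2", "xilinx.com/fpga" -> "1", so a new resource
// type needs no new chained map in the feature step.
def addResourceLimits(
    container: ContainerBuilder,
    limits: Map[String, String]): ContainerBuilder = {
  limits.foldLeft(container) { case (builder, (resourceName, amount)) =>
    val quantity = new QuantityBuilder(false).withAmount(amount).build()
    builder.editOrNewResources()
      .addToLimits(resourceName, quantity)
      .endResources()
  }
}
```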
[GitHub] spark pull request #21511: [SPARK-24491][Kubernetes] Configuration support f...
Github user galv commented on a diff in the pull request:

https://github.com/apache/spark/pull/21511#discussion_r193945237

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -104,6 +104,20 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional

+  val KUBERNETES_EXECUTOR_LIMIT_GPUS =
--- End diff --

Why not allow the driver to request GPUs as well?
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r193945288

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null

+  try {
+    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+        sparkSession.sparkContext.conf).foreach(listener => {
+        addListener(listener)
+        logInfo(s"Registered listener ${listener.getClass.getName}")
--- End diff --

Either debug or info is fine for me, since it would add just a couple of log lines only once.
[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21477 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91539/ Test FAILed.
[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21477 Merged build finished. Test FAILed.
[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21477

**[Test build #91539 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91539/testReport)** for PR 21477 at commit [`ecf3d88`](https://github.com/apache/spark/commit/ecf3d88954afdab16c492fd479c1051aff8a3b95).

 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21258 Merged build finished. Test PASSed.
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21258 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3847/ Test PASSed.
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21258 **[Test build #91550 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91550/testReport)** for PR 21258 at commit [`6d53a96`](https://github.com/apache/spark/commit/6d53a96a0f7ec122fc44a05213256e92856eb3cf).
[GitHub] spark pull request #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and...
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21495#discussion_r193942643

--- Diff: repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala ---
@@ -21,8 +21,22 @@ import scala.collection.mutable
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._

-class SparkILoopInterpreter(settings: Settings, out: JPrintWriter) extends IMain(settings, out) {
-  self =>
+class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, initializeSpark: () => Unit)
+    extends IMain(settings, out) { self =>
+
+  /**
+   * We override `initializeSynchronous` to initialize Spark *after* `intp` is properly initialized
+   * and *before* the REPL sees any files in the private `loadInitFiles` functions, so that
+   * the Spark context is visible in those files.
+   *
+   * This is a bit of a hack, but there isn't another hook available to us at this point.
+   *
+   * See the discussion in Scala community https://github.com/scala/bug/issues/10913 for detail.
+   */
+  override def initializeSynchronous(): Unit = {
+    super.initializeSynchronous()
+    initializeSpark()
--- End diff --

Thank you for confirming, @som-snytt.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/3702/
[GitHub] spark issue #21501: [SPARK-15064][ML] Locale support in StopWordsRemover
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21501 Merged build finished. Test FAILed.
[GitHub] spark issue #21501: [SPARK-15064][ML] Locale support in StopWordsRemover
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21501

**[Test build #91549 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91549/testReport)** for PR 21501 at commit [`f572684`](https://github.com/apache/spark/commit/f572684e0df29f522bc0d5b1521e7dc5356bae49).

 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21501: [SPARK-15064][ML] Locale support in StopWordsRemover
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21501 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91549/ Test FAILed.
[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r193942183

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala ---
@@ -236,6 +236,76 @@ case class CreateMap(children: Seq[Expression]) extends Expression {
   override def prettyName: String = "map"
 }

+/**
+ * Returns a catalyst Map containing the two arrays in children expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(keys, values) - Creates a map with a pair of the given key/value arrays. All elements
+      in keys should not be null""",
+  examples = """
+    Examples:
+      > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+       {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class MapFromArrays(left: Expression, right: Expression)
+    extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType)
+
+  override def dataType: DataType = {
+    MapType(
+      keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+      valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+      valueContainsNull = right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullSafeEval(keyArray: Any, valueArray: Any): Any = {
+    val keyArrayData = keyArray.asInstanceOf[ArrayData]
+    val valueArrayData = valueArray.asInstanceOf[ArrayData]
+    if (keyArrayData.numElements != valueArrayData.numElements) {
+      throw new RuntimeException("The given two arrays should have the same length")
+    }
+    val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+    if (leftArrayType.containsNull) {
+      var i = 0
+      while (i < keyArrayData.numElements) {
+        if (keyArrayData.isNullAt(i)) {
+          throw new RuntimeException("Cannot use null as map key!")
+        }
+        i += 1
+      }
+    }
+    new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy())
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => {
+      val arrayBasedMapData = classOf[ArrayBasedMapData].getName
+      val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+      val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else {
+        val i = ctx.freshName("i")
+        s"""
+           |for (int $i = 0; $i < $keyArrayData.numElements(); $i++) {
+           |  if ($keyArrayData.isNullAt($i)) {
+           |    throw new RuntimeException("Cannot use null as map key!");
+           |  }
+           |}
+         """.stripMargin
+      }
+      s"""
+         |if ($keyArrayData.numElements() != $valueArrayData.numElements()) {
+         |  throw new RuntimeException("The given two arrays should have the same length");
+         |}
+         |$keyArrayElemNullCheck
+         |${ev.value} = new $arrayBasedMapData($keyArrayData.copy(), $valueArrayData.copy());
       """.stripMargin
+    })
+  }
+
+  override def prettyName: String = "create_map_from_arrays"
--- End diff --

Oh, good catch
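The catch: the function being added is `map_from_arrays` (per the PR title and the SQL example), while `prettyName` above returns `create_map_from_arrays`. The presumed one-line fix:

```scala
override def prettyName: String = "map_from_arrays"
```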
[GitHub] spark issue #21501: [SPARK-15064][ML] Locale support in StopWordsRemover
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21501 **[Test build #91549 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91549/testReport)** for PR 21501 at commit [`f572684`](https://github.com/apache/spark/commit/f572684e0df29f522bc0d5b1521e7dc5356bae49).
[GitHub] spark pull request #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21482#discussion_r193941785

--- Diff: R/pkg/NAMESPACE ---
@@ -281,6 +281,8 @@ exportMethods("%<=>%",
               "initcap",
               "input_file_name",
               "instr",
+              "isInf",
+              "isinf",
--- End diff --

I think we shouldn't add other variants unless there's a clear reason. It sounds like we are adding this for no reason.

> how about we just go with isInf for now and if other aliases are needed in the future they can be added and discussed then?

Yea, please.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366 Merged build finished. Test PASSed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/3702/
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3846/ Test PASSed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366 Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/3701/
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user som-snytt commented on the issue: https://github.com/apache/spark/pull/21495 @dbtsai 2.11 looks similar to 2.12. Do you mean you want the same technique on 2.10? I would not expect to find a single hook for all versions.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/3701/
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3845/ Test PASSed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366 Merged build finished. Test PASSed.
[GitHub] spark issue #21483: [SPARK-24477][SPARK-24454][ML][PYTHON] Imports submodule...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21483 Merged build finished. Test PASSed.
[GitHub] spark issue #21483: [SPARK-24477][SPARK-24454][ML][PYTHON] Imports submodule...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21483 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91546/ Test PASSed.
[GitHub] spark issue #21483: [SPARK-24477][SPARK-24454][ML][PYTHON] Imports submodule...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21483

**[Test build #91546 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91546/testReport)** for PR 21483 at commit [`bfcd3aa`](https://github.com/apache/spark/commit/bfcd3aa21cfc0d39db9a9a88aa5febf7de64474d).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21092: [SPARK-23984][K8S] Initial Python Bindings for PySpark o...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21092 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91537/ Test PASSed.
[GitHub] spark issue #21092: [SPARK-23984][K8S] Initial Python Bindings for PySpark o...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21092 Merged build finished. Test PASSed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366 **[Test build #91548 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91548/testReport)** for PR 21366 at commit [`e9d7c8f`](https://github.com/apache/spark/commit/e9d7c8f779e2969cfa2ba159e1c7295cebddc27b).
[GitHub] spark issue #21092: [SPARK-23984][K8S] Initial Python Bindings for PySpark o...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21092

**[Test build #91537 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91537/testReport)** for PR 21092 at commit [`ab92913`](https://github.com/apache/spark/commit/ab92913f1d0c303a5ff6b2937019d5e47c25611d).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and...
Github user som-snytt commented on a diff in the pull request:

https://github.com/apache/spark/pull/21495#discussion_r193938805

--- Diff: repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala ---
@@ -21,8 +21,22 @@ import scala.collection.mutable
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._

-class SparkILoopInterpreter(settings: Settings, out: JPrintWriter) extends IMain(settings, out) {
-  self =>
+class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, initializeSpark: () => Unit)
+    extends IMain(settings, out) { self =>
+
+  /**
+   * We override `initializeSynchronous` to initialize Spark *after* `intp` is properly initialized
+   * and *before* the REPL sees any files in the private `loadInitFiles` functions, so that
+   * the Spark context is visible in those files.
+   *
+   * This is a bit of a hack, but there isn't another hook available to us at this point.
+   *
+   * See the discussion in Scala community https://github.com/scala/bug/issues/10913 for detail.
+   */
+  override def initializeSynchronous(): Unit = {
+    super.initializeSynchronous()
+    initializeSpark()
--- End diff --

Completion was upgraded since the old days; also, other bugs required updating jline. There is interest in upgrading to jline 3.
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/21498 @mgaido91 Ok. I will try to have another one.
[GitHub] spark issue #21483: [SPARK-24477][SPARK-24454][ML][PYTHON] Imports submodule...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21483 Merged build finished. Test PASSed.
[GitHub] spark issue #21483: [SPARK-24477][SPARK-24454][ML][PYTHON] Imports submodule...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21483 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3844/ Test PASSed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366 **[Test build #91547 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91547/testReport)** for PR 21366 at commit [`bd7b0d3`](https://github.com/apache/spark/commit/bd7b0d3e5389300f1d0a3d2663cb73372cdc4dc3).
[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21510 Merged build finished. Test PASSed.
[GitHub] spark issue #21483: [SPARK-24477][SPARK-24454][ML][PYTHON] Imports submodule...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21483 **[Test build #91546 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91546/testReport)** for PR 21483 at commit [`bfcd3aa`](https://github.com/apache/spark/commit/bfcd3aa21cfc0d39db9a9a88aa5febf7de64474d).
[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21510 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91536/ Test PASSed.
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r193936818

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null

+  try {
+    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+        sparkSession.sparkContext.conf).foreach(listener => {
+        addListener(listener)
+        logInfo(s"Registered listener ${listener.getClass.getName}")
+      })
+    }
+  } catch {
+    case e: Exception =>
+      throw new SparkException(s"Exception when registering StreamingQueryListener", e)
--- End diff --

nit: `s` seems not needed.
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r193936698

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null

+  try {
+    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+        sparkSession.sparkContext.conf).foreach(listener => {
+        addListener(listener)
+        logInfo(s"Registered listener ${listener.getClass.getName}")
--- End diff --

I would do this at debug level...
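For context, the feature under review loads listeners from configuration via `Utils.loadExtensions`. Below is a sketch of how a listener could be wired up this way; the config key is assumed to be `spark.sql.streaming.streamingQueryListeners` (matching the diff's `STREAMING_QUERY_LISTENERS` entry), and the listener class is illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Hypothetical listener; it needs a constructor loadExtensions can call
// reflectively (zero-arg or taking a SparkConf).
class LoggingQueryListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Progress: ${event.progress.numInputRows} input rows")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
}

val spark = SparkSession.builder()
  .appName("listener-config-example")
  // Assumed key: listeners listed here are registered when the
  // StreamingQueryManager is created.
  .config("spark.sql.streaming.streamingQueryListeners",
    "com.example.LoggingQueryListener")
  .getOrCreate()
```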
[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21510

**[Test build #91536 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91536/testReport)** for PR 21510 at commit [`58a9ec4`](https://github.com/apache/spark/commit/58a9ec42402cc92675e3e057309a803d08fd0cd7).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use the Kubernetes API to popu...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r193936753

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Clock, Utils}
+
+private[spark] class ExecutorPodsAllocator(
+    conf: SparkConf,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    clock: Clock) extends Logging {
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val podCreationTimeout = math.max(podAllocationDelay * 5, 6)
+
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+  private val driverPod = kubernetesClient.pods()
+    .withName(kubernetesDriverPodName)
+    .get()
+
+  // Executor IDs that have been requested from Kubernetes but have not been detected in any
+  // snapshot yet. Mapped to the timestamp when they were created.
+  private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
+
+  def start(applicationId: String): Unit = {
+    snapshotsStore.addSubscriber(podAllocationDelay) {
+      processSnapshot(applicationId, _)
+    }
+  }
+
+  def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)
+
+  private def processSnapshot(applicationId: String, snapshot: ExecutorPodsSnapshot): Unit = {
+    snapshot.executorPods.filter {

--- End diff --

Actually, it should probably be for all pods in the snapshot, since newlyCreatedExecutors should only track what we haven't seen from the cluster at all.
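A minimal sketch of that suggestion, reusing the names from the diff above (illustrative only, not the PR's final code):

```scala
// Any executor ID that appears in the snapshot at all has been seen by the
// cluster, so it no longer belongs in newlyCreatedExecutors, regardless of
// the pod's phase.
snapshot.executorPods.keys.foreach { execId =>
  newlyCreatedExecutors -= execId
}
```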
[GitHub] spark issue #21507: Branch 1.6
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21507

ping @deepaksonu, please close this PR.
[GitHub] spark pull request #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21495#discussion_r193936286

--- Diff: repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala ---
@@ -21,8 +21,22 @@ import scala.collection.mutable
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._

-class SparkILoopInterpreter(settings: Settings, out: JPrintWriter) extends IMain(settings, out) {
-  self =>
+class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, initializeSpark: () => Unit)
+    extends IMain(settings, out) { self =>
+
+  /**
+   * We override `initializeSynchronous` to initialize Spark *after* `intp` is properly initialized
+   * and *before* the REPL sees any files in the private `loadInitFiles` functions, so that
+   * the Spark context is visible in those files.
+   *
+   * This is a bit of a hack, but there isn't another hook available to us at this point.
+   *
+   * See the discussion in Scala community https://github.com/scala/bug/issues/10913 for detail.
+   */
+  override def initializeSynchronous(): Unit = {
+    super.initializeSynchronous()
+    initializeSpark()

--- End diff --

Can we upgrade to the corresponding `jline` version together in this PR, @dbtsai?
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366

Kubernetes integration test status success URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/3699/
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366

Merged build finished. Test PASSed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366

Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/3699/
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3843/
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193935386

--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None):
         self._jwrite = self._jwrite.trigger(jTrigger)
         return self

+    def foreach(self, f):
+        """
+        Sets the output of the streaming query to be processed using the provided writer ``f``.
+        This is often used to write the output of a streaming query to arbitrary storage systems.
+        The processing logic can be specified in two ways.
+
+        #. A **function** that takes a row as input.

--- End diff --

I believe `$"columnName"` is more of a language-specific feature in Scala, and I think `df.columnName` is language-specific to Python.

> And ultimately convenience is what matters for the user experience.

Thing is, it sounded to me like we are kind of prejudging it.

> I think we should also add the lambda variant to Scala as well.

+1. I am okay with it, but I hope this won't usually be done this way next time.
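For context, a rough sketch of what the proposed lambda variant on the Scala side could look like (a hypothetical overload, not Spark's current API; today the Scala `foreach` takes a `ForeachWriter`):

```scala
import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.streaming.DataStreamWriter

// Hypothetical convenience helper that wraps a plain function in the
// existing ForeachWriter API.
def foreachFunction(writer: DataStreamWriter[Row], f: Row => Unit): DataStreamWriter[Row] =
  writer.foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, epochId: Long): Boolean = true
    override def process(row: Row): Unit = f(row)
    override def close(errorOrNull: Throwable): Unit = ()
  })
```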
[GitHub] spark pull request #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21495#discussion_r193935371

--- Diff: repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala ---
@@ -21,8 +21,22 @@ import scala.collection.mutable
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._

-class SparkILoopInterpreter(settings: Settings, out: JPrintWriter) extends IMain(settings, out) {
-  self =>
+class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, initializeSpark: () => Unit)
+    extends IMain(settings, out) { self =>
+
+  /**
+   * We override `initializeSynchronous` to initialize Spark *after* `intp` is properly initialized
+   * and *before* the REPL sees any files in the private `loadInitFiles` functions, so that
+   * the Spark context is visible in those files.
+   *
+   * This is a bit of a hack, but there isn't another hook available to us at this point.
+   *
+   * See the discussion in Scala community https://github.com/scala/bug/issues/10913 for detail.
+   */
+  override def initializeSynchronous(): Unit = {
+    super.initializeSynchronous()
+    initializeSpark()

--- End diff --

Yep. Of course, it's a clean clone and build of this PR. According to the error message and the following, we need `jline-2.14.3.jar` because `scala` uses the API of a higher version of `jline`. Could you confirm this, @som-snytt?

```
$ java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-8u171-b11-0ubuntu0.18.04.1-b11)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)

$ javap -cp jline-2.12.1.jar jline.console.completer.CandidateListCompletionHandler
Compiled from "CandidateListCompletionHandler.java"
public class jline.console.completer.CandidateListCompletionHandler implements jline.console.completer.CompletionHandler {
  public jline.console.completer.CandidateListCompletionHandler();
  public boolean complete(jline.console.ConsoleReader, java.util.List, int) throws java.io.IOException;
  public static void setBuffer(jline.console.ConsoleReader, java.lang.CharSequence, int) throws java.io.IOException;
  public static void printCandidates(jline.console.ConsoleReader, java.util.Collection) throws java.io.IOException;
}

$ javap -cp jline-2.14.3.jar jline.console.completer.CandidateListCompletionHandler
Compiled from "CandidateListCompletionHandler.java"
public class jline.console.completer.CandidateListCompletionHandler implements jline.console.completer.CompletionHandler {
  public jline.console.completer.CandidateListCompletionHandler();
  public boolean getPrintSpaceAfterFullCompletion();
  public void setPrintSpaceAfterFullCompletion(boolean);
  public boolean isStripAnsi();
  public void setStripAnsi(boolean);
  public boolean complete(jline.console.ConsoleReader, java.util.List, int) throws java.io.IOException;
  public static void setBuffer(jline.console.ConsoleReader, java.lang.CharSequence, int) throws java.io.IOException;
  public static void printCandidates(jline.console.ConsoleReader, java.util.Collection) throws java.io.IOException;
}
```
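If the bump goes in with this PR, the sbt form would be roughly the following (a hedged sketch; Spark's build may pin the version elsewhere, e.g. in the Maven pom, and the 2.14.3 version is taken from the javap output above):

```scala
// Align jline with the version the Scala 2.11.12 REPL expects.
libraryDependencies += "jline" % "jline" % "2.14.3"
```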
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use the Kubernetes API to popu...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r193933505

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala ---
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Clock, Utils}
+
+private[spark] class ExecutorPodsAllocator(
+    conf: SparkConf,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    clock: Clock) extends Logging {
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val podCreationTimeout = math.max(podAllocationDelay * 5, 6)
+
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+  private val driverPod = kubernetesClient.pods()
+    .withName(kubernetesDriverPodName)
+    .get()
+
+  // Executor IDs that have been requested from Kubernetes but have not been detected in any
+  // snapshot yet. Mapped to the timestamp when they were created.
+  private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
+
+  def start(applicationId: String): Unit = {
+    snapshotsStore.addSubscriber(podAllocationDelay) {
+      processSnapshot(applicationId, _)
+    }
+  }
+
+  def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)
+
+  private def processSnapshot(applicationId: String, snapshot: ExecutorPodsSnapshot): Unit = {
+    snapshot.executorPods.filter {
+      case (_, PodPending(_)) | (_, PodUnknown(_)) => false
+      case _ => true
+    }.keys.foreach {
+      newlyCreatedExecutors -= _
+    }
+
+    // For all executors we've created against the API but have not seen in a snapshot
+    // yet - check the current time. If the current time has exceeded some threshold,
+    // assume that the pod was either never created (the API server never properly
+    // handled the creation request), or the API server created the pod but we missed
+    // both the creation and deletion events. In either case, delete the missing pod
+    // if possible, and mark such a pod to be rescheduled below.
+    (newlyCreatedExecutors.keySet -- snapshot.executorPods.keySet).foreach { execId =>

--- End diff --

A specific example where the previous approach wouldn't have worked. That said, this case seems very unlikely - we'd have to miss all events for the newly created executor for a decent chunk of time. @foxish - would like to confirm that this is the right idea. Would specifically like feedback on whether this is done correctly @foxish @liyinan926.
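A sketch of the timeout handling being described, reusing the names from the diff above (illustrative; the PR's actual cleanup may differ in details such as the timeout units and threshold):

```scala
// Pods we requested but that have never appeared in any snapshot: if one has
// been outstanding longer than podCreationTimeout, assume the creation request
// was lost (or we missed both the create and delete events), try to delete the
// pod, and drop it from the bookkeeping so it can be rescheduled.
(newlyCreatedExecutors.keySet -- snapshot.executorPods.keySet).foreach { execId =>
  if (clock.getTimeMillis() - newlyCreatedExecutors(execId) > podCreationTimeout) {
    Utils.tryLogNonFatalError {
      kubernetesClient
        .pods()
        .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
        .delete()
    }
    newlyCreatedExecutors -= execId
  }
}
```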
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use the Kubernetes API to popu...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r193933656

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import com.google.common.cache.Cache
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsLifecycleManager(
+    conf: SparkConf,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    // Use a best-effort to track which executors have been removed already. It's not generally
+    // job-breaking if we remove executors more than once but it's ideal if we make an attempt
+    // to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
+    // bounds.
+    removedExecutorsCache: Cache[java.lang.Long, java.lang.Long]) {
+
+  import ExecutorPodsLifecycleManager._
+
+  private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
+
+  def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+    snapshotsStore.addSubscriber(eventProcessingInterval) {
+      onNextSnapshot(schedulerBackend, _)
+    }
+  }
+
+  private def onNextSnapshot(
+      schedulerBackend: KubernetesClusterSchedulerBackend,
+      snapshot: ExecutorPodsSnapshot): Unit = {
+    val execIdsRemovedInThisRound = mutable.HashSet.empty[Long]
+    snapshot.executorPods.foreach { case (execId, state) =>
+      state match {
+        case PodDeleted(pod) =>
+          removeExecutorFromSpark(schedulerBackend, pod, execId)
+          execIdsRemovedInThisRound += execId
+        case errorOrSucceeded @ (PodFailed(_) | PodSucceeded(_)) =>
+          removeExecutorFromK8s(errorOrSucceeded.pod)
+          removeExecutorFromSpark(schedulerBackend, errorOrSucceeded.pod, execId)
+          execIdsRemovedInThisRound += execId
+        case _ =>
+      }
+    }
+
+    // Reconcile the case where Spark claims to know about an executor but the corresponding pod
+    // is missing from the cluster. This would occur if we miss a deletion event and the pod
+    // transitions immediately from running to absent.
+    (schedulerBackend.getExecutorIds().map(_.toLong).toSet

--- End diff --

Another case where the previous approach was insufficient. If Spark gets an executor connected to it that is not in the current snapshot, then we should probably get rid of it.
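An illustrative continuation of that reconciliation, assuming the names in the diff (the removal hook on the scheduler backend is a hypothetical name; the PR's actual code may differ):

```scala
// Executors the scheduler backend still tracks whose pods are absent from the
// snapshot (and that we didn't already remove this round) are treated as lost.
// removedExecutorsCache guards against removing the same executor twice.
val knownExecIds = schedulerBackend.getExecutorIds().map(_.toLong).toSet
(knownExecIds -- snapshot.executorPods.keySet -- execIdsRemovedInThisRound)
  .foreach { missingExecId =>
    if (removedExecutorsCache.getIfPresent(missingExecId) == null) {
      removedExecutorsCache.put(missingExecId, missingExecId)
      // Hypothetical removal hook; there is no pod left to inspect, so report
      // a generic loss reason.
      schedulerBackend.doRemoveExecutor(
        missingExecId.toString,
        ExecutorExited(-1, exitCausedByApp = false,
          s"Executor pod for $missingExecId was not found in the cluster."))
    }
  }
```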
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use the Kubernetes API to popu...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r193934305

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}
+
+import com.google.common.collect.Lists
+import io.fabric8.kubernetes.api.model.Pod
+import io.reactivex.disposables.Disposable
+import io.reactivex.functions.Consumer
+import io.reactivex.schedulers.Schedulers
+import io.reactivex.subjects.PublishSubject
+import javax.annotation.concurrent.GuardedBy
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] class ExecutorPodsSnapshotsStoreImpl(
+    bufferSnapshotsExecutor: ScheduledExecutorService,
+    executeSubscriptionsExecutor: ExecutorService)
+  extends ExecutorPodsSnapshotsStore {
+
+  private val SNAPSHOT_LOCK = new Object()
+
+  private val snapshotsObservable = PublishSubject.create[ExecutorPodsSnapshot]()
+  private val observedDisposables = mutable.Buffer.empty[Disposable]
+
+  @GuardedBy("SNAPSHOT_LOCK")
+  private var currentSnapshot = ExecutorPodsSnapshot()
+
+  override def addSubscriber(
+      processBatchIntervalMillis: Long)
+      (subscriber: ExecutorPodsSnapshot => Unit): Unit = {
+    observedDisposables += snapshotsObservable
+      // Group events in the time window given by the caller. These buffers are then sent
+      // to the caller's lambda at the given interval, with the pod updates that occurred
+      // in that given interval.
+      .buffer(
+        processBatchIntervalMillis,
+        TimeUnit.MILLISECONDS,
+        // For testing - specifically use the given scheduled executor service to trigger
+        // buffer boundaries. Allows us to inject a deterministic scheduler here.
+        Schedulers.from(bufferSnapshotsExecutor))
+      // Trigger an event cycle immediately. Not strictly required to be fully correct, but
+      // in particular the pod allocator should try to request executors immediately instead
+      // of waiting for one pod allocation delay.
+      .startWith(Lists.newArrayList(ExecutorPodsSnapshot()))
+      // Force all triggered events - both the initial event above and the buffered ones in
+      // the following time windows - to execute asynchronously to this call's thread.
+      .observeOn(Schedulers.from(executeSubscriptionsExecutor))
+      .subscribe(toReactivexConsumer { snapshots: java.util.List[ExecutorPodsSnapshot] =>
+        Utils.tryLogNonFatalError {
+          snapshots.asScala.foreach(subscriber)
+        }
+      })
+  }
+
+  override def stop(): Unit = {
+    observedDisposables.foreach(_.dispose())
+    snapshotsObservable.onComplete()
+    ThreadUtils.shutdown(bufferSnapshotsExecutor)
+    ThreadUtils.shutdown(executeSubscriptionsExecutor)
+  }
+
+  override def updatePod(updatedPod: Pod): Unit = SNAPSHOT_LOCK.synchronized {
+    currentSnapshot = currentSnapshot.withUpdate(updatedPod)

--- End diff --

So the watch only creates new snapshots by applying some "diff" from the next event to the previous snapshot. One downside of this data model is that we end up buffering multiple collections of pods in the observable stream, which may each differ by only a single pod per update. Thus we end up temporarily storing redundant information in the snapshots. But the observable buffers are ephemeral and will be processed by the periodic iterations of the subscribers. I wouldn't mind thinking about a more optimal representation here.
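For what it's worth, one more compact representation, sketched purely for illustration and reusing `withUpdate` from the diff above, would be to buffer the raw `Pod` updates and fold them into a single snapshot per batch:

```scala
// Fold a batch of pod deltas onto a base snapshot, materializing one
// ExecutorPodsSnapshot per subscriber tick instead of one per update.
def coalesce(base: ExecutorPodsSnapshot, updates: Seq[Pod]): ExecutorPodsSnapshot =
  updates.foldLeft(base) { (snapshot, pod) => snapshot.withUpdate(pod) }
```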
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366

**[Test build #91545 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91545/testReport)** for PR 21366 at commit [`a97fc5d`](https://github.com/apache/spark/commit/a97fc5d5b87a8caf14ae923f89a7bc106c48d411).
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193933559

--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None):
         self._jwrite = self._jwrite.trigger(jTrigger)
         return self

+    def foreach(self, f):
+        """
+        Sets the output of the streaming query to be processed using the provided writer ``f``.
+        This is often used to write the output of a streaming query to arbitrary storage systems.
+        The processing logic can be specified in two ways.
+
+        #. A **function** that takes a row as input.
+            This is a simple way to express your processing logic. Note that this does
+            not allow you to deduplicate generated data when failures cause reprocessing of
+            some input data. That would require you to specify the processing logic in the next
+            way.
+
+        #. An **object** with a ``process`` method and optional ``open`` and ``close`` methods.
+            The object can have the following methods.
+
+            * ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing
+                (for example, open a connection, start a transaction, etc). Additionally, you can
+                use the `partition_id` and `epoch_id` to deduplicate regenerated data
+                (discussed later).
+
+            * ``process(row)``: *Non-optional* method that processes each :class:`Row`.
+
+            * ``close(error)``: *Optional* method that finalizes and cleans up (for example,
+                close connection, commit transaction, etc.) after all rows have been processed.
+
+            The object will be used by Spark in the following way.
+
+            * A single copy of this object is responsible for all the data generated by a
+                single task in a query. In other words, one instance is responsible for
+                processing one partition of the data generated in a distributed manner.
+
+            * This object must be serializable because each task will get a fresh
+                serialized-deserialized copy of the provided object. Hence, it is strongly
+                recommended that any initialization for writing data (e.g. opening a
+                connection or starting a transaction) be done after the `open(...)`
+                method has been called, which signifies that the task is ready to generate data.
+
+            * The lifecycle of the methods is as follows.
+
+                For each partition with ``partition_id``:
+
+                ... For each batch/epoch of streaming data with ``epoch_id``:
+
+                ... Method ``open(partitionId, epochId)`` is called.
+
+                ... If ``open(...)`` returns true, for each row in the partition and
+                    batch/epoch, method ``process(row)`` is called.
+
+                ... Method ``close(errorOrNull)`` is called with error (if any) seen while
+                    processing rows.
+
+            Important points to note:
+
+            * The `partitionId` and `epochId` can be used to deduplicate generated data when
+                failures cause reprocessing of some input data. This depends on the execution
+                mode of the query. If the streaming query is being executed in the micro-batch
+                mode, then every partition represented by a unique tuple (partition_id, epoch_id)
+                is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used
+                to deduplicate and/or transactionally commit data and achieve exactly-once
+                guarantees. However, if the streaming query is being executed in the continuous
+                mode, then this guarantee does not hold and therefore should not be used for
+                deduplication.
+
+            * The ``close()`` method (if it exists) will be called if the `open()` method exists and
+                returns successfully (irrespective of the return value), except if the Python
+                process crashes in the middle.
+
+        .. note:: Evolving.
+
+        >>> # Print every row using a function
+        >>> writer = sdf.writeStream.foreach(lambda x: print(x))
+        >>> # Print every row using an object with process() method
+        >>> class RowPrinter:
+        ...     def open(self, partition_id, epoch_id):
+        ...         print("Opened %d, %d" % (partition_id, epoch_id))
+        ...         return True
+        ...
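For comparison, the Scala counterpart of the object variant uses the existing `ForeachWriter` API; the method bodies below are illustrative (a real sink would open and commit a connection or transaction instead of printing):

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

val writer = new ForeachWriter[Row] {
  def open(partitionId: Long, epochId: Long): Boolean = {
    println(s"Opened $partitionId, $epochId")
    true // returning true asks Spark to call process() for this partition/epoch
  }
  def process(row: Row): Unit = println(row)
  def close(errorOrNull: Throwable): Unit = ()
}
```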
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3842/
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366

Merged build finished. Test FAILed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366

**[Test build #91544 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91544/testReport)** for PR 21366 at commit [`3b85ab5`](https://github.com/apache/spark/commit/3b85ab52b523dc182227b1ece517765903e64109).
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21366

@foxish @liyinan926 @dvogelbacher ok, I think what we have here is closer to what we want. I primarily addressed the situations outlined by the most recent comments and observations made about the previous design. The PR is getting pretty lengthy, so I wouldn't object to breaking the patch into smaller chunks if anyone requests that, for ease of review and legibility.

In [the latest patch](https://github.com/apache/spark/pull/21366/commits/8615c067328c0c64d0d048922b221477580acdb4) we preserve the idea of observables and having multiple modules poll the state of the cluster, but instead of conceiving of state changes as updates to pods, the modules now always receive a full snapshot of the current state of the cluster. More formally, the watch pushes snapshots that are just the previous state with the state of the updated executor applied. The polling, on the contrary, pushes fresh snapshots.

The aim is to get as close to a level-based paradigm as possible. I think this gets us closest, but I've left some unlikely fringe edge cases for future patches.
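A conceptual sketch of the two snapshot sources described above, using names from the diffs earlier in this thread where they appear (`updatePod`, `withUpdate`); the wholesale-replacement method name and the surrounding wiring are assumptions:

```scala
import scala.collection.JavaConverters._

// Edge-triggered source: the watch folds each single pod update into the
// previous snapshot via the store (see updatePod in the diff above).
snapshotsStore.updatePod(updatedPod)

// Level-triggered source: a poller periodically replaces the state wholesale
// with what the Kubernetes API currently reports, so any missed watch events
// are eventually corrected. The method name here is an assumption.
val currentPods = kubernetesClient
  .pods()
  .withLabel(SPARK_APP_ID_LABEL, applicationId)
  .list()
  .getItems
  .asScala
snapshotsStore.replaceSnapshot(currentPods)
```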
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366

**[Test build #91543 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91543/testReport)** for PR 21366 at commit [`0a205f6`](https://github.com/apache/spark/commit/0a205f6bdcb3ec1e730037508bc393fe13ce9b0c).