[jira] [Commented] (SPARK-26362) Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
[ https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722028#comment-16722028 ] ASF GitHub Bot commented on SPARK-26362: asfgit closed pull request #23311: [SPARK-26362][CORE] Remove 'spark.driver.allowMultipleContexts' to disallow multiple creation of SparkContexts URL: https://github.com/apache/spark/pull/23311 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 696dafda6d1ec..09cc346db0ed2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -64,9 +64,8 @@ import org.apache.spark.util.logging.DriverLogger * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. * - * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before - * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details. - * + * @note Only one `SparkContext` should be active per JVM. You must `stop()` the + * active `SparkContext` before creating a new one. * @param config a Spark Config object describing the application configuration. Any settings in * this config overrides the default configs as well as system properties. */ @@ -75,14 +74,10 @@ class SparkContext(config: SparkConf) extends Logging { // The call site where this SparkContext was constructed. private val creationSite: CallSite = Utils.getCallSite() - // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active - private val allowMultipleContexts: Boolean = -config.getBoolean("spark.driver.allowMultipleContexts", false) - // In order to prevent multiple SparkContexts from being active at the same time, mark this // context as having started construction. // NOTE: this must be placed at the beginning of the SparkContext constructor. - SparkContext.markPartiallyConstructed(this, allowMultipleContexts) + SparkContext.markPartiallyConstructed(this) val startTime = System.currentTimeMillis() @@ -2392,7 +2387,7 @@ class SparkContext(config: SparkConf) extends Logging { // In order to prevent multiple SparkContexts from being active at the same time, mark this // context as having finished construction. // NOTE: this must be placed at the end of the SparkContext constructor. - SparkContext.setActiveContext(this, allowMultipleContexts) + SparkContext.setActiveContext(this) } /** @@ -2409,18 +2404,18 @@ object SparkContext extends Logging { private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object() /** - * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`. + * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`. * - * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK. + * Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`. 
*/ private val activeContext: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null) /** - * Points to a partially-constructed SparkContext if some thread is in the SparkContext + * Points to a partially-constructed SparkContext if another thread is in the SparkContext * constructor, or `None` if no SparkContext is being constructed. * - * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK + * Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`. */ private var contextBeingConstructed: Option[SparkContext] = None @@ -2428,24 +2423,16 @@ object SparkContext extends Logging { * Called to ensure that no other SparkContext is running in this JVM. * * Throws an exception if a running context is detected and logs a warning if another thread is - * constructing a SparkContext. This warning is necessary because the current locking scheme + * constructing a SparkContext. This warning is necessary because the current locking scheme * prevents us from reliably distinguishing between cases where another context is being * constructed and cases where another constructor threw an exception. */ - private def assertNoOtherContextIsRunning( - sc: SparkContext, - allowMultipleContexts: Boolean): Unit = { + private def assertNoOtherContextIsRunning(sc: SparkContext): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
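As a minimal sketch (not taken from the PR above) of what this change means for user code: with `spark.driver.allowMultipleContexts` removed, a second active `SparkContext` in the same JVM is always rejected, so the first one must be stopped before another is created.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

object SingleContextExample {
  def main(args: Array[String]): Unit = {
    val sc1 = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("first"))

    // With the flag gone, a second active SparkContext is always rejected with an
    // exception (previously the removed flag could downgrade this to a warning).
    try {
      new SparkContext(new SparkConf().setMaster("local[2]").setAppName("second"))
    } catch {
      case e: Exception => println(s"second context rejected: ${e.getMessage}")
    }

    sc1.stop()  // stop the active context first...
    val sc2 = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("third"))
    sc2.stop()  // ...then a new one can be created
  }
}
{code}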
[jira] [Resolved] (SPARK-26362) Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
[ https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-26362. -- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 23311 [https://github.com/apache/spark/pull/23311] > Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark > contexts > --- > > Key: SPARK-26362 > URL: https://issues.apache.org/jira/browse/SPARK-26362 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Hyukjin Kwon >Assignee: Hyukjin Kwon >Priority: Major > Fix For: 3.0.0 > > > Multiple Spark contexts are discouraged, and Spark has warned against them for 4 years > (see SPARK-4180). They can cause arbitrary and mysterious errors. (Honestly, I didn't even > know Spark allowed it.) -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26362) Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
[ https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-26362: Assignee: Hyukjin Kwon > Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark > contexts > --- > > Key: SPARK-26362 > URL: https://issues.apache.org/jira/browse/SPARK-26362 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Hyukjin Kwon >Assignee: Hyukjin Kwon >Priority: Major > > Multiple Spark contexts are discouraged, and Spark has warned against them for 4 years > (see SPARK-4180). They can cause arbitrary and mysterious errors. (Honestly, I didn't even > know Spark allowed it.) -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26265) deadlock between TaskMemoryManager and BytesToBytesMap$MapIterator
[ https://issues.apache.org/jira/browse/SPARK-26265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722025#comment-16722025 ] ASF GitHub Bot commented on SPARK-26265: asfgit closed pull request #23294: [SPARK-26265][Core][Followup] Put freePage into a finally block URL: https://github.com/apache/spark/pull/23294 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index fbba002f1f80f..7df8aafb2b674 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -262,36 +262,39 @@ private void advanceToNextPage() { // reference to the page to free and free it after releasing the lock of `MapIterator`. MemoryBlock pageToFree = null; - synchronized (this) { -int nextIdx = dataPages.indexOf(currentPage) + 1; -if (destructive && currentPage != null) { - dataPages.remove(currentPage); - pageToFree = currentPage; - nextIdx --; -} -if (dataPages.size() > nextIdx) { - currentPage = dataPages.get(nextIdx); - pageBaseObject = currentPage.getBaseObject(); - offsetInPage = currentPage.getBaseOffset(); - recordsInPage = UnsafeAlignedOffset.getSize(pageBaseObject, offsetInPage); - offsetInPage += UnsafeAlignedOffset.getUaoSize(); -} else { - currentPage = null; - if (reader != null) { -handleFailedDelete(); + try { +synchronized (this) { + int nextIdx = dataPages.indexOf(currentPage) + 1; + if (destructive && currentPage != null) { +dataPages.remove(currentPage); +pageToFree = currentPage; +nextIdx--; } - try { -Closeables.close(reader, /* swallowIOException = */ false); -reader = spillWriters.getFirst().getReader(serializerManager); -recordsInPage = -1; - } catch (IOException e) { -// Scala iterator does not handle exception -Platform.throwException(e); + if (dataPages.size() > nextIdx) { +currentPage = dataPages.get(nextIdx); +pageBaseObject = currentPage.getBaseObject(); +offsetInPage = currentPage.getBaseOffset(); +recordsInPage = UnsafeAlignedOffset.getSize(pageBaseObject, offsetInPage); +offsetInPage += UnsafeAlignedOffset.getUaoSize(); + } else { +currentPage = null; +if (reader != null) { + handleFailedDelete(); +} +try { + Closeables.close(reader, /* swallowIOException = */ false); + reader = spillWriters.getFirst().getReader(serializerManager); + recordsInPage = -1; +} catch (IOException e) { + // Scala iterator does not handle exception + Platform.throwException(e); +} } } - } - if (pageToFree != null) { -freePage(pageToFree); + } finally { +if (pageToFree != null) { + freePage(pageToFree); +} } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > deadlock between TaskMemoryManager and BytesToBytesMap$MapIterator > -- > > Key: SPARK-26265 > URL: https://issues.apache.org/jira/browse/SPARK-26265 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.3.2 >Reporter: qian han >Assignee: Liang-Chi Hsieh >Priority: Major > Fix For: 2.4.1, 3.0.0 > > > The application is running on a cluster with 72000 cores and 182000G mem. > Enviroment: > |spark.dynamicAllocation.minExecutors|5| > |spark.dynamicAllocation.initialExecutors|30| > |spark.dynamicAllocation.maxExecutors|400| > |spark.executor.cores|4| > |spark.executor.memory|20g| > > > Stage description: > org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:364) > org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:422) >
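A small Scala sketch of the pattern the follow-up above applies (the real code is Java inside BytesToBytesMap.MapIterator); names and types here are illustrative, not the actual implementation. While holding the iterator lock, the page to free is only recorded; the free happens after the lock is released, inside a finally block, which is what avoids the lock-ordering deadlock with TaskMemoryManager and still releases the page on error paths.

{code:scala}
// Illustrative sketch only: defer freeing a resource until after the lock is released,
// and perform the free in a finally block so it runs even if the locked section throws.
object DeferredFreeSketch {
  final case class Page(id: Int)

  private val lock = new Object
  private val pages = scala.collection.mutable.Buffer(Page(0), Page(1))
  private var currentPage: Page = Page(0)

  // Stand-in for TaskMemoryManager.freePage in the real code.
  private def freePage(p: Page): Unit = println(s"freed page ${p.id}")

  def advanceToNextPage(destructive: Boolean): Unit = {
    var pageToFree: Page = null
    try {
      lock.synchronized {
        if (destructive && currentPage != null) {
          pages -= currentPage
          pageToFree = currentPage   // remember it, but do not free it while locked
        }
        currentPage = pages.headOption.orNull
      }
    } finally {
      if (pageToFree != null) {
        freePage(pageToFree)         // freed outside the lock, even on exception paths
      }
    }
  }

  def main(args: Array[String]): Unit = advanceToNextPage(destructive = true)
}
{code}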
[jira] [Commented] (SPARK-26342) Support for NFS mount for Kubernetes
[ https://issues.apache.org/jira/browse/SPARK-26342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721997#comment-16721997 ] Yinan Li commented on SPARK-26342: -- Yes, that's true. Feel free to create a PR to add nfs and flex. > Support for NFS mount for Kubernetes > > > Key: SPARK-26342 > URL: https://issues.apache.org/jira/browse/SPARK-26342 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 2.4.0 >Reporter: Eric Carlson >Priority: Minor > > Currently only hostPath, emptyDir, and PVC volume types are accepted for > Kubernetes-deployed drivers and executors. Possibility to mount NFS paths > would allow access to a common and easy-to-deploy shared storage solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-26342) Support for NFS mount for Kubernetes
[ https://issues.apache.org/jira/browse/SPARK-26342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721990#comment-16721990 ] Eric Carlson edited comment on SPARK-26342 at 12/15/18 4:04 AM: (discussion also applies for flexvolume improvement request SPARK-26344) Hi [~liyinan926] - that's right that pod templates can be used for this, and I have that working, but it would be more convenient for users if a separate yaml didn't have to be crafted for some options when so much is already available via the conf mechanisms. I actually already have this code working on a local branch, waiting for corporate contributor approval to publish for review - it doesn't really add a feature, just adds the nfs (and in a separate branch, flexvolume) types to the existing volume config options, which ends up being a fairly small addition to a few switch statements. Would a change like this be up for consideration, or is there a design decision to not add beyond empytdir, pvc, and hostpath? (I couldn't find design docs/discussion) was (Author: ektar): (discussion also applies for flexvolume improvement request) Hi [~liyinan926] - that's right that pod templates can be used for this, and I have that working, but it would be more convenient for users if a separate yaml didn't have to be crafted for some options when so much is already available via the conf mechanisms. I actually already have this code working on a local branch, waiting for corporate contributor approval to publish for review - it doesn't really add a feature, just adds the nfs (and in a separate branch, flexvolume) types to the existing volume config options, which ends up being a fairly small addition to a few switch statements. Would a change like this be up for consideration, or is there a design decision to not add beyond empytdir, pvc, and hostpath? (I couldn't find design docs/discussion) > Support for NFS mount for Kubernetes > > > Key: SPARK-26342 > URL: https://issues.apache.org/jira/browse/SPARK-26342 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 2.4.0 >Reporter: Eric Carlson >Priority: Minor > > Currently only hostPath, emptyDir, and PVC volume types are accepted for > Kubernetes-deployed drivers and executors. Possibility to mount NFS paths > would allow access to a common and easy-to-deploy shared storage solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
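A hypothetical sketch of what the proposed nfs volume support might look like from the user side. Since the branch mentioned above is not public, the option names below are assumptions that simply mirror Spark's existing spark.kubernetes.{driver,executor}.volumes.&lt;type&gt;.&lt;name&gt;.* pattern used for hostPath, emptyDir, and persistentVolumeClaim.

{code:scala}
// Hypothetical only: the "nfs" volume type and its option keys are assumptions modeled on
// the existing Kubernetes volume conf pattern; they are not part of any released Spark here.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kubernetes.driver.volumes.nfs.shared.mount.path", "/srv")
  .set("spark.kubernetes.driver.volumes.nfs.shared.mount.readOnly", "true")
  // An NFS volume needs a server and an exported path; both keys below are hypothetical.
  .set("spark.kubernetes.driver.volumes.nfs.shared.options.server", "nfs.example.com")
  .set("spark.kubernetes.driver.volumes.nfs.shared.options.path", "/export/data")
{code}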
[jira] [Commented] (SPARK-26344) Support for flexVolume mount for Kubernetes
[ https://issues.apache.org/jira/browse/SPARK-26344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721992#comment-16721992 ] Eric Carlson commented on SPARK-26344: -- (Discussion at SPARK-26342 - question on whether any types beyond pvc/emptydir/hostpath would be considered to be added as volume types for the existing volume configuration options) > Support for flexVolume mount for Kubernetes > --- > > Key: SPARK-26344 > URL: https://issues.apache.org/jira/browse/SPARK-26344 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 2.4.0 >Reporter: Eric Carlson >Priority: Minor > > Currently only hostPath, emptyDir, and PVC volume types are accepted for > Kubernetes-deployed drivers and executors. > flexVolume types allow for pluggable volume drivers to be used in Kubernetes > - a widely used example of this is the Rook deployment of CephFS, which > provides a POSIX-compliant distributed filesystem integrated into K8s. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26342) Support for NFS mount for Kubernetes
[ https://issues.apache.org/jira/browse/SPARK-26342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721990#comment-16721990 ] Eric Carlson commented on SPARK-26342: -- (discussion also applies for flexvolume improvement request) Hi [~liyinan926] - that's right that pod templates can be used for this, and I have that working, but it would be more convenient for users if a separate yaml didn't have to be crafted for some options when so much is already available via the conf mechanisms. I actually already have this code working on a local branch, waiting for corporate contributor approval to publish for review - it doesn't really add a feature, just adds the nfs (and in a separate branch, flexvolume) types to the existing volume config options, which ends up being a fairly small addition to a few switch statements. Would a change like this be up for consideration, or is there a design decision to not add beyond empytdir, pvc, and hostpath? (I couldn't find design docs/discussion) > Support for NFS mount for Kubernetes > > > Key: SPARK-26342 > URL: https://issues.apache.org/jira/browse/SPARK-26342 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 2.4.0 >Reporter: Eric Carlson >Priority: Minor > > Currently only hostPath, emptyDir, and PVC volume types are accepted for > Kubernetes-deployed drivers and executors. Possibility to mount NFS paths > would allow access to a common and easy-to-deploy shared storage solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20628) Keep track of nodes which are going to be shut down & avoid scheduling new tasks
[ https://issues.apache.org/jira/browse/SPARK-20628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721909#comment-16721909 ] ASF GitHub Bot commented on SPARK-20628: vanzin closed pull request #19267: [WIP][SPARK-20628][CORE] Blacklist nodes when they transition to DECOMMISSIONING state in YARN URL: https://github.com/apache/spark/pull/19267 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/org/apache/spark/HostState.scala b/core/src/main/scala/org/apache/spark/HostState.scala new file mode 100644 index 0..17b374c3fac26 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/HostState.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.hadoop.yarn.api.records.NodeState + +private[spark] object HostState extends Enumeration { + + type HostState = Value + + val New, Running, Unhealthy, Decommissioning, Decommissioned, Lost, Rebooted = Value + + def fromYarnState(state: String): Option[HostState] = { +HostState.values.find(_.toString.toUpperCase == state) + } + + def toYarnState(state: HostState): Option[String] = { +NodeState.values.find(_.name == state.toString.toUpperCase).map(_.name) + } +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 9495cd2835f97..84edcff707d44 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -154,6 +154,16 @@ package object config { ConfigBuilder("spark.blacklist.application.fetchFailure.enabled") .booleanConf .createWithDefault(false) + + private[spark] val BLACKLIST_DECOMMISSIONING_ENABLED = +ConfigBuilder("spark.blacklist.decommissioning.enabled") + .booleanConf + .createWithDefault(false) + + private[spark] val BLACKLIST_DECOMMISSIONING_TIMEOUT_CONF = +ConfigBuilder("spark.blacklist.decommissioning.timeout") + .timeConf(TimeUnit.MILLISECONDS) + .createOptional // End blacklist confs private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE = diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index cd8e61d6d0208..7bc3db8ce1bb9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -61,7 +61,13 @@ private[scheduler] class BlacklistTracker ( private val 
MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC) private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE) val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf) - private val BLACKLIST_FETCH_FAILURE_ENABLED = conf.get(config.BLACKLIST_FETCH_FAILURE_ENABLED) + val BLACKLIST_DECOMMISSIONING_TIMEOUT_MILLIS = +BlacklistTracker.getBlacklistDecommissioningTimeout(conf) + private val TASK_BLACKLISTING_ENABLED = BlacklistTracker.isTaskExecutionBlacklistingEnabled(conf) + private val DECOMMISSIONING_BLACKLISTING_ENABLED = +BlacklistTracker.isDecommissioningBlacklistingEnabled(conf) + private val BLACKLIST_FETCH_FAILURE_ENABLED = +BlacklistTracker.isFetchFailureBlacklistingEnabled(conf) /** * A map from executorId to information on task failures. Tracks the time of each task failure, @@ -89,13 +95,13 @@ private[scheduler] class BlacklistTracker ( * successive blacklisted executors on one node. Nonetheless, it will not grow too large because * there cannot be many blacklisted executors on one node, before we stop requesting more * executors on that node, and we clean up the list of blacklisted executors once an executor has - * been blacklisted for
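Purely hypothetical usage of the configuration keys appearing in the unmerged WIP diff above; the PR was closed without merging, so these keys do not exist in any released Spark. The snippet only illustrates how the proposed decommissioning blacklisting would have been switched on.

{code:scala}
// Hypothetical: keys taken from the unmerged diff above, shown for illustration only.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.blacklist.decommissioning.enabled", "true")
  // How long a node in the YARN DECOMMISSIONING state would stay blacklisted.
  .set("spark.blacklist.decommissioning.timeout", "1h")
{code}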
[jira] [Resolved] (SPARK-26290) [K8s] Driver Pods no mounted volumes on submissions from older spark versions
[ https://issues.apache.org/jira/browse/SPARK-26290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yinan Li resolved SPARK-26290. -- Resolution: Not A Bug > [K8s] Driver Pods no mounted volumes on submissions from older spark versions > - > > Key: SPARK-26290 > URL: https://issues.apache.org/jira/browse/SPARK-26290 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 2.4.0 > Environment: Kuberentes: 1.10.6 > Container: Spark 2.4.0 > Spark containers are built from the archive served by > [www.apache.org/dist/spark/|http://www.apache.org/dist/spark/] > Submission done by older spark versions integrated e.g. in livy >Reporter: Martin Buchleitner >Priority: Major > > I want to use the volume feature to mount an existing PVC as readonly volume > into the driver and also executor. > The executor gets the PVC mounted, but the driver is missing the mount > {code:java} > /opt/spark/bin/spark-submit \ > --deploy-mode cluster \ > --class org.apache.spark.examples.SparkPi \ > --conf spark.app.name=spark-pi \ > --conf spark.executor.instances=4 \ > --conf spark.kubernetes.namespace=spark-demo \ > --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \ > --conf spark.kubernetes.container.image.pullPolicy=Always \ > --conf spark.kubernetes.container.image=kube-spark:2.4.0 \ > --conf spark.master=k8s://https:// \ > --conf > spark.kubernetes.driver.volumes.persistentVolumeClaim.ddata.mount.path=/srv \ > --conf > spark.kubernetes.driver.volumes.persistentVolumeClaim.ddata.mount.readOnly=true > \ > --conf > spark.kubernetes.driver.volumes.persistentVolumeClaim.ddata.options.claimName=nfs-pvc > \ > --conf > spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/srv \ > --conf > spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=true > \ > --conf > spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=nfs-pvc > \ > /srv/spark-examples_2.11-2.4.0.jar > {code} > When i use the jar included in the container > {code:java} > local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar > {code} > the call works and i can review the pod descriptions to review the behavior > *Driver description* > {code:java} > Name: spark-pi-1544018157391-driver > [...] > Containers: > spark-kubernetes-driver: > Container ID: > docker://3a31d867c140183247cb296e13a8b35d03835f7657dd7e625c59083024e51e28 > Image: kube-spark:2.4.0 > Image ID: [...] 
> Port: > Host Port: > State: Terminated > Reason: Completed > Exit Code:0 > Started: Wed, 05 Dec 2018 14:55:59 +0100 > Finished: Wed, 05 Dec 2018 14:56:08 +0100 > Ready: False > Restart Count: 0 > Limits: > memory: 1408Mi > Requests: > cpu: 1 > memory: 1Gi > Environment: > SPARK_DRIVER_MEMORY:1g > SPARK_DRIVER_CLASS: org.apache.spark.examples.SparkPi > SPARK_DRIVER_ARGS: > SPARK_DRIVER_BIND_ADDRESS: (v1:status.podIP) > SPARK_MOUNTED_CLASSPATH: > /opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar > SPARK_JAVA_OPT_1: > -Dspark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/srv > SPARK_JAVA_OPT_3: -Dspark.app.name=spark-pi > SPARK_JAVA_OPT_4: > -Dspark.kubernetes.driver.volumes.persistentVolumeClaim.ddata.mount.path=/srv > SPARK_JAVA_OPT_5: -Dspark.submit.deployMode=cluster > SPARK_JAVA_OPT_6: -Dspark.driver.blockManager.port=7079 > SPARK_JAVA_OPT_7: > -Dspark.kubernetes.driver.volumes.persistentVolumeClaim.ddata.mount.readOnly=true > SPARK_JAVA_OPT_8: > -Dspark.kubernetes.authenticate.driver.serviceAccountName=spark > SPARK_JAVA_OPT_9: > -Dspark.driver.host=spark-pi-1544018157391-driver-svc.spark-demo.svc.cluster.local > SPARK_JAVA_OPT_10: > -Dspark.kubernetes.driver.pod.name=spark-pi-1544018157391-driver > SPARK_JAVA_OPT_11: > -Dspark.kubernetes.driver.volumes.persistentVolumeClaim.ddata.options.claimName=nfs-pvc > SPARK_JAVA_OPT_12: > -Dspark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=true > SPARK_JAVA_OPT_13: -Dspark.driver.port=7078 > SPARK_JAVA_OPT_14: > -Dspark.jars=/opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar > SPARK_JAVA_OPT_15: > -Dspark.kubernetes.executor.podNamePrefix=spark-pi-1544018157391 > SPARK_JAVA_OPT_16: -Dspark.local.dir=/tmp/spark-local > SPARK_JAVA_OPT_17: -Dspark.master=k8s://https:// >
[jira] [Commented] (SPARK-26342) Support for NFS mount for Kubernetes
[ https://issues.apache.org/jira/browse/SPARK-26342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721894#comment-16721894 ] Yinan Li commented on SPARK-26342: -- So basically what you want is a generic way to mount arbitrary types of volumes. This is covered by SPARK-24434, which enables using a pod template to configure the driver and/or executor pods. > Support for NFS mount for Kubernetes > > > Key: SPARK-26342 > URL: https://issues.apache.org/jira/browse/SPARK-26342 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 2.4.0 >Reporter: Eric Carlson >Priority: Minor > > Currently only hostPath, emptyDir, and PVC volume types are accepted for > Kubernetes-deployed drivers and executors. Possibility to mount NFS paths > would allow access to a common and easy-to-deploy shared storage solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26344) Support for flexVolume mount for Kubernetes
[ https://issues.apache.org/jira/browse/SPARK-26344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721893#comment-16721893 ] Yinan Li commented on SPARK-26344: -- This is covered by SPARK-24434, which enables using a pod template to configure the driver and/or executor pods. > Support for flexVolume mount for Kubernetes > --- > > Key: SPARK-26344 > URL: https://issues.apache.org/jira/browse/SPARK-26344 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 2.4.0 >Reporter: Eric Carlson >Priority: Minor > > Currently only hostPath, emptyDir, and PVC volume types are accepted for > Kubernetes-deployed drivers and executors. > flexVolume types allow for pluggable volume drivers to be used in Kubernetes > - a widely used example of this is the Rook deployment of CephFS, which > provides a POSIX-compliant distributed filesystem integrated into K8s. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26267) Kafka source may reprocess data
[ https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721869#comment-16721869 ] ASF GitHub Bot commented on SPARK-26267: zsxwing opened a new pull request #23324: [SPARK-26267][SS]Retry when detecting incorrect offsets from Kafka URL: https://github.com/apache/spark/pull/23324 ## What changes were proposed in this pull request? Due to [KAFKA-7703](https://issues.apache.org/jira/browse/KAFKA-7703), Kafka may return the earliest offset when we request the latest offset. This will cause Spark to reprocess data. To reduce the impact of KAFKA-7703, this PR uses the previously fetched offsets to audit the result from Kafka. If we find any incorrect offset, we will retry at most `maxOffsetFetchAttempts` times. For the first batch of a new query, as we don't have any previous offsets, we simply fetch offsets twice. This should greatly reduce the chance of hitting KAFKA-7703. ## How was this patch tested? Jenkins This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka source may reprocess data > --- > > Key: SPARK-26267 > URL: https://issues.apache.org/jira/browse/SPARK-26267 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Blocker > Labels: correctness > > Due to KAFKA-7703, when the Kafka source tries to get the latest offset, it > may get an earliest offset, and then it will reprocess messages that have > been processed when it gets the correct latest offset in the next batch. > This usually happens when restarting a streaming query. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
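A rough Scala sketch of the retry idea described in the PR text above, not the actual KafkaOffsetReader implementation; the helper and parameter names are illustrative. A "latest" offset that falls below an offset we already know about (the KAFKA-7703 symptom) is treated as incorrect and refetched, a bounded number of times.

{code:scala}
// Illustrative sketch only; TopicPartition below is a local stand-in for Kafka's class.
final case class TopicPartition(topic: String, partition: Int)

def fetchLatestOffsets(
    fetchOnce: () => Map[TopicPartition, Long],
    knownOffsets: Option[Map[TopicPartition, Long]],
    maxOffsetFetchAttempts: Int): Map[TopicPartition, Long] = {

  def looksIncorrect(fetched: Map[TopicPartition, Long]): Boolean =
    knownOffsets.exists(_.exists { case (tp, known) =>
      fetched.get(tp).exists(_ < known)  // latest can never be smaller than a known offset
    })

  knownOffsets match {
    case None =>
      // First batch of a new query: no baseline to audit against, so fetch twice and keep
      // the second answer, which already makes hitting the broker bug much less likely.
      fetchOnce()
      fetchOnce()
    case Some(_) =>
      var attempt = 1
      var result = fetchOnce()
      while (looksIncorrect(result) && attempt < maxOffsetFetchAttempts) {
        attempt += 1
        result = fetchOnce()
      }
      result
  }
}
{code}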
[jira] [Assigned] (SPARK-26267) Kafka source may reprocess data
[ https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26267: Assignee: Apache Spark (was: Shixiong Zhu) > Kafka source may reprocess data > --- > > Key: SPARK-26267 > URL: https://issues.apache.org/jira/browse/SPARK-26267 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Assignee: Apache Spark >Priority: Blocker > Labels: correctness > > Due to KAFKA-7703, when the Kafka source tries to get the latest offset, it > may get an earliest offset, and then it will reprocess messages that have > been processed when it gets the correct latest offset in the next batch. > This usually happens when restarting a streaming query. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26267) Kafka source may reprocess data
[ https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26267: Assignee: Shixiong Zhu (was: Apache Spark) > Kafka source may reprocess data > --- > > Key: SPARK-26267 > URL: https://issues.apache.org/jira/browse/SPARK-26267 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Blocker > Labels: correctness > > Due to KAFKA-7703, when the Kafka source tries to get the latest offset, it > may get an earliest offset, and then it will reprocess messages that have > been processed when it gets the correct latest offset in the next batch. > This usually happens when restarting a streaming query. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26372) CSV parsing uses previous good value for bad input field
[ https://issues.apache.org/jira/browse/SPARK-26372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721858#comment-16721858 ] ASF GitHub Bot commented on SPARK-26372: bersprockets opened a new pull request #23323: [SPARK-26372][SQL] Don't reuse value from previous row when parsing bad CSV input field URL: https://github.com/apache/spark/pull/23323 ## What changes were proposed in this pull request? CSV parsing accidentally uses the previous good value for a bad input field. See example in Jira. This PR ensures that the associated column is set to null when an input field cannot be converted. ## How was this patch tested? Added new test. Ran all SQL unit tests (testOnly org.apache.spark.sql.*). Ran pyspark tests for pyspark-sql This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > CSV parsing uses previous good value for bad input field > > > Key: SPARK-26372 > URL: https://issues.apache.org/jira/browse/SPARK-26372 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Bruce Robbins >Priority: Major > > For example: > {noformat} > bash-3.2$ cat test.csv > "hello",1999-08-01 > "there","bad date" > "again","2017-11-22" > bash-3.2$ bin/spark-shell > ..etc.. > scala> import org.apache.spark.sql.types._ > scala> import org.apache.spark.sql.SaveMode > scala> var schema = StructType(StructField("col1", StringType) :: > | StructField("col2", DateType) :: > | Nil) > schema: org.apache.spark.sql.types.StructType = > StructType(StructField(col1,StringType,true), StructField(col2,DateType,true)) > scala> val df = spark.read.schema(schema).csv("test.csv") > df: org.apache.spark.sql.DataFrame = [col1: string, col2: date] > scala> df.show > +-+--+ > > | col1| col2| > +-+--+ > |hello|1999-08-01| > |there|1999-08-01| > |again|2017-11-22| > +-+--+ > scala> > {noformat} > col2 from the second row contains "1999-08-01", when it should contain null. > This is because UnivocityParser reuses the same Row object for each input > record. If there is an exception converting an input field, the code simply > skips over that field, leaving the existing value in the Row object. > The simple fix is to set the column to null in the Row object whenever there > is a badRecordException while converting the input field. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
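A simplified Scala sketch of the fix idea, not the actual UnivocityParser code; it only illustrates why the reused row must be nulled out when a field conversion fails.

{code:scala}
// Simplified sketch: the parser reuses one row object across input records, so a failed
// conversion has to null out that position explicitly, otherwise the previous record's
// value leaks into the current row.
def convertRow(tokens: Array[String], converters: Array[String => Any], row: Array[Any]): Array[Any] = {
  var i = 0
  while (i < tokens.length) {
    try {
      row(i) = converters(i)(tokens(i))
    } catch {
      case _: Exception =>
        // The fix: never leave the previous row's value in place. The real parser also
        // records the failure so PERMISSIVE/FAILFAST modes still behave as before.
        row(i) = null
    }
    i += 1
  }
  row
}

// Reproducing the Jira example: the stale "1999-08-01" no longer survives into the bad row.
val toDate: String => Any = s => java.sql.Date.valueOf(s)   // throws on "bad date"
val row = new Array[Any](2)
convertRow(Array("hello", "1999-08-01"), Array[String => Any]((s: String) => s, toDate), row)
convertRow(Array("there", "bad date"), Array[String => Any]((s: String) => s, toDate), row)
println(row.mkString(", "))   // there, null
{code}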
[jira] [Assigned] (SPARK-26372) CSV parsing uses previous good value for bad input field
[ https://issues.apache.org/jira/browse/SPARK-26372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26372: Assignee: (was: Apache Spark) > CSV parsing uses previous good value for bad input field > > > Key: SPARK-26372 > URL: https://issues.apache.org/jira/browse/SPARK-26372 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Bruce Robbins >Priority: Major > > For example: > {noformat} > bash-3.2$ cat test.csv > "hello",1999-08-01 > "there","bad date" > "again","2017-11-22" > bash-3.2$ bin/spark-shell > ..etc.. > scala> import org.apache.spark.sql.types._ > scala> import org.apache.spark.sql.SaveMode > scala> var schema = StructType(StructField("col1", StringType) :: > | StructField("col2", DateType) :: > | Nil) > schema: org.apache.spark.sql.types.StructType = > StructType(StructField(col1,StringType,true), StructField(col2,DateType,true)) > scala> val df = spark.read.schema(schema).csv("test.csv") > df: org.apache.spark.sql.DataFrame = [col1: string, col2: date] > scala> df.show > +-+--+ > > | col1| col2| > +-+--+ > |hello|1999-08-01| > |there|1999-08-01| > |again|2017-11-22| > +-+--+ > scala> > {noformat} > col2 from the second row contains "1999-08-01", when it should contain null. > This is because UnivocityParser reuses the same Row object for each input > record. If there is an exception converting an input field, the code simply > skips over that field, leaving the existing value in the Row object. > The simple fix is to set the column to null in the Row object whenever there > is a badRecordException while converting the input field. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26372) CSV parsing uses previous good value for bad input field
[ https://issues.apache.org/jira/browse/SPARK-26372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26372: Assignee: Apache Spark > CSV parsing uses previous good value for bad input field > > > Key: SPARK-26372 > URL: https://issues.apache.org/jira/browse/SPARK-26372 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Bruce Robbins >Assignee: Apache Spark >Priority: Major > > For example: > {noformat} > bash-3.2$ cat test.csv > "hello",1999-08-01 > "there","bad date" > "again","2017-11-22" > bash-3.2$ bin/spark-shell > ..etc.. > scala> import org.apache.spark.sql.types._ > scala> import org.apache.spark.sql.SaveMode > scala> var schema = StructType(StructField("col1", StringType) :: > | StructField("col2", DateType) :: > | Nil) > schema: org.apache.spark.sql.types.StructType = > StructType(StructField(col1,StringType,true), StructField(col2,DateType,true)) > scala> val df = spark.read.schema(schema).csv("test.csv") > df: org.apache.spark.sql.DataFrame = [col1: string, col2: date] > scala> df.show > +-+--+ > > | col1| col2| > +-+--+ > |hello|1999-08-01| > |there|1999-08-01| > |again|2017-11-22| > +-+--+ > scala> > {noformat} > col2 from the second row contains "1999-08-01", when it should contain null. > This is because UnivocityParser reuses the same Row object for each input > record. If there is an exception converting an input field, the code simply > skips over that field, leaving the existing value in the Row object. > The simple fix is to set the column to null in the Row object whenever there > is a badRecordException while converting the input field. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Reopened] (SPARK-20130) Flaky test: BlockManagerProactiveReplicationSuite
[ https://issues.apache.org/jira/browse/SPARK-20130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin reopened SPARK-20130: Reopening since I've seen this a few times recently. e.g.: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/100161/ {noformat} Error Message org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 2 times over 4.021686279 seconds. Last failure message: 4 did not equal 3. Stacktrace sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 2 times over 4.021686279 seconds. Last failure message: 4 did not equal 3. at org.scalatest.concurrent.Eventually.tryTryAgain$1(Eventually.scala:432) at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:439) at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:391) at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:479) at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:308) at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:307) at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:479) at org.apache.spark.storage.BlockManagerReplicationBehavior.$anonfun$$init$$26(BlockManagerReplicationSuite.scala:299) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) {noformat} > Flaky test: BlockManagerProactiveReplicationSuite > - > > Key: SPARK-20130 > URL: https://issues.apache.org/jira/browse/SPARK-20130 > Project: Spark > Issue Type: Bug > Components: Tests >Affects Versions: 2.2.0 >Reporter: Marcelo Vanzin >Priority: Major > > See following page: > https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.storage.BlockManagerProactiveReplicationSuite_name=proactive+block+replication+-+5+replicas+-+4+block+manager+deletions > I also have seen it fail intermittently during local unit test runs. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24942) Improve cluster resource management with jobs containing barrier stage
[ https://issues.apache.org/jira/browse/SPARK-24942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721841#comment-16721841 ] Ilya Matiach commented on SPARK-24942: -- Would really like to see this resolved. It would be great if we could have barrier execution with dynamic allocation enabled. In the case that dynamic allocation is enabled, we should be able to automatically restart the job if resources are for some reason removed and allow the developer to decide whether to restart the job when resources are added (in their own code) to utilize more resources. For the latter case, I think many algorithms that would use something like barrier execution mode are iterative and so they should be able to save the current state and then restart when more resources are allocated. > Improve cluster resource management with jobs containing barrier stage > -- > > Key: SPARK-24942 > URL: https://issues.apache.org/jira/browse/SPARK-24942 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Xingbo Jiang >Priority: Major > > https://github.com/apache/spark/pull/21758#discussion_r205652317 > We shall improve cluster resource management to address the following issues: > - With dynamic resource allocation enabled, it may happen that we acquire > some executors (but not enough to launch all the tasks in a barrier stage) > and later release them due to executor idle time expire, and then acquire > again. > - There can be deadlock with two concurrent applications. Each application > may acquire some resources, but not enough to launch all the tasks in a > barrier stage. And after hitting the idle timeout and releasing them, they > may acquire resources again, but just continually trade resources between > each other. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26373) Spark UI 'environment' tab - column to indicate default vs overridden values
t oo created SPARK-26373: Summary: Spark UI 'environment' tab - column to indicate default vs overridden values Key: SPARK-26373 URL: https://issues.apache.org/jira/browse/SPARK-26373 Project: Spark Issue Type: New Feature Components: Web UI Affects Versions: 2.4.0 Reporter: t oo Rather than just showing the name and value for each property, a new column would indicate whether the value is the default (showing 'AS PER DEFAULT') or has been overridden (in which case it would show the actual default value). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26267) Kafka source may reprocess data
[ https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reassigned SPARK-26267: Assignee: Shixiong Zhu > Kafka source may reprocess data > --- > > Key: SPARK-26267 > URL: https://issues.apache.org/jira/browse/SPARK-26267 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu >Priority: Blocker > Labels: correctness > > Due to KAFKA-7703, when the Kafka source tries to get the latest offset, it > may get an earliest offset, and then it will reprocess messages that have > been processed when it gets the correct latest offset in the next batch. > This usually happens when restarting a streaming query. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26327) Metrics in FileSourceScanExec not update correctly while relation.partitionSchema is set
[ https://issues.apache.org/jira/browse/SPARK-26327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-26327: -- Fix Version/s: 2.4.1 2.3.3 2.2.3 > Metrics in FileSourceScanExec not update correctly while > relation.partitionSchema is set > > > Key: SPARK-26327 > URL: https://issues.apache.org/jira/browse/SPARK-26327 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Yuanjian Li >Assignee: Yuanjian Li >Priority: Major > Fix For: 2.2.3, 2.3.3, 2.4.1, 3.0.0 > > > In the current approach in `FileSourceScanExec`, the "numFiles" and > "metadataTime" (file listing time) metrics are updated when the lazy val > `selectedPartitions` is initialized, in the case where relation.partitionSchema > is set. But `selectedPartitions` is first initialized by `metadata`, which is > called by `queryExecution.toString` in `SQLExecution.withNewExecutionId`. So when > `SQLMetrics.postDriverMetricUpdates` is called, there is no corresponding live > execution in SQLAppStatusListener, and the metrics update does not take effect. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26327) Metrics in FileSourceScanExec not update correctly while relation.partitionSchema is set
[ https://issues.apache.org/jira/browse/SPARK-26327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721788#comment-16721788 ] ASF GitHub Bot commented on SPARK-26327: dongjoon-hyun closed pull request #23300: [SPARK-26327][SQL][BACKPORT-2.2] Bug fix for `FileSourceScanExec` metrics update URL: https://github.com/apache/spark/pull/23300 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 6fb41b6425c4b..1f6a7c089bf07 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -170,19 +170,14 @@ case class FileSourceScanExec( false } + private var metadataTime = 0L + @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = { val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) val startTime = System.nanoTime() val ret = relation.location.listFiles(partitionFilters, dataFilters) val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000 - -metrics("numFiles").add(ret.map(_.files.size.toLong).sum) -metrics("metadataTime").add(timeTakenMs) - -val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) -SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, - metrics("numFiles") :: metrics("metadataTime") :: Nil) - +metadataTime = timeTakenMs ret } @@ -281,6 +276,8 @@ case class FileSourceScanExec( } private lazy val inputRDD: RDD[InternalRow] = { +// Update metrics for taking effect in both code generation node and normal node. +updateDriverMetrics() val readFile: (PartitionedFile) => Iterator[InternalRow] = relation.fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, @@ -514,6 +511,19 @@ case class FileSourceScanExec( } } + /** + * Send the updated metrics to driver, while this function calling, selectedPartitions has + * been initialized. See SPARK-26327 for more detail. 
+ */ + private def updateDriverMetrics() = { +metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum) +metrics("metadataTime").add(metadataTime) + +val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) +SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, + metrics("numFiles") :: metrics("metadataTime") :: Nil) + } + override lazy val canonicalized: FileSourceScanExec = { FileSourceScanExec( relation, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 79d1fbfa3f072..26b822fe6208f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -372,6 +372,21 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) :: (0L, 300L, 0L) :: Nil) } } + + test("SPARK-26327: FileSourceScanExec metrics") { +withTable("testDataForScan") { + spark.range(10).selectExpr("id", "id % 3 as p") +.write.partitionBy("p").saveAsTable("testDataForScan") + // The execution plan only has 1 FileScan node. + val df = spark.sql( +"SELECT * FROM testDataForScan WHERE p = 1") + testSparkPlanMetrics(df, 1, Map( +0L -> (("Scan parquet default.testdataforscan", Map( + "number of output rows" -> 3L, + "number of files" -> 2L + ) +} + } } object InputOutputMetricsHelper { This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Metrics in FileSourceScanExec not update correctly while > relation.partitionSchema is set > > > Key: SPARK-26327 > URL: https://issues.apache.org/jira/browse/SPARK-26327 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0
[jira] [Commented] (SPARK-26327) Metrics in FileSourceScanExec not update correctly while relation.partitionSchema is set
[ https://issues.apache.org/jira/browse/SPARK-26327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721785#comment-16721785 ] ASF GitHub Bot commented on SPARK-26327: dongjoon-hyun closed pull request #23299: [SPARK-26327][SQL][BACKPORT-2.3] Bug fix for `FileSourceScanExec` metrics update URL: https://github.com/apache/spark/pull/23299 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 0a23264a4a16b..5543087db0614 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -183,19 +183,14 @@ case class FileSourceScanExec( partitionSchema = relation.partitionSchema, relation.sparkSession.sessionState.conf) + private var metadataTime = 0L + @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = { val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) val startTime = System.nanoTime() val ret = relation.location.listFiles(partitionFilters, dataFilters) val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000 - -metrics("numFiles").add(ret.map(_.files.size.toLong).sum) -metrics("metadataTime").add(timeTakenMs) - -val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) -SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, - metrics("numFiles") :: metrics("metadataTime") :: Nil) - +metadataTime = timeTakenMs ret } @@ -293,6 +288,8 @@ case class FileSourceScanExec( } private lazy val inputRDD: RDD[InternalRow] = { +// Update metrics for taking effect in both code generation node and normal node. +updateDriverMetrics() val readFile: (PartitionedFile) => Iterator[InternalRow] = relation.fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, @@ -500,6 +497,19 @@ case class FileSourceScanExec( } } + /** + * Send the updated metrics to driver, while this function calling, selectedPartitions has + * been initialized. See SPARK-26327 for more detail. 
+ */ + private def updateDriverMetrics() = { +metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum) +metrics("metadataTime").add(metadataTime) + +val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) +SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, + metrics("numFiles") :: metrics("metadataTime") :: Nil) + } + override def doCanonicalize(): FileSourceScanExec = { FileSourceScanExec( relation, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index a3a3f3851e21c..439a36080ce90 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -504,4 +504,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared test("writing data out metrics with dynamic partition: parquet") { testMetricsDynamicPartition("parquet", "parquet", "t1") } + + test("SPARK-26327: FileSourceScanExec metrics") { +withTable("testDataForScan") { + spark.range(10).selectExpr("id", "id % 3 as p") +.write.partitionBy("p").saveAsTable("testDataForScan") + // The execution plan only has 1 FileScan node. + val df = spark.sql( +"SELECT * FROM testDataForScan WHERE p = 1") + testSparkPlanMetrics(df, 1, Map( +0L -> (("Scan parquet default.testdataforscan", Map( + "number of output rows" -> 3L, + "number of files" -> 2L + ) +} + } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Metrics in FileSourceScanExec not update correctly while > relation.partitionSchema is set > > > Key: SPARK-26327 > URL: https://issues.apache.org/jira/browse/SPARK-26327 > Project: Spark >
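A stand-alone sketch (not Spark code) of the mechanism behind this bug: a side effect placed in a lazy val initializer runs at the first access, wherever that happens to be; here it is forced via `toString`, i.e. before any execution has been registered, which is why the fix above moves the metric update out of the initializer and into a method called when the input RDD is actually built.

{code:scala}
// Illustrative only: side effects in a lazy val fire at first access, not where the
// "real" work happens.
class Scan {
  lazy val selectedPartitions: Seq[Int] = {
    println("posting driver metrics")        // side effect tied to first access
    Seq(1, 2, 3)
  }
  override def toString: String = s"Scan(${selectedPartitions.size} partitions)"
}

object LazyValSideEffect {
  def main(args: Array[String]): Unit = {
    val scan = new Scan
    println(scan.toString)  // forces selectedPartitions, so the metrics are "posted" too early
  }
}
{code}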
[jira] [Resolved] (SPARK-21239) Support WAL recover in windows
[ https://issues.apache.org/jira/browse/SPARK-21239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin resolved SPARK-21239. Resolution: Duplicate As far as my understanding goes, the fix for SPARK-25778 should also fix this. > Support WAL recover in windows > -- > > Key: SPARK-21239 > URL: https://issues.apache.org/jira/browse/SPARK-21239 > Project: Spark > Issue Type: Bug > Components: DStreams, Windows >Affects Versions: 1.6.3, 2.1.0, 2.1.1, 2.2.0 >Reporter: Yun Tang >Priority: Major > > When the driver fails over, it reads the WAL from HDFS by calling > WriteAheadLogBackedBlockRDD.getBlockFromWriteAheadLog(); however, it needs a > dummy local path to satisfy the method parameter requirements, and on Windows that > path contains a colon, which is not valid for Hadoop. I removed the potential > drive letter and colon. > I found an earlier spark-user email that discussed this bug > (https://www.mail-archive.com/user@spark.apache.org/msg55030.html) -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21239) Support WAL recover in windows
[ https://issues.apache.org/jira/browse/SPARK-21239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721662#comment-16721662 ] ASF GitHub Bot commented on SPARK-21239: vanzin closed pull request #18452: [SPARK-21239][STREAMING] Support WAL recover in windows URL: https://github.com/apache/spark/pull/18452 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index 844760ab61d2e..2c97c0afa260e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -135,8 +135,11 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( // FileBasedWriteAheadLog will not create any file or directory at that path. Also, // this dummy directory should not already exist otherwise the WAL will try to recover // past events from the directory and throw errors. +// Specifically, the nonExistentDirectory will contain a colon in windows, this is invalid +// for hadoop. Remove the drive letter and colon, e.g. "D:" out of this path by default val nonExistentDirectory = new File( - System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath + System.getProperty("java.io.tmpdir"), + UUID.randomUUID().toString).getAbsolutePath.replaceFirst("[a-zA-Z]:", "") writeAheadLog = WriteAheadLogUtils.createLogForReceiver( SparkEnv.get.conf, nonExistentDirectory, hadoopConf) dataRead = writeAheadLog.read(partition.walRecordHandle) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support WAL recover in windows > -- > > Key: SPARK-21239 > URL: https://issues.apache.org/jira/browse/SPARK-21239 > Project: Spark > Issue Type: Bug > Components: DStreams, Windows >Affects Versions: 1.6.3, 2.1.0, 2.1.1, 2.2.0 >Reporter: Yun Tang >Priority: Major > > When driver failed over, it will read WAL from HDFS by calling > WriteAheadLogBackedBlockRDD.getBlockFromWriteAheadLog(), however, it need a > dummy local path to satisfy the method parameter requirements, but the path > in windows will contain a colon which is not valid for hadoop. I removed the > potential driver letter and colon. > I found one email from spark-user ever talked about this bug > (https://www.mail-archive.com/user@spark.apache.org/msg55030.html) -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
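For readers who want to see the effect of the one-line change above in isolation, here is a small self-contained sketch. The object name and the sample values are illustrative only; the regex is the one from the diff.

{code:scala}
import java.io.File
import java.util.UUID

object WindowsWalPathExample {
  def main(args: Array[String]): Unit = {
    // Same idea as the patch: build a dummy, non-existent directory path and strip
    // the Windows drive letter plus colon (e.g. "D:") so Hadoop accepts the path.
    val tmpDir = System.getProperty("java.io.tmpdir") // e.g. "D:\Temp" on Windows
    val nonExistentDirectory = new File(tmpDir, UUID.randomUUID().toString)
      .getAbsolutePath
      .replaceFirst("[a-zA-Z]:", "")
    // On Windows this prints "\Temp\<uuid>" instead of "D:\Temp\<uuid>";
    // on Linux/macOS the path is unchanged because there is no drive letter.
    println(nonExistentDirectory)
  }
}
{code}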
[jira] [Commented] (SPARK-25694) URL.setURLStreamHandlerFactory causing incompatible HttpURLConnection issue
[ https://issues.apache.org/jira/browse/SPARK-25694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721656#comment-16721656 ] Howard Wong commented on SPARK-25694: - [~boyangwa], do you know of a workaround for this? I'm facing the same issue. > URL.setURLStreamHandlerFactory causing incompatible HttpURLConnection issue > --- > > Key: SPARK-25694 > URL: https://issues.apache.org/jira/browse/SPARK-25694 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.0, 2.3.1, 2.3.2 >Reporter: Bo Yang >Priority: Minor > > URL.setURLStreamHandlerFactory() in SharedState causes URL.openConnection() > returns FsUrlConnection object, which is not compatible with > HttpURLConnection. This will cause exception when using some third party http > library (e.g. scalaj.http). > The following code in Spark 2.3.0 introduced the issue: > sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala: > {code} > object SharedState extends Logging { ... > URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()) ... > } > {code} > Here is the example exception when using scalaj.http in Spark: > {code} > StackTrace: scala.MatchError: > org.apache.hadoop.fs.FsUrlConnection:[http://.example.com|http://.example.com/] > (of class org.apache.hadoop.fs.FsUrlConnection) > at > scalaj.http.HttpRequest.scalaj$http$HttpRequest$$doConnection(Http.scala:343) > at scalaj.http.HttpRequest.exec(Http.scala:335) > at scalaj.http.HttpRequest.asString(Http.scala:455) > {code} > > One option to fix the issue is to return null in > URLStreamHandlerFactory.createURLStreamHandler when the protocol is > http/https, so it will use the default behavior and be compatible with > scalaj.http. Following is the code example: > {code} > class SparkUrlStreamHandlerFactory extends URLStreamHandlerFactory with > Logging { > private val fsUrlStreamHandlerFactory = new FsUrlStreamHandlerFactory() > override def createURLStreamHandler(protocol: String): URLStreamHandler = { > val handler = fsUrlStreamHandlerFactory.createURLStreamHandler(protocol) > if (handler == null) { > return null > } > if (protocol != null && > (protocol.equalsIgnoreCase("http") > || protocol.equalsIgnoreCase("https"))) { > // return null to use system default URLStreamHandler > null > } else { > handler > } > } > } > {code} > I would like to get some discussion here before submitting a pull request. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
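To make the reporter's proposal easier to try out, here is the same idea restated as a self-contained sketch. The class name is made up and this is not anything Spark ships; note also that it only helps if it can be installed before SharedState calls URL.setURLStreamHandlerFactory, since the JVM accepts that call only once.

{code:scala}
import java.net.{URLStreamHandler, URLStreamHandlerFactory}

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory

// Delegates to Hadoop's factory, but falls back to the JDK default handler for
// http/https so libraries relying on HttpURLConnection keep working.
class HttpFriendlyUrlStreamHandlerFactory extends URLStreamHandlerFactory {
  private val fsFactory = new FsUrlStreamHandlerFactory()

  override def createURLStreamHandler(protocol: String): URLStreamHandler = {
    val handler = fsFactory.createURLStreamHandler(protocol)
    if (handler == null) {
      null
    } else if (protocol != null &&
        (protocol.equalsIgnoreCase("http") || protocol.equalsIgnoreCase("https"))) {
      null // return null so the system default URLStreamHandler is used
    } else {
      handler
    }
  }
}
{code}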
[jira] [Updated] (SPARK-26372) CSV parsing uses previous good value for bad input field
[ https://issues.apache.org/jira/browse/SPARK-26372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruce Robbins updated SPARK-26372: -- Summary: CSV parsing uses previous good value for bad input field (was: CSV Parsing uses previous good value for bad input field) > CSV parsing uses previous good value for bad input field > > > Key: SPARK-26372 > URL: https://issues.apache.org/jira/browse/SPARK-26372 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Bruce Robbins >Priority: Major > > For example: > {noformat} > bash-3.2$ cat test.csv > "hello",1999-08-01 > "there","bad date" > "again","2017-11-22" > bash-3.2$ bin/spark-shell > ..etc.. > scala> import org.apache.spark.sql.types._ > scala> import org.apache.spark.sql.SaveMode > scala> var schema = StructType(StructField("col1", StringType) :: > | StructField("col2", DateType) :: > | Nil) > schema: org.apache.spark.sql.types.StructType = > StructType(StructField(col1,StringType,true), StructField(col2,DateType,true)) > scala> val df = spark.read.schema(schema).csv("test.csv") > df: org.apache.spark.sql.DataFrame = [col1: string, col2: date] > scala> df.show > +-+--+ > > | col1| col2| > +-+--+ > |hello|1999-08-01| > |there|1999-08-01| > |again|2017-11-22| > +-+--+ > scala> > {noformat} > col2 from the second row contains "1999-08-01", when it should contain null. > This is because UnivocityParser reuses the same Row object for each input > record. If there is an exception converting an input field, the code simply > skips over that field, leaving the existing value in the Row object. > The simple fix is to set the column to null in the Row object whenever there > is a badRecordException while converting the input field. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26372) CSV parsing uses previous good value for bad input field
[ https://issues.apache.org/jira/browse/SPARK-26372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721593#comment-16721593 ] Bruce Robbins commented on SPARK-26372: --- I can prep a PR, unless someone thinks this needs a different solution than the one I proposed. > CSV parsing uses previous good value for bad input field > > > Key: SPARK-26372 > URL: https://issues.apache.org/jira/browse/SPARK-26372 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Bruce Robbins >Priority: Major > > For example: > {noformat} > bash-3.2$ cat test.csv > "hello",1999-08-01 > "there","bad date" > "again","2017-11-22" > bash-3.2$ bin/spark-shell > ..etc.. > scala> import org.apache.spark.sql.types._ > scala> import org.apache.spark.sql.SaveMode > scala> var schema = StructType(StructField("col1", StringType) :: > | StructField("col2", DateType) :: > | Nil) > schema: org.apache.spark.sql.types.StructType = > StructType(StructField(col1,StringType,true), StructField(col2,DateType,true)) > scala> val df = spark.read.schema(schema).csv("test.csv") > df: org.apache.spark.sql.DataFrame = [col1: string, col2: date] > scala> df.show > +-+--+ > > | col1| col2| > +-+--+ > |hello|1999-08-01| > |there|1999-08-01| > |again|2017-11-22| > +-+--+ > scala> > {noformat} > col2 from the second row contains "1999-08-01", when it should contain null. > This is because UnivocityParser reuses the same Row object for each input > record. If there is an exception converting an input field, the code simply > skips over that field, leaving the existing value in the Row object. > The simple fix is to set the column to null in the Row object whenever there > is a badRecordException while converting the input field. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26372) CSV Parsing uses previous good value for bad input field
Bruce Robbins created SPARK-26372: - Summary: CSV Parsing uses previous good value for bad input field Key: SPARK-26372 URL: https://issues.apache.org/jira/browse/SPARK-26372 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.0 Reporter: Bruce Robbins For example: {noformat} bash-3.2$ cat test.csv "hello",1999-08-01 "there","bad date" "again","2017-11-22" bash-3.2$ bin/spark-shell ..etc.. scala> import org.apache.spark.sql.types._ scala> import org.apache.spark.sql.SaveMode scala> var schema = StructType(StructField("col1", StringType) :: | StructField("col2", DateType) :: | Nil) schema: org.apache.spark.sql.types.StructType = StructType(StructField(col1,StringType,true), StructField(col2,DateType,true)) scala> val df = spark.read.schema(schema).csv("test.csv") df: org.apache.spark.sql.DataFrame = [col1: string, col2: date] scala> df.show +-+--+ | col1| col2| +-+--+ |hello|1999-08-01| |there|1999-08-01| |again|2017-11-22| +-+--+ scala> {noformat} col2 from the second row contains "1999-08-01", when it should contain null. This is because UnivocityParser reuses the same Row object for each input record. If there is an exception converting an input field, the code simply skips over that field, leaving the existing value in the Row object. The simple fix is to set the column to null in the Row object whenever there is a badRecordException while converting the input field. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
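The mechanism described above (a reused mutable row keeping the previous good value) can be reproduced outside Spark with a toy parser. The sketch below is purely illustrative and is not UnivocityParser code; it only demonstrates the stale-value hazard and the proposed null-on-failure fix.

{code:scala}
import scala.util.control.NonFatal

object StaleRowExample {
  // Throws IllegalArgumentException on inputs such as "bad date".
  def parseDate(s: String): java.sql.Date = java.sql.Date.valueOf(s)

  def main(args: Array[String]): Unit = {
    val records = Seq(
      Array("hello", "1999-08-01"),
      Array("there", "bad date"),
      Array("again", "2017-11-22"))

    // One mutable row reused for every record, like the parser in the description.
    val reused = new Array[Any](2)

    for (fields <- records) {
      reused(0) = fields(0)
      try {
        reused(1) = parseDate(fields(1))
      } catch {
        case NonFatal(_) =>
          // The proposed fix: reset the field on failure. Without this line the
          // second record would still show 1999-08-01 from the previous row.
          reused(1) = null
      }
      println(reused.mkString(", "))
    }
  }
}
{code}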
[jira] [Assigned] (SPARK-26370) Fix resolution of higher-order function for the same identifier.
[ https://issues.apache.org/jira/browse/SPARK-26370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-26370: --- Assignee: Takuya Ueshin > Fix resolution of higher-order function for the same identifier. > > > Key: SPARK-26370 > URL: https://issues.apache.org/jira/browse/SPARK-26370 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Takuya Ueshin >Assignee: Takuya Ueshin >Priority: Major > Fix For: 2.4.1, 3.0.0 > > > When using a higher-order function with the same variable name as the > existing columns in {{Filter}} or something which uses > {{Analyzer.resolveExpressionBottomUp}} during the resolution, e.g.,: > {code} > val df = Seq( > (Seq(1, 9, 8, 7), 1, 2), > (Seq(5, 9, 7), 2, 2), > (Seq.empty, 3, 2), > (null, 4, 2) > ).toDF("i", "x", "d") > checkAnswer(df.filter("exists(i, x -> x % d == 0)"), > Seq(Row(Seq(1, 9, 8, 7), 1, 2))) > checkAnswer(df.select("x").filter("exists(i, x -> x % d == 0)"), > Seq(Row(1))) > {code} > the following exception happens: > {code:java} > java.lang.ClassCastException: > org.apache.spark.sql.catalyst.expressions.BoundReference cannot be cast to > org.apache.spark.sql.catalyst.expressions.NamedExpression > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at scala.collection.TraversableLike.map(TraversableLike.scala:237) > at scala.collection.TraversableLike.map$(TraversableLike.scala:230) > at scala.collection.AbstractTraversable.map(Traversable.scala:108) > at > org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.$anonfun$functionsForEval$1(higherOrderFunctions.scala:147) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237) > at scala.collection.immutable.List.foreach(List.scala:392) > at scala.collection.TraversableLike.map(TraversableLike.scala:237) > at scala.collection.TraversableLike.map$(TraversableLike.scala:230) > at scala.collection.immutable.List.map(List.scala:298) > at > org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.functionsForEval(higherOrderFunctions.scala:145) > at > org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.functionsForEval$(higherOrderFunctions.scala:145) > at > org.apache.spark.sql.catalyst.expressions.ArrayExists.functionsForEval$lzycompute(higherOrderFunctions.scala:369) > at > org.apache.spark.sql.catalyst.expressions.ArrayExists.functionsForEval(higherOrderFunctions.scala:369) > at > org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.functionForEval(higherOrderFunctions.scala:176) > at > org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.functionForEval$(higherOrderFunctions.scala:176) > at > org.apache.spark.sql.catalyst.expressions.ArrayExists.functionForEval(higherOrderFunctions.scala:369) > at > org.apache.spark.sql.catalyst.expressions.ArrayExists.nullSafeEval(higherOrderFunctions.scala:387) > at > org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval(higherOrderFunctions.scala:190) > at > org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval$(higherOrderFunctions.scala:185) > at > org.apache.spark.sql.catalyst.expressions.ArrayExists.eval(higherOrderFunctions.scala:369) > at > 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown > Source) > at > org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:216) > at > org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:215) > ... > {code} > because the {{UnresolvedAttribute}} s in {{LambdaFunction}} are unexpectedly > resolved by the rule. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26370) Fix resolution of higher-order function for the same identifier.
[ https://issues.apache.org/jira/browse/SPARK-26370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-26370. - Resolution: Fixed Fix Version/s: 2.4.1 3.0.0 Issue resolved by pull request 23320 [https://github.com/apache/spark/pull/23320] > Fix resolution of higher-order function for the same identifier. > > > Key: SPARK-26370 > URL: https://issues.apache.org/jira/browse/SPARK-26370 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 >Reporter: Takuya Ueshin >Assignee: Takuya Ueshin >Priority: Major > Fix For: 3.0.0, 2.4.1 > > > When using a higher-order function with the same variable name as the > existing columns in {{Filter}} or something which uses > {{Analyzer.resolveExpressionBottomUp}} during the resolution, e.g.,: > {code} > val df = Seq( > (Seq(1, 9, 8, 7), 1, 2), > (Seq(5, 9, 7), 2, 2), > (Seq.empty, 3, 2), > (null, 4, 2) > ).toDF("i", "x", "d") > checkAnswer(df.filter("exists(i, x -> x % d == 0)"), > Seq(Row(Seq(1, 9, 8, 7), 1, 2))) > checkAnswer(df.select("x").filter("exists(i, x -> x % d == 0)"), > Seq(Row(1))) > {code} > the following exception happens: > {code:java} > java.lang.ClassCastException: > org.apache.spark.sql.catalyst.expressions.BoundReference cannot be cast to > org.apache.spark.sql.catalyst.expressions.NamedExpression > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at scala.collection.TraversableLike.map(TraversableLike.scala:237) > at scala.collection.TraversableLike.map$(TraversableLike.scala:230) > at scala.collection.AbstractTraversable.map(Traversable.scala:108) > at > org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.$anonfun$functionsForEval$1(higherOrderFunctions.scala:147) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237) > at scala.collection.immutable.List.foreach(List.scala:392) > at scala.collection.TraversableLike.map(TraversableLike.scala:237) > at scala.collection.TraversableLike.map$(TraversableLike.scala:230) > at scala.collection.immutable.List.map(List.scala:298) > at > org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.functionsForEval(higherOrderFunctions.scala:145) > at > org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.functionsForEval$(higherOrderFunctions.scala:145) > at > org.apache.spark.sql.catalyst.expressions.ArrayExists.functionsForEval$lzycompute(higherOrderFunctions.scala:369) > at > org.apache.spark.sql.catalyst.expressions.ArrayExists.functionsForEval(higherOrderFunctions.scala:369) > at > org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.functionForEval(higherOrderFunctions.scala:176) > at > org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.functionForEval$(higherOrderFunctions.scala:176) > at > org.apache.spark.sql.catalyst.expressions.ArrayExists.functionForEval(higherOrderFunctions.scala:369) > at > org.apache.spark.sql.catalyst.expressions.ArrayExists.nullSafeEval(higherOrderFunctions.scala:387) > at > org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval(higherOrderFunctions.scala:190) > at > org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval$(higherOrderFunctions.scala:185) > at > 
org.apache.spark.sql.catalyst.expressions.ArrayExists.eval(higherOrderFunctions.scala:369) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown > Source) > at > org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:216) > at > org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:215) > ... > {code} > because the {{UnresolvedAttribute}} s in {{LambdaFunction}} are unexpectedly > resolved by the rule. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26370) Fix resolution of higher-order function for the same identifier.
[ https://issues.apache.org/jira/browse/SPARK-26370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721575#comment-16721575 ] ASF GitHub Bot commented on SPARK-26370: asfgit closed pull request #23320: [SPARK-26370][SQL] Fix resolution of higher-order function for the same identifier. URL: https://github.com/apache/spark/pull/23320 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala index a8a7bbd9f9cd0..1cd7f412bb678 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala @@ -150,13 +150,14 @@ case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] { val lambdaMap = l.arguments.map(v => canonicalizer(v.name) -> v).toMap l.mapChildren(resolve(_, parentLambdaMap ++ lambdaMap)) -case u @ UnresolvedAttribute(name +: nestedFields) => +case u @ UnresolvedNamedLambdaVariable(name +: nestedFields) => parentLambdaMap.get(canonicalizer(name)) match { case Some(lambda) => nestedFields.foldLeft(lambda: Expression) { (expr, fieldName) => ExtractValue(expr, Literal(fieldName), conf.resolver) } -case None => u +case None => + UnresolvedAttribute(u.nameParts) } case _ => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala index a8639d29f964d..7141b6e996389 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala @@ -22,12 +22,34 @@ import java.util.concurrent.atomic.AtomicReference import scala.collection.mutable import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion, UnresolvedAttribute, UnresolvedException} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.array.ByteArrayMethods +/** + * A placeholder of lambda variables to prevent unexpected resolution of [[LambdaFunction]]. 
+ */ +case class UnresolvedNamedLambdaVariable(nameParts: Seq[String]) + extends LeafExpression with NamedExpression with Unevaluable { + + override def name: String = +nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".") + + override def exprId: ExprId = throw new UnresolvedException(this, "exprId") + override def dataType: DataType = throw new UnresolvedException(this, "dataType") + override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") + override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute") + override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance") + override lazy val resolved = false + + override def toString: String = s"lambda '$name" + + override def sql: String = name +} + /** * A named lambda variable. */ @@ -79,7 +101,7 @@ case class LambdaFunction( object LambdaFunction { val identity: LambdaFunction = { -val id = UnresolvedAttribute.quoted("id") +val id = UnresolvedNamedLambdaVariable(Seq("id")) LambdaFunction(id, Seq(id)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 672bffcfc0cad..8959f78b656d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1338,9 +1338,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging */ override def visitLambda(ctx: LambdaContext): Expression = withOrigin(ctx) { val arguments = ctx.IDENTIFIER().asScala.map { name => - UnresolvedAttribute.quoted(name.getText) +
[jira] [Commented] (SPARK-20890) Add min and max functions for dataset aggregation
[ https://issues.apache.org/jira/browse/SPARK-20890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721387#comment-16721387 ] ASF GitHub Bot commented on SPARK-20890: srowen closed pull request #18113: [SPARK-20890][SQL] Added min and max typed aggregation functions URL: https://github.com/apache/spark/pull/18113 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java b/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java index ec9c107b1c119..f426dd95cec27 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java +++ b/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java @@ -21,10 +21,7 @@ import org.apache.spark.annotation.InterfaceStability; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.TypedColumn; -import org.apache.spark.sql.execution.aggregate.TypedAverage; -import org.apache.spark.sql.execution.aggregate.TypedCount; -import org.apache.spark.sql.execution.aggregate.TypedSumDouble; -import org.apache.spark.sql.execution.aggregate.TypedSumLong; +import org.apache.spark.sql.execution.aggregate.*; /** * :: Experimental :: @@ -74,4 +71,40 @@ public static TypedColumn sumLong(MapFunction f) { return new TypedSumLong(f).toColumnJava(); } + + /** + * Min aggregate function for floating point (double) type. + * + * @since 2.3.0 + */ + public static TypedColumn min(MapFunction f) { +return new JavaTypedMinDouble(f).toColumn(); + } + + /** + * Min aggregate function for integral (long, i.e. 64 bit integer) type. + * + * @since 2.3.0 + */ + public static TypedColumn minLong(MapFunction f) { +return new JavaTypedMinLong(f).toColumn(); + } + + /** + * Min aggregate function for floating point (double) type. + * + * @since 2.3.0 + */ + public static TypedColumn max(MapFunction f) { +return new JavaTypedMaxDouble(f).toColumn(); + } + + /** + * Min aggregate function for integral (long, i.e. 64 bit integer) type. 
+ * + * @since 2.3.0 + */ + public static TypedColumn maxLong(MapFunction f) { +return new JavaTypedMaxLong(f).toColumn(); + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala index b6550bf3e4aac..1d019e4f78f6d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.aggregate import org.apache.spark.api.java.function.MapFunction -import org.apache.spark.sql.{Encoder, TypedColumn} +import org.apache.spark.sql.{Encoder, Encoders, TypedColumn} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.expressions.Aggregator @@ -38,13 +38,11 @@ class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Dou // Java api support def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double]) - def toColumnJava: TypedColumn[IN, java.lang.Double] = { toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]] } } - class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] { override def zero: Long = 0L override def reduce(b: Long, a: IN): Long = b + f(a) @@ -56,13 +54,11 @@ class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] { // Java api support def this(f: MapFunction[IN, java.lang.Long]) = this((x: IN) => f.call(x).asInstanceOf[Long]) - def toColumnJava: TypedColumn[IN, java.lang.Long] = { toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]] } } - class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] { override def zero: Long = 0 override def reduce(b: Long, a: IN): Long = { @@ -81,14 +77,13 @@ class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] { } } - class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] { override def zero: (Double, Long) = (0.0, 0L) override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2) - override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2 override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = { (b1._1 + b2._1, b1._2 + b2._2) } + override def finish(reduction: (Double, Long)): Double =
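The proposed min/max functions are built on the same Aggregator pattern as the existing typed sums. Below is a hedged, self-contained sketch of that pattern using only the public Aggregator API; the class and object names are illustrative, not the PR's exact API.

{code:scala}
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, TypedColumn}
import org.apache.spark.sql.expressions.Aggregator

// Minimal typed "min" aggregator in the style of the PR's TypedMinDouble.
class TypedMinDouble[IN](f: IN => Double) extends Aggregator[IN, Double, Double] {
  override def zero: Double = Double.PositiveInfinity
  override def reduce(b: Double, a: IN): Double = math.min(b, f(a))
  override def merge(b1: Double, b2: Double): Double = math.min(b1, b2)
  override def finish(reduction: Double): Double = reduction
  override def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object TypedMinExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("typed-min-sketch").getOrCreate()
    import spark.implicits._

    val minValue: TypedColumn[(String, Double), Double] =
      new TypedMinDouble[(String, Double)](_._2).toColumn

    val ds = Seq(("a", 3.0), ("a", 1.0), ("b", 2.0)).toDS()
    ds.groupByKey(_._1).agg(minValue).show() // (a, 1.0), (b, 2.0)
    spark.stop()
  }
}
{code}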
[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early
[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721386#comment-16721386 ] Julian commented on SPARK-21453: Also the issue is marked as Major on the other thread, but here only Minor? > Cached Kafka consumer may be closed too early > - > > Key: SPARK-21453 > URL: https://issues.apache.org/jira/browse/SPARK-21453 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 > Environment: Spark 2.2.0 and kafka 0.10.2.0 >Reporter: Pablo Panero >Priority: Minor > > On a streaming job using built-in kafka source and sink (over SSL), with I > am getting the following exception: > Config of the source: > {code:java} > val df = spark.readStream > .format("kafka") > .option("kafka.bootstrap.servers", config.bootstrapServers) > .option("failOnDataLoss", value = false) > .option("kafka.connections.max.idle.ms", 360) > //SSL: this only applies to communication between Spark and Kafka > brokers; you are still responsible for separately securing Spark inter-node > communication. > .option("kafka.security.protocol", "SASL_SSL") > .option("kafka.sasl.mechanism", "GSSAPI") > .option("kafka.sasl.kerberos.service.name", "kafka") > .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") > .option("kafka.ssl.truststore.password", "changeit") > .option("subscribe", config.topicConfigList.keys.mkString(",")) > .load() > {code} > Config of the sink: > {code:java} > .writeStream > .option("checkpointLocation", > s"${config.checkpointDir}/${topicConfig._1}/") > .format("kafka") > .option("kafka.bootstrap.servers", config.bootstrapServers) > .option("kafka.connections.max.idle.ms", 360) > //SSL: this only applies to communication between Spark and Kafka > brokers; you are still responsible for separately securing Spark inter-node > communication. 
> .option("kafka.security.protocol", "SASL_SSL") > .option("kafka.sasl.mechanism", "GSSAPI") > .option("kafka.sasl.kerberos.service.name", "kafka") > .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") > .option("kafka.ssl.truststore.password", "changeit") > .start() > {code} > {code:java} > 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) > at > org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) > at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731) > at > org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54) > at org.apache.kafka.common.network.Selector.doClose(Selector.java:540) > at org.apache.kafka.common.network.Selector.close(Selector.java:531) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378) > at org.apache.kafka.common.network.Selector.poll(Selector.java:303) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106) > at > org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106) > at > org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157) > at >
[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early
[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721380#comment-16721380 ] Julian commented on SPARK-21453: I see this issue is also raised against Kafka under https://issues.apache.org/jira/browse/KAFKA-5649 > Cached Kafka consumer may be closed too early > - > > Key: SPARK-21453 > URL: https://issues.apache.org/jira/browse/SPARK-21453 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 > Environment: Spark 2.2.0 and kafka 0.10.2.0 >Reporter: Pablo Panero >Priority: Minor > > On a streaming job using built-in kafka source and sink (over SSL), with I > am getting the following exception: > Config of the source: > {code:java} > val df = spark.readStream > .format("kafka") > .option("kafka.bootstrap.servers", config.bootstrapServers) > .option("failOnDataLoss", value = false) > .option("kafka.connections.max.idle.ms", 360) > //SSL: this only applies to communication between Spark and Kafka > brokers; you are still responsible for separately securing Spark inter-node > communication. > .option("kafka.security.protocol", "SASL_SSL") > .option("kafka.sasl.mechanism", "GSSAPI") > .option("kafka.sasl.kerberos.service.name", "kafka") > .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") > .option("kafka.ssl.truststore.password", "changeit") > .option("subscribe", config.topicConfigList.keys.mkString(",")) > .load() > {code} > Config of the sink: > {code:java} > .writeStream > .option("checkpointLocation", > s"${config.checkpointDir}/${topicConfig._1}/") > .format("kafka") > .option("kafka.bootstrap.servers", config.bootstrapServers) > .option("kafka.connections.max.idle.ms", 360) > //SSL: this only applies to communication between Spark and Kafka > brokers; you are still responsible for separately securing Spark inter-node > communication. 
> .option("kafka.security.protocol", "SASL_SSL") > .option("kafka.sasl.mechanism", "GSSAPI") > .option("kafka.sasl.kerberos.service.name", "kafka") > .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts") > .option("kafka.ssl.truststore.password", "changeit") > .start() > {code} > {code:java} > 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message > java.io.IOException: Broken pipe > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) > at sun.nio.ch.IOUtil.write(IOUtil.java:65) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) > at > org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) > at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731) > at > org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54) > at org.apache.kafka.common.network.Selector.doClose(Selector.java:540) > at org.apache.kafka.common.network.Selector.close(Selector.java:531) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378) > at org.apache.kafka.common.network.Selector.poll(Selector.java:303) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106) > at > org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68) > at > org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106) > at > org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157) > at
[jira] [Commented] (SPARK-25922) [K8] Spark Driver/Executor "spark-app-selector" label mismatch
[ https://issues.apache.org/jira/browse/SPARK-25922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721346#comment-16721346 ] ASF GitHub Bot commented on SPARK-25922: suxingfate opened a new pull request #23322: [SPARK-25922][K8] Spark Driver/Executor spark-app-selector label mism… URL: https://github.com/apache/spark/pull/23322 ## What changes were proposed in this pull request? In K8S Cluster mode, the algorithm to generate spark-app-selector/spark.app.id of spark driver is different with spark executor. This patch consolidated the algorithm for driver and executor to have a universal logic. This will help to monitor resource consumption from K8S perspective. In K8S Client mode, this makes sure it will also use the same algorithm to generate spark-app-selector/spark.app.id for executors. ## How was this patch tested? Manually run." This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > [K8] Spark Driver/Executor "spark-app-selector" label mismatch > -- > > Key: SPARK-25922 > URL: https://issues.apache.org/jira/browse/SPARK-25922 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 2.4.0 > Environment: Spark 2.4.0 RC4 >Reporter: Anmol Khurana >Priority: Major > > Hi, > I have been testing Spark 2.4.0 RC4 on Kubernetes to run Python Spark > Applications and running into an issue where the AppId label on the driver > and executors mis-match. I am using the > [https://github.com/GoogleCloudPlatform/spark-on-k8s-operator] to run these > applications. > I see a spark.app.id of the form spark-* as "spark-app-selector" label on > the driver as well as in the K8 config-map which gets created for the driver > via spark-submit . My guess is this is coming from > [https://github.com/apache/spark/blob/f6cc354d83c2c9a757f9b507aadd4dbdc5825cca/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L211] > > But when the driver actually comes up and brings up executors etc. , I see > that the "spark-app-selector" label on the executors as well as the > spark.app.Id config within the user-code on the driver is something of the > form spark-application-* ( probably from > [https://github.com/apache/spark/blob/b19a28dea098c7d6188f8540429c50f42952d678/core/src/main/scala/org/apache/spark/SparkContext.scala#L511] > & > [https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26|https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26)] > ) > We were consuming this "spark-app-selector" label on the Driver Pod to get > the App Id and use it to look-up the app in SparkHistory server (among other > use-cases). but due to this mis-match, this logic no longer works. This was > working fine in Spark 2.2 fork for Kubernetes which i was using earlier. Is > this expected behavior and if yes, what's the correct way to fetch the > applicationId from outside the application ? > Let me know if I can provide any more details or if I am doing something > wrong. 
Here is an example run with different *spark-app-selector* label on > the driver/executor : > > {code:java} > Name: pyfiles-driver > Namespace: default > Priority: 0 > PriorityClassName: > Start Time: Thu, 01 Nov 2018 18:19:46 -0700 > Labels: spark-app-selector=spark-b78bb10feebf4e2d98c11d7b6320e18f > spark-role=driver > sparkoperator.k8s.io/app-name=pyfiles > sparkoperator.k8s.io/launched-by-spark-operator=true > version=2.4.0 > Status: Running > Name: pyfiles-1541121585642-exec-1 > Namespace: default > Priority: 0 > PriorityClassName: > Start Time: Thu, 01 Nov 2018 18:24:02 -0700 > Labels: spark-app-selector=spark-application-1541121829445 > spark-exec-id=1 > spark-role=executor > sparkoperator.k8s.io/app-name=pyfiles > sparkoperator.k8s.io/launched-by-spark-operator=true > version=2.4.0 > Status: Pending > {code} > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
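For context, the mismatch can be summarised by paraphrasing the two ID-generation paths the reporter links to. This sketch only illustrates why the labels differ; it is not a proposed fix (see the pull request above for the actual change), and the method names are made up.

{code:scala}
import java.util.UUID

object AppIdMismatchExample {
  // spark-submit side (KubernetesClientApplication): used for the driver pod's
  // spark-app-selector label and the driver config map.
  def submissionAppId(): String =
    s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"

  // Driver side (SchedulerBackend.applicationId()): used for spark.app.id and the
  // executors' spark-app-selector label.
  def schedulerAppId(): String =
    "spark-application-" + System.currentTimeMillis

  def main(args: Array[String]): Unit = {
    println(submissionAppId()) // e.g. spark-b78bb10feebf4e2d98c11d7b6320e18f
    println(schedulerAppId())  // e.g. spark-application-1541121829445
  }
}
{code}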
[jira] [Commented] (SPARK-25922) [K8] Spark Driver/Executor "spark-app-selector" label mismatch
[ https://issues.apache.org/jira/browse/SPARK-25922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721349#comment-16721349 ] Wang, Xinglong commented on SPARK-25922: Hi [~liyinan926], Recently while evaluating spark on k8s, We are setting up tools to monitor resource usage from k8s perspective and also met this problem. I created a pull request for this. Could you please help to review? Thanks, > [K8] Spark Driver/Executor "spark-app-selector" label mismatch > -- > > Key: SPARK-25922 > URL: https://issues.apache.org/jira/browse/SPARK-25922 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 2.4.0 > Environment: Spark 2.4.0 RC4 >Reporter: Anmol Khurana >Priority: Major > > Hi, > I have been testing Spark 2.4.0 RC4 on Kubernetes to run Python Spark > Applications and running into an issue where the AppId label on the driver > and executors mis-match. I am using the > [https://github.com/GoogleCloudPlatform/spark-on-k8s-operator] to run these > applications. > I see a spark.app.id of the form spark-* as "spark-app-selector" label on > the driver as well as in the K8 config-map which gets created for the driver > via spark-submit . My guess is this is coming from > [https://github.com/apache/spark/blob/f6cc354d83c2c9a757f9b507aadd4dbdc5825cca/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L211] > > But when the driver actually comes up and brings up executors etc. , I see > that the "spark-app-selector" label on the executors as well as the > spark.app.Id config within the user-code on the driver is something of the > form spark-application-* ( probably from > [https://github.com/apache/spark/blob/b19a28dea098c7d6188f8540429c50f42952d678/core/src/main/scala/org/apache/spark/SparkContext.scala#L511] > & > [https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26|https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26)] > ) > We were consuming this "spark-app-selector" label on the Driver Pod to get > the App Id and use it to look-up the app in SparkHistory server (among other > use-cases). but due to this mis-match, this logic no longer works. This was > working fine in Spark 2.2 fork for Kubernetes which i was using earlier. Is > this expected behavior and if yes, what's the correct way to fetch the > applicationId from outside the application ? > Let me know if I can provide any more details or if I am doing something > wrong. 
Here is an example run with different *spark-app-selector* label on > the driver/executor : > > {code:java} > Name: pyfiles-driver > Namespace: default > Priority: 0 > PriorityClassName: > Start Time: Thu, 01 Nov 2018 18:19:46 -0700 > Labels: spark-app-selector=spark-b78bb10feebf4e2d98c11d7b6320e18f > spark-role=driver > sparkoperator.k8s.io/app-name=pyfiles > sparkoperator.k8s.io/launched-by-spark-operator=true > version=2.4.0 > Status: Running > Name: pyfiles-1541121585642-exec-1 > Namespace: default > Priority: 0 > PriorityClassName: > Start Time: Thu, 01 Nov 2018 18:24:02 -0700 > Labels: spark-app-selector=spark-application-1541121829445 > spark-exec-id=1 > spark-role=executor > sparkoperator.k8s.io/app-name=pyfiles > sparkoperator.k8s.io/launched-by-spark-operator=true > version=2.4.0 > Status: Pending > {code} > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-25922) [K8] Spark Driver/Executor "spark-app-selector" label mismatch
[ https://issues.apache.org/jira/browse/SPARK-25922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-25922: Assignee: (was: Apache Spark) > [K8] Spark Driver/Executor "spark-app-selector" label mismatch > -- > > Key: SPARK-25922 > URL: https://issues.apache.org/jira/browse/SPARK-25922 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 2.4.0 > Environment: Spark 2.4.0 RC4 >Reporter: Anmol Khurana >Priority: Major > > Hi, > I have been testing Spark 2.4.0 RC4 on Kubernetes to run Python Spark > Applications and running into an issue where the AppId label on the driver > and executors mis-match. I am using the > [https://github.com/GoogleCloudPlatform/spark-on-k8s-operator] to run these > applications. > I see a spark.app.id of the form spark-* as "spark-app-selector" label on > the driver as well as in the K8 config-map which gets created for the driver > via spark-submit . My guess is this is coming from > [https://github.com/apache/spark/blob/f6cc354d83c2c9a757f9b507aadd4dbdc5825cca/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L211] > > But when the driver actually comes up and brings up executors etc. , I see > that the "spark-app-selector" label on the executors as well as the > spark.app.Id config within the user-code on the driver is something of the > form spark-application-* ( probably from > [https://github.com/apache/spark/blob/b19a28dea098c7d6188f8540429c50f42952d678/core/src/main/scala/org/apache/spark/SparkContext.scala#L511] > & > [https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26|https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26)] > ) > We were consuming this "spark-app-selector" label on the Driver Pod to get > the App Id and use it to look-up the app in SparkHistory server (among other > use-cases). but due to this mis-match, this logic no longer works. This was > working fine in Spark 2.2 fork for Kubernetes which i was using earlier. Is > this expected behavior and if yes, what's the correct way to fetch the > applicationId from outside the application ? > Let me know if I can provide any more details or if I am doing something > wrong. Here is an example run with different *spark-app-selector* label on > the driver/executor : > > {code:java} > Name: pyfiles-driver > Namespace: default > Priority: 0 > PriorityClassName: > Start Time: Thu, 01 Nov 2018 18:19:46 -0700 > Labels: spark-app-selector=spark-b78bb10feebf4e2d98c11d7b6320e18f > spark-role=driver > sparkoperator.k8s.io/app-name=pyfiles > sparkoperator.k8s.io/launched-by-spark-operator=true > version=2.4.0 > Status: Running > Name: pyfiles-1541121585642-exec-1 > Namespace: default > Priority: 0 > PriorityClassName: > Start Time: Thu, 01 Nov 2018 18:24:02 -0700 > Labels: spark-app-selector=spark-application-1541121829445 > spark-exec-id=1 > spark-role=executor > sparkoperator.k8s.io/app-name=pyfiles > sparkoperator.k8s.io/launched-by-spark-operator=true > version=2.4.0 > Status: Pending > {code} > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-25922) [K8] Spark Driver/Executor "spark-app-selector" label mismatch
[ https://issues.apache.org/jira/browse/SPARK-25922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-25922: Assignee: Apache Spark > [K8] Spark Driver/Executor "spark-app-selector" label mismatch > -- > > Key: SPARK-25922 > URL: https://issues.apache.org/jira/browse/SPARK-25922 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 2.4.0 > Environment: Spark 2.4.0 RC4 >Reporter: Anmol Khurana >Assignee: Apache Spark >Priority: Major > > Hi, > I have been testing Spark 2.4.0 RC4 on Kubernetes to run Python Spark > Applications and running into an issue where the AppId label on the driver > and executors mis-match. I am using the > [https://github.com/GoogleCloudPlatform/spark-on-k8s-operator] to run these > applications. > I see a spark.app.id of the form spark-* as "spark-app-selector" label on > the driver as well as in the K8 config-map which gets created for the driver > via spark-submit . My guess is this is coming from > [https://github.com/apache/spark/blob/f6cc354d83c2c9a757f9b507aadd4dbdc5825cca/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L211] > > But when the driver actually comes up and brings up executors etc. , I see > that the "spark-app-selector" label on the executors as well as the > spark.app.Id config within the user-code on the driver is something of the > form spark-application-* ( probably from > [https://github.com/apache/spark/blob/b19a28dea098c7d6188f8540429c50f42952d678/core/src/main/scala/org/apache/spark/SparkContext.scala#L511] > & > [https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26|https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26)] > ) > We were consuming this "spark-app-selector" label on the Driver Pod to get > the App Id and use it to look-up the app in SparkHistory server (among other > use-cases). but due to this mis-match, this logic no longer works. This was > working fine in Spark 2.2 fork for Kubernetes which i was using earlier. Is > this expected behavior and if yes, what's the correct way to fetch the > applicationId from outside the application ? > Let me know if I can provide any more details or if I am doing something > wrong. Here is an example run with different *spark-app-selector* label on > the driver/executor : > > {code:java} > Name: pyfiles-driver > Namespace: default > Priority: 0 > PriorityClassName: > Start Time: Thu, 01 Nov 2018 18:19:46 -0700 > Labels: spark-app-selector=spark-b78bb10feebf4e2d98c11d7b6320e18f > spark-role=driver > sparkoperator.k8s.io/app-name=pyfiles > sparkoperator.k8s.io/launched-by-spark-operator=true > version=2.4.0 > Status: Running > Name: pyfiles-1541121585642-exec-1 > Namespace: default > Priority: 0 > PriorityClassName: > Start Time: Thu, 01 Nov 2018 18:24:02 -0700 > Labels: spark-app-selector=spark-application-1541121829445 > spark-exec-id=1 > spark-role=executor > sparkoperator.k8s.io/app-name=pyfiles > sparkoperator.k8s.io/launched-by-spark-operator=true > version=2.4.0 > Status: Pending > {code} > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19739) SparkHadoopUtil.appendS3AndSparkHadoopConfigurations to propagate full set of AWS env vars
[ https://issues.apache.org/jira/browse/SPARK-19739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721258#comment-16721258 ] Steve Loughran commented on SPARK-19739: BTW: I'm not backporting HADOOP-14556 to any non-trunk version of Hadoop as its fairly dramatic. If you want to add a patch to all of Hadoop-2.8 to 3.2 to add the temp creds at the start of the list, I'll review and inevitably commit. I don't think there was any deliberate intention not to add it as an option, (unlike the special "no credentials" switch, or the assumed role one). Do read the [S3 test process doc|https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/testing.html#Policy_for_submitting_patches_which_affect_the_hadoop-aws_module.] before submitting though: you're required to run the whole s3a test suite and tell us what endpoint you ran against. Talk to some colleagues to get set up here, if you haven't done it before. > SparkHadoopUtil.appendS3AndSparkHadoopConfigurations to propagate full set of > AWS env vars > -- > > Key: SPARK-19739 > URL: https://issues.apache.org/jira/browse/SPARK-19739 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Steve Loughran >Assignee: Genmao Yu >Priority: Minor > Fix For: 2.2.0 > > > {{SparkHadoopUtil.appendS3AndSparkHadoopConfigurations()}} propagates the AWS > user and secret key to s3n and s3a config options, so getting secrets from > the user to the cluster, if set. > AWS also supports session authentication (env var {{AWS_SESSION_TOKEN}}) and > region endpoints {{AWS_DEFAULT_REGION}}, the latter being critical if you > want to address V4-auth-only endpoints like frankfurt and Seol. > These env vars should be picked up and passed down to S3a too. 4+ lines of > code, though impossible to test unless the existing code is refactored to > take the env var map[String, String], so allowing a test suite to set the > values in itds own map. > side issue: what if only half the env vars are set and users are trying to > understand why auth is failing? It may be good to build up a string > identifying which env vars had their value propagate, and log that @ debug, > while not logging the values, obviously. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
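As a rough illustration of the propagation the issue asks for (not Spark's actual SparkHadoopUtil code), here is a sketch that takes the environment as a map, as the description suggests, and returns only the names of the variables it copied so they can be logged without exposing values. Region/endpoint handling is left out because the right s3a property depends on the Hadoop version in use.

{code:scala}
import org.apache.hadoop.conf.Configuration

object S3ACredentialPropagation {
  private val envToS3a = Seq(
    "AWS_ACCESS_KEY_ID"     -> "fs.s3a.access.key",
    "AWS_SECRET_ACCESS_KEY" -> "fs.s3a.secret.key",
    "AWS_SESSION_TOKEN"     -> "fs.s3a.session.token"
  )

  /** Copies any AWS env vars present in `env` into `hadoopConf`; returns the env var
   *  names that were propagated, for debug logging (never the values themselves). */
  def propagate(env: Map[String, String], hadoopConf: Configuration): Seq[String] =
    for ((envVar, prop) <- envToS3a; value <- env.get(envVar).toSeq) yield {
      hadoopConf.set(prop, value)
      envVar
    }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val propagated = propagate(sys.env, conf)
    println(s"Propagated AWS env vars: ${propagated.mkString(", ")}")
  }
}
{code}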
[jira] [Commented] (SPARK-26359) Spark checkpoint restore fails after query restart
[ https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721251#comment-16721251 ] Gabor Somogyi commented on SPARK-26359: --- There is also a possibility (apart from delete the metadata) to rename the file from {quote}s3://some.domain/spark/checkpoints/49/feedback/state/2/11/.36870.delta.02e75d1f-aa48-4c61-9257-a5928b32e22e.TID2469780.tmp {quote} to {quote}s3://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta {quote} As a general warning: manually editing checkpoint directory could end up in data loss and not advised! Use it only if loss can be tolerated. > Spark checkpoint restore fails after query restart > -- > > Key: SPARK-26359 > URL: https://issues.apache.org/jira/browse/SPARK-26359 > Project: Spark > Issue Type: Bug > Components: Spark Submit, Structured Streaming >Affects Versions: 2.4.0 > Environment: Spark 2.4.0 deployed in standalone-client mode > Checkpointing is done to S3 > The Spark application in question is responsible for running 4 different > queries > Queries are written using Structured Streaming > We are using the following algorithm for hopes of better performance: > spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # When the > default is 1 >Reporter: Kaspar Tint >Priority: Major > Attachments: driver-redacted, metadata, redacted-offsets, > state-redacted, worker-redacted > > > We had an incident where one of our structured streaming queries could not be > restarted after an usual S3 checkpointing failure. Now to clarify before > everything else - we are aware of the issues with S3 and are working towards > moving to HDFS but this will take time. S3 will cause queries to fail quite > often during peak hours and we have separate logic to handle this that will > attempt to restart the failed queries if possible. > In this particular case we could not restart one of the failed queries. Seems > like between detecting a failure in the query and starting it up again > something went really wrong with Spark and state in checkpoint folder got > corrupted for some reason. 
> The issue starts with the usual *FileNotFoundException* that happens with S3 > {code:java} > 2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = > c074233a-2563-40fc-8036-b5e38e2e2c42, runId = > e607eb6e-8431-4269-acab-cc2c1f9f09dd] > terminated with error > java.io.FileNotFoundException: No such file or directory: > s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37 > 348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp > at > org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255) > at > org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149) > at > org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088) > at > org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715) > at > org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131) > at > org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726) > at > org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699) > at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965) > at > org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331) > at > org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataL > og.scala:126) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
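For anyone attempting the recovery described above, here is a minimal sketch of that rename done through the Hadoop FileSystem API. The paths are the illustrative ones from the comment (using the s3a scheme the checkpoint actually runs on), and the general warning still applies: touch the checkpoint directory by hand only if loss can be tolerated.
{code:scala}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: rename the orphaned temp delta file to the name the state store expects.
object RenameTempDeltaSketch {
  def main(args: Array[String]): Unit = {
    val dir = "s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/"
    val src = new Path(dir + ".36870.delta.02e75d1f-aa48-4c61-9257-a5928b32e22e.TID2469780.tmp")
    val dst = new Path(dir + "36870.delta")
    val fs = FileSystem.get(new URI(dir), new Configuration())
    require(fs.rename(src, dst), s"rename of $src to $dst failed")
  }
}
{code}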
[jira] [Commented] (SPARK-26371) Increase Kafka ConfigUpdater test coverage
[ https://issues.apache.org/jira/browse/SPARK-26371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721240#comment-16721240 ] ASF GitHub Bot commented on SPARK-26371: gaborgsomogyi opened a new pull request #23321: [SPARK-26371][SS][TESTS] Increase kafka ConfigUpdater test coverage. URL: https://github.com/apache/spark/pull/23321 ## What changes were proposed in this pull request? Since Kafka delegation token support added logic to ConfigUpdater, it would be good to test it. This PR contains the following changes: * ConfigUpdater extracted to a separate file and renamed to KafkaConfigUpdater * mockito-core dependency added to kafka-0-10-sql * Unit tests added ## How was this patch tested? Existing + new unit tests + on cluster. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Increase Kafka ConfigUpdater test coverage > -- > > Key: SPARK-26371 > URL: https://issues.apache.org/jira/browse/SPARK-26371 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming, Tests >Affects Versions: 3.0.0 >Reporter: Gabor Somogyi >Priority: Minor > > Since Kafka delegation token support added logic to ConfigUpdater, it would be good to > test it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
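To make the testing intent concrete, here is a self-contained sketch of a builder-style config updater and the kind of behaviour a unit test would pin down. The class and method names are simplified stand-ins, not the actual KafkaConfigUpdater API from the patch, and the check is a plain assertion rather than the ScalaTest/Mockito tests the real kafka-0-10-sql module would use.
{code:scala}
import java.{util => ju}

// Simplified stand-in for the extracted updater; the real names in the PR may differ.
class ConfigUpdaterSketch(module: String, params: Map[String, String]) { // module kept for log messages in the real class
  private val map = new ju.HashMap[String, Object]()
  params.foreach { case (k, v) => map.put(k, v) }

  def set(key: String, value: String): this.type = { map.put(key, value); this }

  def setIfUnset(key: String, value: String): this.type = {
    if (!map.containsKey(key)) map.put(key, value)
    this
  }

  def build(): ju.Map[String, Object] = map
}

// The properties worth unit testing: set overrides an existing value, setIfUnset does not.
object ConfigUpdaterSketchTest extends App {
  val updated = new ConfigUpdaterSketch("executor", Map("bootstrap.servers" -> "host:9092"))
    .set("group.id", "spark-kafka")
    .setIfUnset("bootstrap.servers", "other:9092") // must keep the original value
    .build()
  assert(updated.get("group.id") == "spark-kafka")
  assert(updated.get("bootstrap.servers") == "host:9092")
}
{code}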
[jira] [Assigned] (SPARK-26371) Increase Kafka ConfigUpdater test coverage
[ https://issues.apache.org/jira/browse/SPARK-26371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26371: Assignee: Apache Spark > Increase Kafka ConfigUpdater test coverage > -- > > Key: SPARK-26371 > URL: https://issues.apache.org/jira/browse/SPARK-26371 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming, Tests >Affects Versions: 3.0.0 >Reporter: Gabor Somogyi >Assignee: Apache Spark >Priority: Minor > > Since Kafka delegation token support added logic to ConfigUpdater, it would be good to > test it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26371) Increase Kafka ConfigUpdater test coverage
[ https://issues.apache.org/jira/browse/SPARK-26371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26371: Assignee: (was: Apache Spark) > Increase Kafka ConfigUpdater test coverage > -- > > Key: SPARK-26371 > URL: https://issues.apache.org/jira/browse/SPARK-26371 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming, Tests >Affects Versions: 3.0.0 >Reporter: Gabor Somogyi >Priority: Minor > > Since Kafka delegation token support added logic to ConfigUpdater, it would be good to > test it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26359) Spark checkpoint restore fails after query restart
[ https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721213#comment-16721213 ] Kaspar Tint commented on SPARK-26359: - It means we changed the configuration *spark.sql.streaming.checkpointLocation: "s3a://some.domain/spark/checkpoints/49"* to *spark.sql.streaming.checkpointLocation: "s3a://some.domain/spark/checkpoints/50"* and restarted the whole Spark application. This obviously results in data loss and is not a preferred solution. (The snippet after the quoted trace below shows where this setting is configured.) > Spark checkpoint restore fails after query restart > -- > > Key: SPARK-26359 > URL: https://issues.apache.org/jira/browse/SPARK-26359 > Project: Spark > Issue Type: Bug > Components: Spark Submit, Structured Streaming >Affects Versions: 2.4.0 > Environment: Spark 2.4.0 deployed in standalone-client mode > Checkpointing is done to S3 > The Spark application in question is responsible for running 4 different > queries > Queries are written using Structured Streaming > We are using the following algorithm in hopes of better performance: > spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # the > default is 1 >Reporter: Kaspar Tint >Priority: Major > Attachments: driver-redacted, metadata, redacted-offsets, > state-redacted, worker-redacted > > > We had an incident where one of our structured streaming queries could not be > restarted after a usual S3 checkpointing failure. Now to clarify before > everything else - we are aware of the issues with S3 and are working towards > moving to HDFS but this will take time. S3 will cause queries to fail quite > often during peak hours and we have separate logic to handle this that will > attempt to restart the failed queries if possible. > In this particular case we could not restart one of the failed queries. Seems > like between detecting a failure in the query and starting it up again > something went really wrong with Spark and the state in the checkpoint folder got > corrupted for some reason.
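For context, the setting being bumped is the Structured Streaming checkpoint root. The sketch below is illustrative only (the rate source and output path are placeholders for the real feedback query) and shows the two places the location can be set; pointing the root at a fresh path such as .../50 makes every query start from an empty checkpoint, which is why this unblocks the job but loses state.
{code:scala}
import org.apache.spark.sql.SparkSession

object CheckpointLocationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("checkpoint-location").getOrCreate()

    // Session-wide default root for all queries (the setting bumped from .../49 to .../50):
    spark.conf.set("spark.sql.streaming.checkpointLocation",
      "s3a://some.domain/spark/checkpoints/50")

    // Per-query location, which takes precedence over the session default.
    // The rate source and output path stand in for the real feedback query.
    val events = spark.readStream.format("rate").load()
    val query = events.writeStream
      .format("parquet")
      .option("path", "s3a://some.domain/spark/output/feedback")
      .option("checkpointLocation", "s3a://some.domain/spark/checkpoints/50/feedback")
      .start()

    query.awaitTermination()
  }
}
{code}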
[jira] [Created] (SPARK-26371) Increase Kafka ConfigUpdater test coverage
Gabor Somogyi created SPARK-26371: - Summary: Increase Kafka ConfigUpdater test coverage Key: SPARK-26371 URL: https://issues.apache.org/jira/browse/SPARK-26371 Project: Spark Issue Type: Improvement Components: Structured Streaming, Tests Affects Versions: 3.0.0 Reporter: Gabor Somogyi Since Kafka delegation token support added logic to ConfigUpdater, it would be good to test it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26359) Spark checkpoint restore fails after query restart
[ https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721195#comment-16721195 ] Gabor Somogyi commented on SPARK-26359: --- Yeah, after the recovery it's advised to switch that back. Related recovery metadata can be deleted. BTW, when you said {quote}an engineer stepped in and bumped the checkpoint version manually{quote}, which file was touched exactly, and how? > Spark checkpoint restore fails after query restart > -- > > Key: SPARK-26359 > URL: https://issues.apache.org/jira/browse/SPARK-26359 > Project: Spark > Issue Type: Bug > Components: Spark Submit, Structured Streaming >Affects Versions: 2.4.0 > Environment: Spark 2.4.0 deployed in standalone-client mode > Checkpointing is done to S3 > The Spark application in question is responsible for running 4 different > queries > Queries are written using Structured Streaming > We are using the following algorithm in hopes of better performance: > spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # the > default is 1 >Reporter: Kaspar Tint >Priority: Major > Attachments: driver-redacted, metadata, redacted-offsets, > state-redacted, worker-redacted > > > We had an incident where one of our structured streaming queries could not be > restarted after a usual S3 checkpointing failure. Now to clarify before > everything else - we are aware of the issues with S3 and are working towards > moving to HDFS but this will take time. S3 will cause queries to fail quite > often during peak hours and we have separate logic to handle this that will > attempt to restart the failed queries if possible. > In this particular case we could not restart one of the failed queries. Seems > like between detecting a failure in the query and starting it up again > something went really wrong with Spark and the state in the checkpoint folder got > corrupted for some reason.
[jira] [Commented] (SPARK-26359) Spark checkpoint restore fails after query restart
[ https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721107#comment-16721107 ] Kaspar Tint commented on SPARK-26359: - This seems to be related to the fact that we decided to try *spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version* 2 instead of the default 1. It gives much better performance, but it looks like this issue is the tradeoff then? Do you maybe have any suggestions on how to recover from this in a sane way? > Spark checkpoint restore fails after query restart > -- > > Key: SPARK-26359 > URL: https://issues.apache.org/jira/browse/SPARK-26359 > Project: Spark > Issue Type: Bug > Components: Spark Submit, Structured Streaming >Affects Versions: 2.4.0 > Environment: Spark 2.4.0 deployed in standalone-client mode > Checkpointing is done to S3 > The Spark application in question is responsible for running 4 different > queries > Queries are written using Structured Streaming > We are using the following algorithm in hopes of better performance: > spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # the > default is 1 >Reporter: Kaspar Tint >Priority: Major > Attachments: driver-redacted, metadata, redacted-offsets, > state-redacted, worker-redacted > > > We had an incident where one of our structured streaming queries could not be > restarted after a usual S3 checkpointing failure. Now to clarify before > everything else - we are aware of the issues with S3 and are working towards > moving to HDFS but this will take time. S3 will cause queries to fail quite > often during peak hours and we have separate logic to handle this that will > attempt to restart the failed queries if possible. > In this particular case we could not restart one of the failed queries. Seems > like between detecting a failure in the query and starting it up again > something went really wrong with Spark and the state in the checkpoint folder got > corrupted for some reason.
[jira] [Commented] (SPARK-26353) Add typed aggregate functions: max & min
[ https://issues.apache.org/jira/browse/SPARK-26353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721033#comment-16721033 ] ASF GitHub Bot commented on SPARK-26353: 10110346 opened a new pull request #23304: [SPARK-26353][SQL] Add typed aggregate functions: max & min URL: https://github.com/apache/spark/pull/23304 ## What changes were proposed in this pull request? For the Dataset API, the aggregate functions max & min are not implemented in a type-safe way at the moment. This pull request adds min & max aggregate functions in the `expressions.scalalang.typed` and `expressions.javalang.typed` packages. ## How was this patch tested? Added a unit test This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add typed aggregate functions: max & min > -- > > Key: SPARK-26353 > URL: https://issues.apache.org/jira/browse/SPARK-26353 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: liuxian >Priority: Minor > > For the Dataset API, the aggregate functions max & min are not implemented in a > type-safe way at the moment. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
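For reference, a usage-level sketch of where the proposed functions would sit next to the existing type-safe aggregators. {{typed.sum}} below is existing API; {{typed.max}} and {{typed.min}} are the additions proposed by this ticket and are left commented out as hypothetical until the pull request lands.
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.scalalang.typed

case class Purchase(user: String, amount: Double)

object TypedMaxMinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("typed-max-min").getOrCreate()
    import spark.implicits._

    val ds = Seq(Purchase("a", 1.0), Purchase("a", 5.0), Purchase("b", 2.0)).toDS()

    // Existing type-safe aggregator: per-user sum of amounts.
    val sums = ds.groupByKey(_.user).agg(typed.sum[Purchase](_.amount))
    sums.show()

    // Proposed by this ticket (hypothetical until merged):
    // val maxes = ds.groupByKey(_.user).agg(typed.max[Purchase](_.amount))

    spark.stop()
  }
}
{code}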