[jira] [Commented] (SPARK-26362) Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722028#comment-16722028
 ] 

ASF GitHub Bot commented on SPARK-26362:


asfgit closed pull request #23311: [SPARK-26362][CORE] Remove 
'spark.driver.allowMultipleContexts' to disallow multiple creation of 
SparkContexts
URL: https://github.com/apache/spark/pull/23311
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 696dafda6d1ec..09cc346db0ed2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -64,9 +64,8 @@ import org.apache.spark.util.logging.DriverLogger
  * Main entry point for Spark functionality. A SparkContext represents the 
connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast 
variables on that cluster.
  *
- * Only one SparkContext may be active per JVM.  You must `stop()` the active 
SparkContext before
- * creating a new one.  This limitation may eventually be removed; see 
SPARK-2243 for more details.
- *
+ * @note Only one `SparkContext` should be active per JVM. You must `stop()` 
the
+ *   active `SparkContext` before creating a new one.
  * @param config a Spark Config object describing the application 
configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
  */
@@ -75,14 +74,10 @@ class SparkContext(config: SparkConf) extends Logging {
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
 
-  // If true, log warnings instead of throwing exceptions when multiple 
SparkContexts are active
-  private val allowMultipleContexts: Boolean =
-config.getBoolean("spark.driver.allowMultipleContexts", false)
-
   // In order to prevent multiple SparkContexts from being active at the same 
time, mark this
   // context as having started construction.
   // NOTE: this must be placed at the beginning of the SparkContext 
constructor.
-  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
+  SparkContext.markPartiallyConstructed(this)
 
   val startTime = System.currentTimeMillis()
 
@@ -2392,7 +2387,7 @@ class SparkContext(config: SparkConf) extends Logging {
   // In order to prevent multiple SparkContexts from being active at the same 
time, mark this
   // context as having finished construction.
   // NOTE: this must be placed at the end of the SparkContext constructor.
-  SparkContext.setActiveContext(this, allowMultipleContexts)
+  SparkContext.setActiveContext(this)
 }
 
 /**
@@ -2409,18 +2404,18 @@ object SparkContext extends Logging {
   private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
 
   /**
-   * The active, fully-constructed SparkContext.  If no SparkContext is 
active, then this is `null`.
+   * The active, fully-constructed SparkContext. If no SparkContext is active, 
then this is `null`.
*
-   * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
+   * Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`.
*/
   private val activeContext: AtomicReference[SparkContext] =
 new AtomicReference[SparkContext](null)
 
   /**
-   * Points to a partially-constructed SparkContext if some thread is in the 
SparkContext
+   * Points to a partially-constructed SparkContext if another thread is in 
the SparkContext
* constructor, or `None` if no SparkContext is being constructed.
*
-   * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
+   * Access to this field is guarded by `SPARK_CONTEXT_CONSTRUCTOR_LOCK`.
*/
   private var contextBeingConstructed: Option[SparkContext] = None
 
@@ -2428,24 +2423,16 @@ object SparkContext extends Logging {
* Called to ensure that no other SparkContext is running in this JVM.
*
* Throws an exception if a running context is detected and logs a warning 
if another thread is
-   * constructing a SparkContext.  This warning is necessary because the 
current locking scheme
+   * constructing a SparkContext. This warning is necessary because the 
current locking scheme
* prevents us from reliably distinguishing between cases where another 
context is being
* constructed and cases where another constructor threw an exception.
*/
-  private def assertNoOtherContextIsRunning(
-  sc: SparkContext,
-  allowMultipleContexts: Boolean): Unit = {
+  private def assertNoOtherContextIsRunning(sc: SparkContext): Unit = {
 SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
   

[jira] [Resolved] (SPARK-26362) Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts

2018-12-14 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-26362.
--
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 23311
[https://github.com/apache/spark/pull/23311]

> Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark 
> contexts
> ---
>
> Key: SPARK-26362
> URL: https://issues.apache.org/jira/browse/SPARK-26362
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Hyukjin Kwon
>Assignee: Hyukjin Kwon
>Priority: Major
> Fix For: 3.0.0
>
>
> Multiple Spark contexts are discouraged, and this has been warned about for 
> four years (see SPARK-4180).
> They can cause arbitrary and mysterious errors. (Honestly, I didn't even 
> know Spark allowed it.)
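
For illustration, here is a minimal sketch of the behavior after this change (the object name and local master are just for the example): creating a second SparkContext while one is active always throws, and 'spark.driver.allowMultipleContexts' is no longer consulted.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

object SingleContextExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("single-context").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // A second active context now fails fast regardless of
    // 'spark.driver.allowMultipleContexts'; the setting is gone.
    val second = Try(new SparkContext(conf))
    println(second.isFailure)            // expected: true (active context detected)

    sc.stop()                            // stop the active context first...
    val sc2 = new SparkContext(conf)     // ...then a new one can be created
    sc2.stop()
  }
}
{code}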






[jira] [Assigned] (SPARK-26362) Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts

2018-12-14 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-26362:


Assignee: Hyukjin Kwon

> Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark 
> contexts
> ---
>
> Key: SPARK-26362
> URL: https://issues.apache.org/jira/browse/SPARK-26362
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Hyukjin Kwon
>Assignee: Hyukjin Kwon
>Priority: Major
>
> Multiple Spark contexts are discouraged, and this has been warned about for 
> four years (see SPARK-4180).
> They can cause arbitrary and mysterious errors. (Honestly, I didn't even 
> know Spark allowed it.)






[jira] [Commented] (SPARK-26265) deadlock between TaskMemoryManager and BytesToBytesMap$MapIterator

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722025#comment-16722025
 ] 

ASF GitHub Bot commented on SPARK-26265:


asfgit closed pull request #23294: [SPARK-26265][Core][Followup] Put freePage 
into a finally block
URL: https://github.com/apache/spark/pull/23294
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index fbba002f1f80f..7df8aafb2b674 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -262,36 +262,39 @@ private void advanceToNextPage() {
   // reference to the page to free and free it after releasing the lock of 
`MapIterator`.
   MemoryBlock pageToFree = null;
 
-  synchronized (this) {
-int nextIdx = dataPages.indexOf(currentPage) + 1;
-if (destructive && currentPage != null) {
-  dataPages.remove(currentPage);
-  pageToFree = currentPage;
-  nextIdx --;
-}
-if (dataPages.size() > nextIdx) {
-  currentPage = dataPages.get(nextIdx);
-  pageBaseObject = currentPage.getBaseObject();
-  offsetInPage = currentPage.getBaseOffset();
-  recordsInPage = UnsafeAlignedOffset.getSize(pageBaseObject, 
offsetInPage);
-  offsetInPage += UnsafeAlignedOffset.getUaoSize();
-} else {
-  currentPage = null;
-  if (reader != null) {
-handleFailedDelete();
+  try {
+synchronized (this) {
+  int nextIdx = dataPages.indexOf(currentPage) + 1;
+  if (destructive && currentPage != null) {
+dataPages.remove(currentPage);
+pageToFree = currentPage;
+nextIdx--;
   }
-  try {
-Closeables.close(reader, /* swallowIOException = */ false);
-reader = spillWriters.getFirst().getReader(serializerManager);
-recordsInPage = -1;
-  } catch (IOException e) {
-// Scala iterator does not handle exception
-Platform.throwException(e);
+  if (dataPages.size() > nextIdx) {
+currentPage = dataPages.get(nextIdx);
+pageBaseObject = currentPage.getBaseObject();
+offsetInPage = currentPage.getBaseOffset();
+recordsInPage = UnsafeAlignedOffset.getSize(pageBaseObject, 
offsetInPage);
+offsetInPage += UnsafeAlignedOffset.getUaoSize();
+  } else {
+currentPage = null;
+if (reader != null) {
+  handleFailedDelete();
+}
+try {
+  Closeables.close(reader, /* swallowIOException = */ false);
+  reader = spillWriters.getFirst().getReader(serializerManager);
+  recordsInPage = -1;
+} catch (IOException e) {
+  // Scala iterator does not handle exception
+  Platform.throwException(e);
+}
   }
 }
-  }
-  if (pageToFree != null) {
-freePage(pageToFree);
+  } finally {
+if (pageToFree != null) {
+  freePage(pageToFree);
+}
   }
 }
 


 




> deadlock between TaskMemoryManager and BytesToBytesMap$MapIterator
> --
>
> Key: SPARK-26265
> URL: https://issues.apache.org/jira/browse/SPARK-26265
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.2
>Reporter: qian han
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Fix For: 2.4.1, 3.0.0
>
>
> The application is running on a cluster with 72,000 cores and 182,000 GB of memory.
> Environment:
> |spark.dynamicAllocation.minExecutors|5|
> |spark.dynamicAllocation.initialExecutors|30|
> |spark.dynamicAllocation.maxExecutors|400|
> |spark.executor.cores|4|
> |spark.executor.memory|20g|
>  
>   
> Stage description:
> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:364)
>  org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:422) 
> 
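
The gist of the follow-up above is easier to read outside the diff. Below is a minimal Scala sketch of the pattern (the real code is Java in BytesToBytesMap$MapIterator; the class and the freePage callback here are illustrative only): decide which page to free while holding the iterator lock, but actually free it outside the lock and inside a finally block, so an exception in the critical section cannot skip the free or call back into the memory manager while the lock is still held.

{code:scala}
import scala.collection.mutable.ArrayBuffer

// Illustrative stand-in for the iterator; 'freePage' represents the callback
// into the memory manager that must not run under the iterator lock.
class PageIterator(pages: ArrayBuffer[Array[Byte]], freePage: Array[Byte] => Unit) {
  private val lock = new Object

  def advanceToNextPage(destructive: Boolean): Unit = {
    var pageToFree: Array[Byte] = null
    try {
      lock.synchronized {
        if (destructive && pages.nonEmpty) {
          // Only record the page here; do not free it while holding the lock.
          pageToFree = pages.remove(0)
        }
        // ... advance to the next page, read its record count, etc. ...
      }
    } finally {
      // Freeing happens outside the lock, and in a finally so it is not
      // skipped if the block above throws.
      if (pageToFree != null) freePage(pageToFree)
    }
  }
}
{code}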

[jira] [Commented] (SPARK-26342) Support for NFS mount for Kubernetes

2018-12-14 Thread Yinan Li (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721997#comment-16721997
 ] 

Yinan Li commented on SPARK-26342:
--

Yes, that's true. Feel free to create a PR to add nfs and flex.

> Support for NFS mount for Kubernetes
> 
>
> Key: SPARK-26342
> URL: https://issues.apache.org/jira/browse/SPARK-26342
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Eric Carlson
>Priority: Minor
>
> Currently only hostPath, emptyDir, and PVC volume types are accepted for 
> Kubernetes-deployed drivers and executors.  Possibility to mount NFS paths 
> would allow access to a common and easy-to-deploy shared storage solution.






[jira] [Comment Edited] (SPARK-26342) Support for NFS mount for Kubernetes

2018-12-14 Thread Eric Carlson (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721990#comment-16721990
 ] 

Eric Carlson edited comment on SPARK-26342 at 12/15/18 4:04 AM:


(discussion also applies for flexvolume improvement request SPARK-26344)

Hi [~liyinan926] - that's right that pod templates can be used for this, and I 
have that working, but it would be more convenient for users if a separate yaml 
didn't have to be crafted for some options when so much is already available 
via the conf mechanisms. 

I actually already have this code working on a local branch, waiting for 
corporate contributor approval to publish for review - it doesn't really add a 
feature, just adds the nfs (and in a separate branch, flexvolume) types to the 
existing volume config options, which ends up being a fairly small addition to 
a few switch statements. 

Would a change like this be up for consideration, or is there a design decision 
not to go beyond emptyDir, PVC, and hostPath? (I couldn't find design 
docs/discussion.)


was (Author: ektar):
(discussion also applies for flexvolume improvement request)

Hi [~liyinan926] - that's right that pod templates can be used for this, and I 
have that working, but it would be more convenient for users if a separate yaml 
didn't have to be crafted for some options when so much is already available 
via the conf mechanisms. 

I actually already have this code working on a local branch, waiting for 
corporate contributor approval to publish for review - it doesn't really add a 
feature, just adds the nfs (and in a separate branch, flexvolume) types to the 
existing volume config options, which ends up being a fairly small addition to 
a few switch statements. 

Would a change like this be up for consideration, or is there a design decision 
not to go beyond emptyDir, PVC, and hostPath? (I couldn't find design 
docs/discussion.)

> Support for NFS mount for Kubernetes
> 
>
> Key: SPARK-26342
> URL: https://issues.apache.org/jira/browse/SPARK-26342
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Eric Carlson
>Priority: Minor
>
> Currently only hostPath, emptyDir, and PVC volume types are accepted for 
> Kubernetes-deployed drivers and executors.  Possibility to mount NFS paths 
> would allow access to a common and easy-to-deploy shared storage solution.






[jira] [Commented] (SPARK-26344) Support for flexVolume mount for Kubernetes

2018-12-14 Thread Eric Carlson (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721992#comment-16721992
 ] 

Eric Carlson commented on SPARK-26344:
--

(Discussion at SPARK-26342 - question on whether any types beyond 
pvc/emptydir/hostpath would be considered to be added as volume types for the 
existing volume configuration options)

> Support for flexVolume mount for Kubernetes
> ---
>
> Key: SPARK-26344
> URL: https://issues.apache.org/jira/browse/SPARK-26344
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Eric Carlson
>Priority: Minor
>
> Currently only hostPath, emptyDir, and PVC volume types are accepted for 
> Kubernetes-deployed drivers and executors.
> flexVolume types allow for pluggable volume drivers to be used in Kubernetes 
> - a widely used example of this is the Rook deployment of CephFS, which 
> provides a POSIX-compliant distributed filesystem integrated into K8s.






[jira] [Commented] (SPARK-26342) Support for NFS mount for Kubernetes

2018-12-14 Thread Eric Carlson (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721990#comment-16721990
 ] 

Eric Carlson commented on SPARK-26342:
--

(discussion also applies for flexvolume improvement request)

Hi [~liyinan926] - that's right that pod templates can be used for this, and I 
have that working, but it would be more convenient for users if a separate yaml 
didn't have to be crafted for some options when so much is already available 
via the conf mechanisms. 

I actually already have this code working on a local branch, waiting for 
corporate contributor approval to publish for review - it doesn't really add a 
feature, just adds the nfs (and in a separate branch, flexvolume) types to the 
existing volume config options, which ends up being a fairly small addition to 
a few switch statements. 

Would a change like this be up for consideration, or is there a design decision 
not to go beyond emptyDir, PVC, and hostPath? (I couldn't find design 
docs/discussion.)
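
For context, the existing Kubernetes volume options follow the pattern `spark.kubernetes.{driver,executor}.volumes.[type].[name].{mount,options}.*`. A sketch of what an nfs entry in that scheme might look like is below; the `nfs` type and its `options.server`/`options.path` keys are assumptions for illustration only, since no implementation had been merged at the time of this thread.

{code:java}
# Hypothetical spark-defaults.conf entries following the existing volume-option
# pattern; the 'nfs' volume type and its options.server/options.path keys are
# assumptions, not settings that existed when this discussion took place.
spark.kubernetes.driver.volumes.nfs.shared.mount.path=/srv
spark.kubernetes.driver.volumes.nfs.shared.mount.readOnly=true
spark.kubernetes.driver.volumes.nfs.shared.options.server=nfs.example.com
spark.kubernetes.driver.volumes.nfs.shared.options.path=/exports/srv
spark.kubernetes.executor.volumes.nfs.shared.mount.path=/srv
spark.kubernetes.executor.volumes.nfs.shared.mount.readOnly=true
spark.kubernetes.executor.volumes.nfs.shared.options.server=nfs.example.com
spark.kubernetes.executor.volumes.nfs.shared.options.path=/exports/srv
{code}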

> Support for NFS mount for Kubernetes
> 
>
> Key: SPARK-26342
> URL: https://issues.apache.org/jira/browse/SPARK-26342
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Eric Carlson
>Priority: Minor
>
> Currently only hostPath, emptyDir, and PVC volume types are accepted for 
> Kubernetes-deployed drivers and executors.  Possibility to mount NFS paths 
> would allow access to a common and easy-to-deploy shared storage solution.






[jira] [Commented] (SPARK-20628) Keep track of nodes which are going to be shut down & avoid scheduling new tasks

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721909#comment-16721909
 ] 

ASF GitHub Bot commented on SPARK-20628:


vanzin closed pull request #19267: [WIP][SPARK-20628][CORE] Blacklist nodes 
when they transition to DECOMMISSIONING state in YARN
URL: https://github.com/apache/spark/pull/19267
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/org/apache/spark/HostState.scala 
b/core/src/main/scala/org/apache/spark/HostState.scala
new file mode 100644
index 0..17b374c3fac26
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/HostState.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.hadoop.yarn.api.records.NodeState
+
+private[spark] object HostState extends Enumeration {
+
+  type HostState = Value
+
+  val New, Running, Unhealthy, Decommissioning, Decommissioned, Lost, Rebooted 
= Value
+
+  def fromYarnState(state: String): Option[HostState] = {
+HostState.values.find(_.toString.toUpperCase == state)
+  }
+
+  def toYarnState(state: HostState): Option[String] = {
+NodeState.values.find(_.name == state.toString.toUpperCase).map(_.name)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 9495cd2835f97..84edcff707d44 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -154,6 +154,16 @@ package object config {
 ConfigBuilder("spark.blacklist.application.fetchFailure.enabled")
   .booleanConf
   .createWithDefault(false)
+
+  private[spark] val BLACKLIST_DECOMMISSIONING_ENABLED =
+ConfigBuilder("spark.blacklist.decommissioning.enabled")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val BLACKLIST_DECOMMISSIONING_TIMEOUT_CONF =
+ConfigBuilder("spark.blacklist.decommissioning.timeout")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createOptional
   // End blacklist confs
 
   private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE =
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala 
b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index cd8e61d6d0208..7bc3db8ce1bb9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -61,7 +61,13 @@ private[scheduler] class BlacklistTracker (
   private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
   private val MAX_FAILED_EXEC_PER_NODE = 
conf.get(config.MAX_FAILED_EXEC_PER_NODE)
   val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf)
-  private val BLACKLIST_FETCH_FAILURE_ENABLED = 
conf.get(config.BLACKLIST_FETCH_FAILURE_ENABLED)
+  val BLACKLIST_DECOMMISSIONING_TIMEOUT_MILLIS =
+BlacklistTracker.getBlacklistDecommissioningTimeout(conf)
+  private val TASK_BLACKLISTING_ENABLED = 
BlacklistTracker.isTaskExecutionBlacklistingEnabled(conf)
+  private val DECOMMISSIONING_BLACKLISTING_ENABLED =
+BlacklistTracker.isDecommissioningBlacklistingEnabled(conf)
+  private val BLACKLIST_FETCH_FAILURE_ENABLED =
+BlacklistTracker.isFetchFailureBlacklistingEnabled(conf)
 
   /**
* A map from executorId to information on task failures.  Tracks the time 
of each task failure,
@@ -89,13 +95,13 @@ private[scheduler] class BlacklistTracker (
* successive blacklisted executors on one node.  Nonetheless, it will not 
grow too large because
* there cannot be many blacklisted executors on one node, before we stop 
requesting more
* executors on that node, and we clean up the list of blacklisted executors 
once an executor has
-   * been blacklisted for 

[jira] [Resolved] (SPARK-26290) [K8s] Driver Pods no mounted volumes on submissions from older spark versions

2018-12-14 Thread Yinan Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yinan Li resolved SPARK-26290.
--
Resolution: Not A Bug

> [K8s] Driver Pods no mounted volumes on submissions from older spark versions
> -
>
> Key: SPARK-26290
> URL: https://issues.apache.org/jira/browse/SPARK-26290
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.4.0
> Environment: Kuberentes: 1.10.6
> Container: Spark 2.4.0 
> Spark containers are built from the archive served by 
> [www.apache.org/dist/spark/|http://www.apache.org/dist/spark/] 
> Submission done by older spark versions integrated e.g. in livy
>Reporter: Martin Buchleitner
>Priority: Major
>
> I want to use the volume feature to mount an existing PVC as a read-only volume 
> into both the driver and the executor. 
> The executor gets the PVC mounted, but the driver is missing the mount: 
> {code:java}
> /opt/spark/bin/spark-submit \
> --deploy-mode cluster \
> --class org.apache.spark.examples.SparkPi \
> --conf spark.app.name=spark-pi \
> --conf spark.executor.instances=4 \
> --conf spark.kubernetes.namespace=spark-demo \
> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
> --conf spark.kubernetes.container.image.pullPolicy=Always \
> --conf spark.kubernetes.container.image=kube-spark:2.4.0 \
> --conf spark.master=k8s://https:// \
> --conf 
> spark.kubernetes.driver.volumes.persistentVolumeClaim.ddata.mount.path=/srv \
> --conf 
> spark.kubernetes.driver.volumes.persistentVolumeClaim.ddata.mount.readOnly=true
>  \
> --conf 
> spark.kubernetes.driver.volumes.persistentVolumeClaim.ddata.options.claimName=nfs-pvc
>  \
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/srv \
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=true
>  \
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=nfs-pvc
>  \
> /srv/spark-examples_2.11-2.4.0.jar
> {code}
> When i use the jar included in the container
> {code:java}
> local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar
> {code}
> the call works and i can review the pod descriptions to review the behavior
> *Driver description*
> {code:java}
> Name: spark-pi-1544018157391-driver
> [...]
> Containers:
>   spark-kubernetes-driver:
> Container ID:   
> docker://3a31d867c140183247cb296e13a8b35d03835f7657dd7e625c59083024e51e28
> Image:  kube-spark:2.4.0
> Image ID:   [...]
> Port:   
> Host Port:  
> State:  Terminated
>   Reason:   Completed
>   Exit Code:0
>   Started:  Wed, 05 Dec 2018 14:55:59 +0100
>   Finished: Wed, 05 Dec 2018 14:56:08 +0100
> Ready:  False
> Restart Count:  0
> Limits:
>   memory:  1408Mi
> Requests:
>   cpu: 1
>   memory:  1Gi
> Environment:
>   SPARK_DRIVER_MEMORY:1g
>   SPARK_DRIVER_CLASS: org.apache.spark.examples.SparkPi
>   SPARK_DRIVER_ARGS:
>   SPARK_DRIVER_BIND_ADDRESS:   (v1:status.podIP)
>   SPARK_MOUNTED_CLASSPATH:
> /opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar
>   SPARK_JAVA_OPT_1:   
> -Dspark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/srv
>   SPARK_JAVA_OPT_3:   -Dspark.app.name=spark-pi
>   SPARK_JAVA_OPT_4:   
> -Dspark.kubernetes.driver.volumes.persistentVolumeClaim.ddata.mount.path=/srv
>   SPARK_JAVA_OPT_5:   -Dspark.submit.deployMode=cluster
>   SPARK_JAVA_OPT_6:   -Dspark.driver.blockManager.port=7079
>   SPARK_JAVA_OPT_7:   
> -Dspark.kubernetes.driver.volumes.persistentVolumeClaim.ddata.mount.readOnly=true
>   SPARK_JAVA_OPT_8:   
> -Dspark.kubernetes.authenticate.driver.serviceAccountName=spark
>   SPARK_JAVA_OPT_9:   
> -Dspark.driver.host=spark-pi-1544018157391-driver-svc.spark-demo.svc.cluster.local
>   SPARK_JAVA_OPT_10:  
> -Dspark.kubernetes.driver.pod.name=spark-pi-1544018157391-driver
>   SPARK_JAVA_OPT_11:  
> -Dspark.kubernetes.driver.volumes.persistentVolumeClaim.ddata.options.claimName=nfs-pvc
>   SPARK_JAVA_OPT_12:  
> -Dspark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=true
>   SPARK_JAVA_OPT_13:  -Dspark.driver.port=7078
>   SPARK_JAVA_OPT_14:  
> -Dspark.jars=/opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar
>   SPARK_JAVA_OPT_15:  
> -Dspark.kubernetes.executor.podNamePrefix=spark-pi-1544018157391
>   SPARK_JAVA_OPT_16:  -Dspark.local.dir=/tmp/spark-local
>   SPARK_JAVA_OPT_17:  -Dspark.master=k8s://https://
>   

[jira] [Commented] (SPARK-26342) Support for NFS mount for Kubernetes

2018-12-14 Thread Yinan Li (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721894#comment-16721894
 ] 

Yinan Li commented on SPARK-26342:
--

So basically what you want is a generic way to mount arbitrary types of 
volumes. This is covered by SPARK-24434, which enables using a pod template to 
configure the driver and/or executor pods.

> Support for NFS mount for Kubernetes
> 
>
> Key: SPARK-26342
> URL: https://issues.apache.org/jira/browse/SPARK-26342
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Eric Carlson
>Priority: Minor
>
> Currently only hostPath, emptyDir, and PVC volume types are accepted for 
> Kubernetes-deployed drivers and executors.  Possibility to mount NFS paths 
> would allow access to a common and easy-to-deploy shared storage solution.






[jira] [Commented] (SPARK-26344) Support for flexVolume mount for Kubernetes

2018-12-14 Thread Yinan Li (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721893#comment-16721893
 ] 

Yinan Li commented on SPARK-26344:
--

This is covered by SPARK-24434, which enables using a pod template to configure 
the driver and/or executor pods.

> Support for flexVolume mount for Kubernetes
> ---
>
> Key: SPARK-26344
> URL: https://issues.apache.org/jira/browse/SPARK-26344
> Project: Spark
>  Issue Type: Improvement
>  Components: Kubernetes
>Affects Versions: 2.4.0
>Reporter: Eric Carlson
>Priority: Minor
>
> Currently only hostPath, emptyDir, and PVC volume types are accepted for 
> Kubernetes-deployed drivers and executors.
> flexVolume types allow for pluggable volume drivers to be used in Kubernetes 
> - a widely used example of this is the Rook deployment of CephFS, which 
> provides a POSIX-compliant distributed filesystem integrated into K8s.






[jira] [Commented] (SPARK-26267) Kafka source may reprocess data

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721869#comment-16721869
 ] 

ASF GitHub Bot commented on SPARK-26267:


zsxwing opened a new pull request #23324: [SPARK-26267][SS]Retry when detecting 
incorrect offsets from Kafka
URL: https://github.com/apache/spark/pull/23324
 
 
   ## What changes were proposed in this pull request?
   
   Due to [KAFKA-7703](https://issues.apache.org/jira/browse/KAFKA-7703), Kafka 
may return the earliest offset when we request the latest offset. This will 
cause Spark to reprocess data.
   
   To reduce the impact of KAFKA-7703, this PR uses the previously fetched offsets 
to audit the result from Kafka. If any offset looks incorrect, we retry at most 
`maxOffsetFetchAttempts` times. For the first batch of a new query, since we don't 
have any previous offsets, we simply fetch offsets twice. This should greatly 
reduce the chance of hitting KAFKA-7703.
   
   ## How was this patch tested?
   
   Jenkins
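
A minimal sketch of the audit-and-retry idea described above (the names and signatures are illustrative, not Spark's internal API): if Kafka reports a latest offset smaller than one already observed for a partition, treat the result as suspect and fetch again, up to a bounded number of attempts.

{code:scala}
// partition id -> offset
def fetchValidatedLatestOffsets(
    fetch: () => Map[Int, Long],        // one round-trip to Kafka
    knownOffsets: Map[Int, Long],       // offsets from the previous batch (may be empty)
    maxAttempts: Int): Map[Int, Long] = {

  // An offset going backwards relative to what we already processed is suspect.
  def looksIncorrect(result: Map[Int, Long]): Boolean =
    knownOffsets.exists { case (p, known) => result.get(p).exists(_ < known) }

  var attempt = 1
  var result = fetch()
  while (looksIncorrect(result) && attempt < maxAttempts) {
    attempt += 1
    result = fetch()                    // retry; KAFKA-7703 results are transient
  }
  result
}
{code}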




> Kafka source may reprocess data
> ---
>
> Key: SPARK-26267
> URL: https://issues.apache.org/jira/browse/SPARK-26267
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Blocker
>  Labels: correctness
>
> Due to KAFKA-7703, when the Kafka source tries to get the latest offset, it 
> may get the earliest offset instead, and it will then reprocess messages that 
> have already been processed once it gets the correct latest offset in the next 
> batch. This usually happens when restarting a streaming query.






[jira] [Assigned] (SPARK-26267) Kafka source may reprocess data

2018-12-14 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26267:


Assignee: Apache Spark  (was: Shixiong Zhu)

> Kafka source may reprocess data
> ---
>
> Key: SPARK-26267
> URL: https://issues.apache.org/jira/browse/SPARK-26267
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Shixiong Zhu
>Assignee: Apache Spark
>Priority: Blocker
>  Labels: correctness
>
> Due to KAFKA-7703, when the Kafka source tries to get the latest offset, it 
> may get the earliest offset instead, and it will then reprocess messages that 
> have already been processed once it gets the correct latest offset in the next 
> batch. This usually happens when restarting a streaming query.






[jira] [Assigned] (SPARK-26267) Kafka source may reprocess data

2018-12-14 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26267:


Assignee: Shixiong Zhu  (was: Apache Spark)

> Kafka source may reprocess data
> ---
>
> Key: SPARK-26267
> URL: https://issues.apache.org/jira/browse/SPARK-26267
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Blocker
>  Labels: correctness
>
> Due to KAFKA-7703, when the Kafka source tries to get the latest offset, it 
> may get the earliest offset instead, and it will then reprocess messages that 
> have already been processed once it gets the correct latest offset in the next 
> batch. This usually happens when restarting a streaming query.






[jira] [Commented] (SPARK-26372) CSV parsing uses previous good value for bad input field

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721858#comment-16721858
 ] 

ASF GitHub Bot commented on SPARK-26372:


bersprockets opened a new pull request #23323: [SPARK-26372][SQL] Don't reuse 
value from previous row when parsing bad CSV input field
URL: https://github.com/apache/spark/pull/23323
 
 
   ## What changes were proposed in this pull request?
   
   CSV parsing accidentally uses the previous good value for a bad input field. 
See example in Jira.
   
   This PR ensures that the associated column is set to null when an input 
field cannot be converted.
   
   ## How was this patch tested?
   
   Added new test.
   Ran all SQL unit tests (testOnly org.apache.spark.sql.*).
   Ran pyspark tests for pyspark-sql
   




> CSV parsing uses previous good value for bad input field
> 
>
> Key: SPARK-26372
> URL: https://issues.apache.org/jira/browse/SPARK-26372
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Bruce Robbins
>Priority: Major
>
> For example:
> {noformat}
> bash-3.2$ cat test.csv 
> "hello",1999-08-01
> "there","bad date"
> "again","2017-11-22"
> bash-3.2$ bin/spark-shell
> ..etc..
> scala> import org.apache.spark.sql.types._
> scala> import org.apache.spark.sql.SaveMode
> scala> var schema = StructType(StructField("col1", StringType) ::
>  |   StructField("col2", DateType) ::
>  |   Nil)
> schema: org.apache.spark.sql.types.StructType = 
> StructType(StructField(col1,StringType,true), StructField(col2,DateType,true))
> scala> val df = spark.read.schema(schema).csv("test.csv")
> df: org.apache.spark.sql.DataFrame = [col1: string, col2: date]
> scala> df.show
> +-----+----------+
> | col1|      col2|
> +-----+----------+
> |hello|1999-08-01|
> |there|1999-08-01|
> |again|2017-11-22|
> +-----+----------+
> scala> 
> {noformat}
> col2 from the second row contains "1999-08-01", when it should contain null.
> This is because UnivocityParser reuses the same Row object for each input 
> record. If there is an exception converting an input field, the code simply 
> skips over that field, leaving the existing value in the Row object.
> The simple fix is to set the column to null in the Row object whenever there 
> is a badRecordException while converting the input field.
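
A minimal sketch of the failure mode and the fix described above (the real change is in UnivocityParser; the names here are illustrative): because one mutable row is reused for every record, a failed conversion has to overwrite the column with null instead of leaving the previous record's value in place.

{code:scala}
// One mutable row reused across records, as in the parser described above.
val row = new Array[Any](2)

def convert(tokens: Array[String], converters: Array[String => Any]): Array[Any] = {
  var i = 0
  while (i < tokens.length) {
    try {
      row(i) = converters(i)(tokens(i))
    } catch {
      case _: RuntimeException =>
        // The fix: clear the stale value from the previous record before
        // reporting the bad record, instead of skipping the column.
        row(i) = null
    }
    i += 1
  }
  row
}

// Example: a string column and a date column whose converter rejects bad input.
val converters: Array[String => Any] =
  Array((s: String) => s, (s: String) => java.sql.Date.valueOf(s))
convert(Array("hello", "1999-08-01"), converters)
println(convert(Array("there", "bad date"), converters).mkString(","))  // there,null
{code}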






[jira] [Assigned] (SPARK-26372) CSV parsing uses previous good value for bad input field

2018-12-14 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26372:


Assignee: (was: Apache Spark)

> CSV parsing uses previous good value for bad input field
> 
>
> Key: SPARK-26372
> URL: https://issues.apache.org/jira/browse/SPARK-26372
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Bruce Robbins
>Priority: Major
>
> For example:
> {noformat}
> bash-3.2$ cat test.csv 
> "hello",1999-08-01
> "there","bad date"
> "again","2017-11-22"
> bash-3.2$ bin/spark-shell
> ..etc..
> scala> import org.apache.spark.sql.types._
> scala> import org.apache.spark.sql.SaveMode
> scala> var schema = StructType(StructField("col1", StringType) ::
>  |   StructField("col2", DateType) ::
>  |   Nil)
> schema: org.apache.spark.sql.types.StructType = 
> StructType(StructField(col1,StringType,true), StructField(col2,DateType,true))
> scala> val df = spark.read.schema(schema).csv("test.csv")
> df: org.apache.spark.sql.DataFrame = [col1: string, col2: date]
> scala> df.show
> +-----+----------+
> | col1|      col2|
> +-----+----------+
> |hello|1999-08-01|
> |there|1999-08-01|
> |again|2017-11-22|
> +-----+----------+
> scala> 
> {noformat}
> col2 from the second row contains "1999-08-01", when it should contain null.
> This is because UnivocityParser reuses the same Row object for each input 
> record. If there is an exception converting an input field, the code simply 
> skips over that field, leaving the existing value in the Row object.
> The simple fix is to set the column to null in the Row object whenever there 
> is a badRecordException while converting the input field.






[jira] [Assigned] (SPARK-26372) CSV parsing uses previous good value for bad input field

2018-12-14 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26372:


Assignee: Apache Spark

> CSV parsing uses previous good value for bad input field
> 
>
> Key: SPARK-26372
> URL: https://issues.apache.org/jira/browse/SPARK-26372
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Bruce Robbins
>Assignee: Apache Spark
>Priority: Major
>
> For example:
> {noformat}
> bash-3.2$ cat test.csv 
> "hello",1999-08-01
> "there","bad date"
> "again","2017-11-22"
> bash-3.2$ bin/spark-shell
> ..etc..
> scala> import org.apache.spark.sql.types._
> scala> import org.apache.spark.sql.SaveMode
> scala> var schema = StructType(StructField("col1", StringType) ::
>  |   StructField("col2", DateType) ::
>  |   Nil)
> schema: org.apache.spark.sql.types.StructType = 
> StructType(StructField(col1,StringType,true), StructField(col2,DateType,true))
> scala> val df = spark.read.schema(schema).csv("test.csv")
> df: org.apache.spark.sql.DataFrame = [col1: string, col2: date]
> scala> df.show
> +-----+----------+
> | col1|      col2|
> +-----+----------+
> |hello|1999-08-01|
> |there|1999-08-01|
> |again|2017-11-22|
> +-----+----------+
> scala> 
> {noformat}
> col2 from the second row contains "1999-08-01", when it should contain null.
> This is because UnivocityParser reuses the same Row object for each input 
> record. If there is an exception converting an input field, the code simply 
> skips over that field, leaving the existing value in the Row object.
> The simple fix is to set the column to null in the Row object whenever there 
> is a badRecordException while converting the input field.






[jira] [Reopened] (SPARK-20130) Flaky test: BlockManagerProactiveReplicationSuite

2018-12-14 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-20130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin reopened SPARK-20130:


Reopening since I've seen this a few times recently. e.g.:

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/100161/

{noformat}
Error Message
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to 
eventually never returned normally. Attempted 2 times over 4.021686279 seconds. 
Last failure message: 4 did not equal 3.
Stacktrace
sbt.ForkMain$ForkError: 
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to 
eventually never returned normally. Attempted 2 times over 4.021686279 seconds. 
Last failure message: 4 did not equal 3.
at 
org.scalatest.concurrent.Eventually.tryTryAgain$1(Eventually.scala:432)
at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:439)
at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:391)
at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:479)
at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:308)
at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:307)
at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:479)
at 
org.apache.spark.storage.BlockManagerReplicationBehavior.$anonfun$$init$$26(BlockManagerReplicationSuite.scala:299)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
{noformat}

> Flaky test: BlockManagerProactiveReplicationSuite
> -
>
> Key: SPARK-20130
> URL: https://issues.apache.org/jira/browse/SPARK-20130
> Project: Spark
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 2.2.0
>Reporter: Marcelo Vanzin
>Priority: Major
>
> See following page:
> https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.storage.BlockManagerProactiveReplicationSuite_name=proactive+block+replication+-+5+replicas+-+4+block+manager+deletions
> I also have seen it fail intermittently during local unit test runs.






[jira] [Commented] (SPARK-24942) Improve cluster resource management with jobs containing barrier stage

2018-12-14 Thread Ilya Matiach (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721841#comment-16721841
 ] 

Ilya Matiach commented on SPARK-24942:
--

I would really like to see this resolved. It would be great if we could have 
barrier execution with dynamic allocation enabled. When dynamic allocation is 
enabled, we should be able to restart the job automatically if resources are 
removed for some reason, and let the developer decide (in their own code) whether 
to restart the job when resources are added in order to utilize them. For the 
latter case, many algorithms that would use barrier execution mode are iterative, 
so they should be able to save their current state and restart when more 
resources are allocated.

> Improve cluster resource management with jobs containing barrier stage
> --
>
> Key: SPARK-24942
> URL: https://issues.apache.org/jira/browse/SPARK-24942
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Xingbo Jiang
>Priority: Major
>
> https://github.com/apache/spark/pull/21758#discussion_r205652317
> We shall improve cluster resource management to address the following issues:
> - With dynamic resource allocation enabled, it may happen that we acquire 
> some executors (but not enough to launch all the tasks in a barrier stage) 
> and later release them due to executor idle time expire, and then acquire 
> again.
> - There can be deadlock with two concurrent applications. Each application 
> may acquire some resources, but not enough to launch all the tasks in a 
> barrier stage. And after hitting the idle timeout and releasing them, they 
> may acquire resources again, but just continually trade resources between 
> each other.






[jira] [Created] (SPARK-26373) Spark UI 'environment' tab - column to indicate default vs overridden values

2018-12-14 Thread t oo (JIRA)
t oo created SPARK-26373:


 Summary: Spark UI 'environment' tab - column to indicate default 
vs overridden values
 Key: SPARK-26373
 URL: https://issues.apache.org/jira/browse/SPARK-26373
 Project: Spark
  Issue Type: New Feature
  Components: Web UI
Affects Versions: 2.4.0
Reporter: t oo


Rather than just showing the name and value of each property, a new column would 
also indicate whether the value is the default (show 'AS PER DEFAULT') or 
overridden (in which case show the actual default value).






[jira] [Assigned] (SPARK-26267) Kafka source may reprocess data

2018-12-14 Thread Shixiong Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu reassigned SPARK-26267:


Assignee: Shixiong Zhu

> Kafka source may reprocess data
> ---
>
> Key: SPARK-26267
> URL: https://issues.apache.org/jira/browse/SPARK-26267
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Blocker
>  Labels: correctness
>
> Due to KAFKA-7703, when the Kafka source tries to get the latest offset, it 
> may get the earliest offset instead, and it will then reprocess messages that 
> have already been processed once it gets the correct latest offset in the next 
> batch. This usually happens when restarting a streaming query.






[jira] [Updated] (SPARK-26327) Metrics in FileSourceScanExec not update correctly while relation.partitionSchema is set

2018-12-14 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-26327:
--
Fix Version/s: 2.4.1
   2.3.3
   2.2.3

> Metrics in FileSourceScanExec not update correctly while 
> relation.partitionSchema is set
> 
>
> Key: SPARK-26327
> URL: https://issues.apache.org/jira/browse/SPARK-26327
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Yuanjian Li
>Assignee: Yuanjian Li
>Priority: Major
> Fix For: 2.2.3, 2.3.3, 2.4.1, 3.0.0
>
>
> In the current approach in `FileSourceScanExec`, the "numFiles" and 
> "metadataTime" (file listing time) metrics are updated when the lazy val 
> `selectedPartitions` is initialized, in the scenario where 
> relation.partitionSchema is set. But `selectedPartitions` is first initialized 
> via `metadata`, which is called by `queryExecution.toString` in 
> `SQLExecution.withNewExecutionId`. So by the time 
> `SQLMetrics.postDriverMetricUpdates` is called, there is no corresponding live 
> execution in SQLAppStatusListener and the metrics update does not take effect.
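
The pitfall described above can be shown in isolation with a small sketch (the class here is illustrative, not Spark code): a side effect buried in a lazy val fires at the first access, which may come from an unrelated caller such as toString, before the listener that should receive it is ready.

{code:scala}
class Scan {
  @transient lazy val selectedPartitions: Seq[String] = {
    // Side effect tied to initialization -- the analogue of posting the
    // numFiles/metadataTime metrics described above.
    println("driver metrics posted now")
    Seq("p=1", "p=2")
  }
  override def toString: String = s"Scan over ${selectedPartitions.size} partitions"
}

val scan = new Scan
// Merely rendering the plan triggers the lazy val, so the "metrics" are posted
// before any execution is registered to receive them.
println(scan.toString)
{code}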






[jira] [Commented] (SPARK-26327) Metrics in FileSourceScanExec not update correctly while relation.partitionSchema is set

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721788#comment-16721788
 ] 

ASF GitHub Bot commented on SPARK-26327:


dongjoon-hyun closed pull request #23300: [SPARK-26327][SQL][BACKPORT-2.2] Bug 
fix for `FileSourceScanExec` metrics update
URL: https://github.com/apache/spark/pull/23300
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 6fb41b6425c4b..1f6a7c089bf07 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -170,19 +170,14 @@ case class FileSourceScanExec(
 false
   }
 
+  private var metadataTime = 0L
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
 val optimizerMetadataTimeNs = 
relation.location.metadataOpsTimeNs.getOrElse(0L)
 val startTime = System.nanoTime()
 val ret = relation.location.listFiles(partitionFilters, dataFilters)
 val timeTakenMs = ((System.nanoTime() - startTime) + 
optimizerMetadataTimeNs) / 1000 / 1000
-
-metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-metrics("metadataTime").add(timeTakenMs)
-
-val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-  metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+metadataTime = timeTakenMs
 ret
   }
 
@@ -281,6 +276,8 @@ case class FileSourceScanExec(
   }
 
   private lazy val inputRDD: RDD[InternalRow] = {
+// Update metrics for taking effect in both code generation node and 
normal node.
+updateDriverMetrics()
 val readFile: (PartitionedFile) => Iterator[InternalRow] =
   relation.fileFormat.buildReaderWithPartitionValues(
 sparkSession = relation.sparkSession,
@@ -514,6 +511,19 @@ case class FileSourceScanExec(
 }
   }
 
+  /**
+   * Send the updated metrics to driver, while this function calling, 
selectedPartitions has
+   * been initialized. See SPARK-26327 for more detail.
+   */
+  private def updateDriverMetrics() = {
+metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+metrics("metadataTime").add(metadataTime)
+
+val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+  metrics("numFiles") :: metrics("metadataTime") :: Nil)
+  }
+
   override lazy val canonicalized: FileSourceScanExec = {
 FileSourceScanExec(
   relation,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 79d1fbfa3f072..26b822fe6208f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -372,6 +372,21 @@ class SQLMetricsSuite extends SparkFunSuite with 
SharedSQLContext {
   assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) :: 
(0L, 300L, 0L) :: Nil)
 }
   }
+
+  test("SPARK-26327: FileSourceScanExec metrics") {
+withTable("testDataForScan") {
+  spark.range(10).selectExpr("id", "id % 3 as p")
+.write.partitionBy("p").saveAsTable("testDataForScan")
+  // The execution plan only has 1 FileScan node.
+  val df = spark.sql(
+"SELECT * FROM testDataForScan WHERE p = 1")
+  testSparkPlanMetrics(df, 1, Map(
+0L -> (("Scan parquet default.testdataforscan", Map(
+  "number of output rows" -> 3L,
+  "number of files" -> 2L
+  )
+}
+  }
 }
 
 object InputOutputMetricsHelper {


 




> Metrics in FileSourceScanExec not update correctly while 
> relation.partitionSchema is set
> 
>
> Key: SPARK-26327
> URL: https://issues.apache.org/jira/browse/SPARK-26327
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.4.0

[jira] [Commented] (SPARK-26327) Metrics in FileSourceScanExec not update correctly while relation.partitionSchema is set

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721785#comment-16721785
 ] 

ASF GitHub Bot commented on SPARK-26327:


dongjoon-hyun closed pull request #23299: [SPARK-26327][SQL][BACKPORT-2.3] Bug 
fix for `FileSourceScanExec` metrics update
URL: https://github.com/apache/spark/pull/23299
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 0a23264a4a16b..5543087db0614 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -183,19 +183,14 @@ case class FileSourceScanExec(
   partitionSchema = relation.partitionSchema,
   relation.sparkSession.sessionState.conf)
 
+  private var metadataTime = 0L
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
 val optimizerMetadataTimeNs = 
relation.location.metadataOpsTimeNs.getOrElse(0L)
 val startTime = System.nanoTime()
 val ret = relation.location.listFiles(partitionFilters, dataFilters)
 val timeTakenMs = ((System.nanoTime() - startTime) + 
optimizerMetadataTimeNs) / 1000 / 1000
-
-metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-metrics("metadataTime").add(timeTakenMs)
-
-val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-  metrics("numFiles") :: metrics("metadataTime") :: Nil)
-
+metadataTime = timeTakenMs
 ret
   }
 
@@ -293,6 +288,8 @@ case class FileSourceScanExec(
   }
 
   private lazy val inputRDD: RDD[InternalRow] = {
+// Update metrics for taking effect in both code generation node and 
normal node.
+updateDriverMetrics()
 val readFile: (PartitionedFile) => Iterator[InternalRow] =
   relation.fileFormat.buildReaderWithPartitionValues(
 sparkSession = relation.sparkSession,
@@ -500,6 +497,19 @@ case class FileSourceScanExec(
 }
   }
 
+  /**
+   * Send the updated metrics to the driver. By the time this function is called,
+   * selectedPartitions has already been initialized. See SPARK-26327 for more detail.
+   */
+  private def updateDriverMetrics() = {
+    metrics("numFiles").add(selectedPartitions.map(_.files.size.toLong).sum)
+    metrics("metadataTime").add(metadataTime)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("metadataTime") :: Nil)
+  }
+
   override def doCanonicalize(): FileSourceScanExec = {
 FileSourceScanExec(
   relation,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index a3a3f3851e21c..439a36080ce90 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -504,4 +504,19 @@ class SQLMetricsSuite extends SparkFunSuite with 
SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
 testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("SPARK-26327: FileSourceScanExec metrics") {
+withTable("testDataForScan") {
+  spark.range(10).selectExpr("id", "id % 3 as p")
+.write.partitionBy("p").saveAsTable("testDataForScan")
+  // The execution plan only has 1 FileScan node.
+  val df = spark.sql(
+"SELECT * FROM testDataForScan WHERE p = 1")
+  testSparkPlanMetrics(df, 1, Map(
+0L -> (("Scan parquet default.testdataforscan", Map(
+  "number of output rows" -> 3L,
+  "number of files" -> 2L
+  )
+}
+  }
 }
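The shape of the fix is easier to see in isolation: the expensive listing lives in a lazy val that runs at most once, the elapsed time is stashed in a field, and the metric values are posted explicitly when the scan is actually built rather than from inside the lazy val initializer. Below is a minimal, self-contained Scala sketch of that pattern; the names (FakeScan, buildScan) are illustrative stand-ins, not Spark APIs.

{code}
import scala.collection.mutable

// Minimal stand-in for the pattern in the patch above; not Spark classes.
class FakeScan(files: Seq[String]) {
  val metrics = mutable.Map("numFiles" -> 0L, "metadataTime" -> 0L)
  private var metadataTime = 0L

  // The expensive listing: a lazy val, so it runs at most once no matter
  // which code path forces it first (planning or execution).
  private lazy val selectedPartitions: Seq[String] = {
    val startTime = System.nanoTime()
    val ret = files.filter(_.endsWith(".parquet"))   // pretend this is listFiles()
    metadataTime = (System.nanoTime() - startTime) / 1000 / 1000
    ret
  }

  // Mirrors updateDriverMetrics(): the metric values are posted from the
  // consumer, not from inside the lazy val initializer, so they are reported
  // even when selectedPartitions was already forced earlier.
  private def updateDriverMetrics(): Unit = {
    metrics("numFiles") += selectedPartitions.size.toLong
    metrics("metadataTime") += metadataTime
  }

  def buildScan(): Seq[String] = {
    updateDriverMetrics()
    selectedPartitions
  }
}

object FakeScanDemo extends App {
  val scan = new FakeScan(Seq("part-0.parquet", "part-1.parquet", "notes.txt"))
  scan.buildScan()
  println(scan.metrics)   // numFiles -> 2, metadataTime -> elapsed ms
}
{code}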


 




> Metrics in FileSourceScanExec not update correctly while 
> relation.partitionSchema is set
> 
>
> Key: SPARK-26327
> URL: https://issues.apache.org/jira/browse/SPARK-26327
> Project: Spark
>

[jira] [Resolved] (SPARK-21239) Support WAL recover in windows

2018-12-14 Thread Marcelo Vanzin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-21239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcelo Vanzin resolved SPARK-21239.

Resolution: Duplicate

As far as my understanding goes, the fix for SPARK-25778 should also fix this.

> Support WAL recover in windows
> --
>
> Key: SPARK-21239
> URL: https://issues.apache.org/jira/browse/SPARK-21239
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Windows
>Affects Versions: 1.6.3, 2.1.0, 2.1.1, 2.2.0
>Reporter: Yun Tang
>Priority: Major
>
> When the driver fails over, it reads the WAL from HDFS by calling
> WriteAheadLogBackedBlockRDD.getBlockFromWriteAheadLog(). However, it needs a
> dummy local path to satisfy the method parameter requirements, and on Windows
> that path contains a colon, which is not valid for Hadoop. I removed the
> potential drive letter and colon.
> An earlier spark-user thread also discussed this bug
> (https://www.mail-archive.com/user@spark.apache.org/msg55030.html).






[jira] [Commented] (SPARK-21239) Support WAL recover in windows

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721662#comment-16721662
 ] 

ASF GitHub Bot commented on SPARK-21239:


vanzin closed pull request #18452: [SPARK-21239][STREAMING] Support WAL recover 
in windows
URL: https://github.com/apache/spark/pull/18452
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 844760ab61d2e..2c97c0afa260e 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -135,8 +135,11 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
         // this dummy directory should not already exist otherwise the WAL will try to recover
         // past events from the directory and throw errors.
+        // Specifically, on Windows the nonExistentDirectory would contain a colon, which is
+        // invalid for Hadoop, so strip the drive letter and colon (e.g. "D:") from the path.
         val nonExistentDirectory = new File(
-          System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
+          System.getProperty("java.io.tmpdir"),
+          UUID.randomUUID().toString).getAbsolutePath.replaceFirst("[a-zA-Z]:", "")
 writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
   SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
 dataRead = writeAheadLog.read(partition.walRecordHandle)
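The behavioural change in the hunk above comes down to that one replaceFirst call. A tiny sketch of what it does to a Windows-style temp path (the concrete path is made up for illustration):

{code}
import java.util.UUID

// Hypothetical Windows-style temp path, for illustration only.
val windowsPath = "D:\\Users\\someone\\AppData\\Local\\Temp\\" + UUID.randomUUID().toString

// Same regex as the patch: strip a leading drive letter and colon.
val hadoopFriendly = windowsPath.replaceFirst("[a-zA-Z]:", "")

println(windowsPath)     // D:\Users\someone\AppData\Local\Temp\<uuid>
println(hadoopFriendly)  // \Users\someone\AppData\Local\Temp\<uuid>  (no colon, so Hadoop accepts it)
{code}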


 




> Support WAL recover in windows
> --
>
> Key: SPARK-21239
> URL: https://issues.apache.org/jira/browse/SPARK-21239
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams, Windows
>Affects Versions: 1.6.3, 2.1.0, 2.1.1, 2.2.0
>Reporter: Yun Tang
>Priority: Major
>
> When the driver fails over, it reads the WAL from HDFS by calling
> WriteAheadLogBackedBlockRDD.getBlockFromWriteAheadLog(). However, it needs a
> dummy local path to satisfy the method parameter requirements, and on Windows
> that path contains a colon, which is not valid for Hadoop. I removed the
> potential drive letter and colon.
> An earlier spark-user thread also discussed this bug
> (https://www.mail-archive.com/user@spark.apache.org/msg55030.html).






[jira] [Commented] (SPARK-25694) URL.setURLStreamHandlerFactory causing incompatible HttpURLConnection issue

2018-12-14 Thread Howard Wong (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721656#comment-16721656
 ] 

Howard Wong commented on SPARK-25694:
-

[~boyangwa], do you know of a workaround for this? I'm facing the same issue.

> URL.setURLStreamHandlerFactory causing incompatible HttpURLConnection issue
> ---
>
> Key: SPARK-25694
> URL: https://issues.apache.org/jira/browse/SPARK-25694
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.3.0, 2.3.1, 2.3.2
>Reporter: Bo Yang
>Priority: Minor
>
> URL.setURLStreamHandlerFactory() in SharedState causes URL.openConnection() to
> return an FsUrlConnection object, which is not compatible with
> HttpURLConnection. This causes an exception when using some third-party HTTP
> libraries (e.g. scalaj.http).
> The following code in Spark 2.3.0 introduced the issue: 
> sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala:
> {code}
> object SharedState extends Logging  {   ...   
>   URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory())   ...
> }
> {code}
> Here is the example exception when using scalaj.http in Spark:
> {code}
>  StackTrace: scala.MatchError: 
> org.apache.hadoop.fs.FsUrlConnection:[http://.example.com|http://.example.com/]
>  (of class org.apache.hadoop.fs.FsUrlConnection)
>  at 
> scalaj.http.HttpRequest.scalaj$http$HttpRequest$$doConnection(Http.scala:343)
>  at scalaj.http.HttpRequest.exec(Http.scala:335)
>  at scalaj.http.HttpRequest.asString(Http.scala:455)
> {code}
>   
> One option to fix the issue is to return null in 
> URLStreamHandlerFactory.createURLStreamHandler when the protocol is 
> http/https, so it will use the default behavior and be compatible with 
> scalaj.http. Following is the code example:
> {code}
> class SparkUrlStreamHandlerFactory extends URLStreamHandlerFactory with 
> Logging {
>   private val fsUrlStreamHandlerFactory = new FsUrlStreamHandlerFactory()
>   override def createURLStreamHandler(protocol: String): URLStreamHandler = {
> val handler = fsUrlStreamHandlerFactory.createURLStreamHandler(protocol)
> if (handler == null) {
>   return null
> }
> if (protocol != null &&
>   (protocol.equalsIgnoreCase("http")
>   || protocol.equalsIgnoreCase("https"))) {
>   // return null to use system default URLStreamHandler
>   null
> } else {
>   handler
> }
>   }
> }
> {code}
> I would like to get some discussion here before submitting a pull request.
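For anyone trying to reproduce the incompatibility described above, here is a minimal sketch of the root cause (it assumes hadoop-common is on the classpath; the exact connection class returned depends on the Hadoop version):

{code}
import java.net.URL
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory

object FsUrlFactoryRepro extends App {
  // The once-per-JVM hook that SharedState installs.
  URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory())

  // After the factory is installed, http/https URLs may no longer yield an
  // HttpURLConnection subclass, which is what breaks scalaj.http's pattern match.
  val conn = new URL("http://example.com/").openConnection()
  println(conn.getClass.getName)
}
{code}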






[jira] [Updated] (SPARK-26372) CSV parsing uses previous good value for bad input field

2018-12-14 Thread Bruce Robbins (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Robbins updated SPARK-26372:
--
Summary: CSV parsing uses previous good value for bad input field  (was: 
CSV Parsing uses previous good value for bad input field)

> CSV parsing uses previous good value for bad input field
> 
>
> Key: SPARK-26372
> URL: https://issues.apache.org/jira/browse/SPARK-26372
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Bruce Robbins
>Priority: Major
>
> For example:
> {noformat}
> bash-3.2$ cat test.csv 
> "hello",1999-08-01
> "there","bad date"
> "again","2017-11-22"
> bash-3.2$ bin/spark-shell
> ..etc..
> scala> import org.apache.spark.sql.types._
> scala> import org.apache.spark.sql.SaveMode
> scala> var schema = StructType(StructField("col1", StringType) ::
>  |   StructField("col2", DateType) ::
>  |   Nil)
> schema: org.apache.spark.sql.types.StructType = 
> StructType(StructField(col1,StringType,true), StructField(col2,DateType,true))
> scala> val df = spark.read.schema(schema).csv("test.csv")
> df: org.apache.spark.sql.DataFrame = [col1: string, col2: date]
> scala> df.show
> +-+--+
>   
> | col1|  col2|
> +-+--+
> |hello|1999-08-01|
> |there|1999-08-01|
> |again|2017-11-22|
> +-+--+
> scala> 
> {noformat}
> col2 from the second row contains "1999-08-01", when it should contain null.
> This is because UnivocityParser reuses the same Row object for each input 
> record. If there is an exception converting an input field, the code simply 
> skips over that field, leaving the existing value in the Row object.
> The simple fix is to set the column to null in the Row object whenever there 
> is a badRecordException while converting the input field.
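The reuse behaviour described above can be demonstrated without Spark. The sketch below is a simplified stand-in for the parser's shared row buffer (the names are illustrative, not Spark internals) and shows why nulling the field on a conversion failure gives the expected output:

{code}
import java.time.LocalDate
import scala.util.{Failure, Success, Try}

object RowReuseDemo extends App {
  val records = Seq(
    Seq("hello", "1999-08-01"),
    Seq("there", "bad date"),
    Seq("again", "2017-11-22"))

  // One shared buffer for every record, like the parser's reused row object.
  val row = new Array[Any](2)

  def convert(tokens: Seq[String], nullOnFailure: Boolean): Seq[Any] = {
    row(0) = tokens(0)
    Try(LocalDate.parse(tokens(1))) match {
      case Success(d) => row(1) = d
      case Failure(_) =>
        // Without the fix the field is simply skipped, leaving the previous
        // record's value in place; with the fix it is reset to null.
        if (nullOnFailure) row(1) = null
    }
    Vector(row(0), row(1)) // copy the values out, as a consumer would
  }

  println(records.map(convert(_, nullOnFailure = false)))
  // the second record wrongly carries 1999-08-01 for the bad date
  println(records.map(convert(_, nullOnFailure = true)))
  // the second record now carries null, as expected
}
{code}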






[jira] [Commented] (SPARK-26372) CSV parsing uses previous good value for bad input field

2018-12-14 Thread Bruce Robbins (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721593#comment-16721593
 ] 

Bruce Robbins commented on SPARK-26372:
---

I can prep a PR, unless someone thinks this needs a different solution than the 
one I proposed.

> CSV parsing uses previous good value for bad input field
> 
>
> Key: SPARK-26372
> URL: https://issues.apache.org/jira/browse/SPARK-26372
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Bruce Robbins
>Priority: Major
>
> For example:
> {noformat}
> bash-3.2$ cat test.csv 
> "hello",1999-08-01
> "there","bad date"
> "again","2017-11-22"
> bash-3.2$ bin/spark-shell
> ..etc..
> scala> import org.apache.spark.sql.types._
> scala> import org.apache.spark.sql.SaveMode
> scala> var schema = StructType(StructField("col1", StringType) ::
>  |   StructField("col2", DateType) ::
>  |   Nil)
> schema: org.apache.spark.sql.types.StructType = 
> StructType(StructField(col1,StringType,true), StructField(col2,DateType,true))
> scala> val df = spark.read.schema(schema).csv("test.csv")
> df: org.apache.spark.sql.DataFrame = [col1: string, col2: date]
> scala> df.show
> +-+--+
>   
> | col1|  col2|
> +-+--+
> |hello|1999-08-01|
> |there|1999-08-01|
> |again|2017-11-22|
> +-+--+
> scala> 
> {noformat}
> col2 from the second row contains "1999-08-01", when it should contain null.
> This is because UnivocityParser reuses the same Row object for each input 
> record. If there is an exception converting an input field, the code simply 
> skips over that field, leaving the existing value in the Row object.
> The simple fix is to set the column to null in the Row object whenever there 
> is a badRecordException while converting the input field.






[jira] [Created] (SPARK-26372) CSV Parsing uses previous good value for bad input field

2018-12-14 Thread Bruce Robbins (JIRA)
Bruce Robbins created SPARK-26372:
-

 Summary: CSV Parsing uses previous good value for bad input field
 Key: SPARK-26372
 URL: https://issues.apache.org/jira/browse/SPARK-26372
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Bruce Robbins


For example:
{noformat}
bash-3.2$ cat test.csv 
"hello",1999-08-01
"there","bad date"
"again","2017-11-22"
bash-3.2$ bin/spark-shell
..etc..
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.SaveMode
scala> var schema = StructType(StructField("col1", StringType) ::
 |   StructField("col2", DateType) ::
 |   Nil)
schema: org.apache.spark.sql.types.StructType = 
StructType(StructField(col1,StringType,true), StructField(col2,DateType,true))
scala> val df = spark.read.schema(schema).csv("test.csv")
df: org.apache.spark.sql.DataFrame = [col1: string, col2: date]
scala> df.show
+-+--+  
| col1|  col2|
+-+--+
|hello|1999-08-01|
|there|1999-08-01|
|again|2017-11-22|
+-+--+
scala> 
{noformat}
col2 from the second row contains "1999-08-01", when it should contain null.

This is because UnivocityParser reuses the same Row object for each input 
record. If there is an exception converting an input field, the code simply 
skips over that field, leaving the existing value in the Row object.

The simple fix is to set the column to null in the Row object whenever there is 
a badRecordException while converting the input field.






[jira] [Assigned] (SPARK-26370) Fix resolution of higher-order function for the same identifier.

2018-12-14 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-26370:
---

Assignee: Takuya Ueshin

> Fix resolution of higher-order function for the same identifier.
> 
>
> Key: SPARK-26370
> URL: https://issues.apache.org/jira/browse/SPARK-26370
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Takuya Ueshin
>Assignee: Takuya Ueshin
>Priority: Major
> Fix For: 2.4.1, 3.0.0
>
>
> When using a higher-order function whose lambda variable has the same name as an
> existing column, in {{Filter}} or another operator that uses
> {{Analyzer.resolveExpressionBottomUp}} during resolution, e.g.:
> {code}
> val df = Seq(
>   (Seq(1, 9, 8, 7), 1, 2),
>   (Seq(5, 9, 7), 2, 2),
>   (Seq.empty, 3, 2),
>   (null, 4, 2)
> ).toDF("i", "x", "d")
> checkAnswer(df.filter("exists(i, x -> x % d == 0)"),
>   Seq(Row(Seq(1, 9, 8, 7), 1, 2)))
> checkAnswer(df.select("x").filter("exists(i, x -> x % d == 0)"),
>   Seq(Row(1)))
> {code}
> the following exception happens:
> {code:java}
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.BoundReference cannot be cast to 
> org.apache.spark.sql.catalyst.expressions.NamedExpression
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
>   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:237)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.$anonfun$functionsForEval$1(higherOrderFunctions.scala:147)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:237)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at 
> org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.functionsForEval(higherOrderFunctions.scala:145)
>   at 
> org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.functionsForEval$(higherOrderFunctions.scala:145)
>   at 
> org.apache.spark.sql.catalyst.expressions.ArrayExists.functionsForEval$lzycompute(higherOrderFunctions.scala:369)
>   at 
> org.apache.spark.sql.catalyst.expressions.ArrayExists.functionsForEval(higherOrderFunctions.scala:369)
>   at 
> org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.functionForEval(higherOrderFunctions.scala:176)
>   at 
> org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.functionForEval$(higherOrderFunctions.scala:176)
>   at 
> org.apache.spark.sql.catalyst.expressions.ArrayExists.functionForEval(higherOrderFunctions.scala:369)
>   at 
> org.apache.spark.sql.catalyst.expressions.ArrayExists.nullSafeEval(higherOrderFunctions.scala:387)
>   at 
> org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval(higherOrderFunctions.scala:190)
>   at 
> org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval$(higherOrderFunctions.scala:185)
>   at 
> org.apache.spark.sql.catalyst.expressions.ArrayExists.eval(higherOrderFunctions.scala:369)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:216)
>   at 
> org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:215)
> ...
> {code}
> because the {{UnresolvedAttribute}} s in {{LambdaFunction}} are unexpectedly 
> resolved by the rule.
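The attached fix addresses this by giving lambda parameters their own expression type, so the generic bottom-up attribute-resolution rule can no longer capture them. A deliberately simplified, self-contained sketch of that idea (toy classes, not the real catalyst ones):

{code}
object LambdaPlaceholderSketch extends App {
  sealed trait Expr
  case class UnresolvedAttribute(name: String) extends Expr
  case class UnresolvedNamedLambdaVariable(name: String) extends Expr
  case class ColumnRef(name: String) extends Expr

  // Stand-in for the generic rule that resolves names against the child's output
  // columns. Because it matches only UnresolvedAttribute, a lambda parameter such
  // as the `x` in exists(i, x -> x % d == 0) is left alone until the dedicated
  // ResolveLambdaVariables rule binds it to the lambda argument.
  def resolveAgainstColumns(e: Expr, columns: Set[String]): Expr = e match {
    case UnresolvedAttribute(name) if columns(name) => ColumnRef(name)
    case other => other
  }

  val columns = Set("i", "x", "d")
  println(resolveAgainstColumns(UnresolvedAttribute("x"), columns))           // ColumnRef(x)
  println(resolveAgainstColumns(UnresolvedNamedLambdaVariable("x"), columns)) // left unresolved
}
{code}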






[jira] [Resolved] (SPARK-26370) Fix resolution of higher-order function for the same identifier.

2018-12-14 Thread Wenchen Fan (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan resolved SPARK-26370.
-
   Resolution: Fixed
Fix Version/s: 2.4.1
   3.0.0

Issue resolved by pull request 23320
[https://github.com/apache/spark/pull/23320]

> Fix resolution of higher-order function for the same identifier.
> 
>
> Key: SPARK-26370
> URL: https://issues.apache.org/jira/browse/SPARK-26370
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Takuya Ueshin
>Assignee: Takuya Ueshin
>Priority: Major
> Fix For: 3.0.0, 2.4.1
>
>
> When using a higher-order function whose lambda variable has the same name as an
> existing column, in {{Filter}} or another operator that uses
> {{Analyzer.resolveExpressionBottomUp}} during resolution, e.g.:
> {code}
> val df = Seq(
>   (Seq(1, 9, 8, 7), 1, 2),
>   (Seq(5, 9, 7), 2, 2),
>   (Seq.empty, 3, 2),
>   (null, 4, 2)
> ).toDF("i", "x", "d")
> checkAnswer(df.filter("exists(i, x -> x % d == 0)"),
>   Seq(Row(Seq(1, 9, 8, 7), 1, 2)))
> checkAnswer(df.select("x").filter("exists(i, x -> x % d == 0)"),
>   Seq(Row(1)))
> {code}
> the following exception happens:
> {code:java}
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.BoundReference cannot be cast to 
> org.apache.spark.sql.catalyst.expressions.NamedExpression
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
>   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:237)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.$anonfun$functionsForEval$1(higherOrderFunctions.scala:147)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:237)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at 
> org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.functionsForEval(higherOrderFunctions.scala:145)
>   at 
> org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.functionsForEval$(higherOrderFunctions.scala:145)
>   at 
> org.apache.spark.sql.catalyst.expressions.ArrayExists.functionsForEval$lzycompute(higherOrderFunctions.scala:369)
>   at 
> org.apache.spark.sql.catalyst.expressions.ArrayExists.functionsForEval(higherOrderFunctions.scala:369)
>   at 
> org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.functionForEval(higherOrderFunctions.scala:176)
>   at 
> org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.functionForEval$(higherOrderFunctions.scala:176)
>   at 
> org.apache.spark.sql.catalyst.expressions.ArrayExists.functionForEval(higherOrderFunctions.scala:369)
>   at 
> org.apache.spark.sql.catalyst.expressions.ArrayExists.nullSafeEval(higherOrderFunctions.scala:387)
>   at 
> org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval(higherOrderFunctions.scala:190)
>   at 
> org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval$(higherOrderFunctions.scala:185)
>   at 
> org.apache.spark.sql.catalyst.expressions.ArrayExists.eval(higherOrderFunctions.scala:369)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:216)
>   at 
> org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:215)
> ...
> {code}
> because the {{UnresolvedAttribute}} s in {{LambdaFunction}} are unexpectedly 
> resolved by the rule.






[jira] [Commented] (SPARK-26370) Fix resolution of higher-order function for the same identifier.

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721575#comment-16721575
 ] 

ASF GitHub Bot commented on SPARK-26370:


asfgit closed pull request #23320: [SPARK-26370][SQL] Fix resolution of 
higher-order function for the same identifier.
URL: https://github.com/apache/spark/pull/23320
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala
index a8a7bbd9f9cd0..1cd7f412bb678 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala
@@ -150,13 +150,14 @@ case class ResolveLambdaVariables(conf: SQLConf) extends 
Rule[LogicalPlan] {
   val lambdaMap = l.arguments.map(v => canonicalizer(v.name) -> v).toMap
   l.mapChildren(resolve(_, parentLambdaMap ++ lambdaMap))
 
-case u @ UnresolvedAttribute(name +: nestedFields) =>
+case u @ UnresolvedNamedLambdaVariable(name +: nestedFields) =>
   parentLambdaMap.get(canonicalizer(name)) match {
 case Some(lambda) =>
   nestedFields.foldLeft(lambda: Expression) { (expr, fieldName) =>
 ExtractValue(expr, Literal(fieldName), conf.resolver)
   }
-case None => u
+case None =>
+  UnresolvedAttribute(u.nameParts)
   }
 
 case _ =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
index a8639d29f964d..7141b6e996389 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
@@ -22,12 +22,34 @@ import java.util.concurrent.atomic.AtomicReference
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion, 
UnresolvedAttribute, UnresolvedException}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.array.ByteArrayMethods
 
+/**
+ * A placeholder of lambda variables to prevent unexpected resolution of 
[[LambdaFunction]].
+ */
+case class UnresolvedNamedLambdaVariable(nameParts: Seq[String])
+  extends LeafExpression with NamedExpression with Unevaluable {
+
+  override def name: String =
+nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".")
+
+  override def exprId: ExprId = throw new UnresolvedException(this, "exprId")
+  override def dataType: DataType = throw new UnresolvedException(this, 
"dataType")
+  override def nullable: Boolean = throw new UnresolvedException(this, 
"nullable")
+  override def qualifier: Seq[String] = throw new UnresolvedException(this, 
"qualifier")
+  override def toAttribute: Attribute = throw new UnresolvedException(this, 
"toAttribute")
+  override def newInstance(): NamedExpression = throw new 
UnresolvedException(this, "newInstance")
+  override lazy val resolved = false
+
+  override def toString: String = s"lambda '$name"
+
+  override def sql: String = name
+}
+
 /**
  * A named lambda variable.
  */
@@ -79,7 +101,7 @@ case class LambdaFunction(
 
 object LambdaFunction {
   val identity: LambdaFunction = {
-val id = UnresolvedAttribute.quoted("id")
+val id = UnresolvedNamedLambdaVariable(Seq("id"))
 LambdaFunction(id, Seq(id))
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 672bffcfc0cad..8959f78b656d2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1338,9 +1338,12 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
*/
   override def visitLambda(ctx: LambdaContext): Expression = withOrigin(ctx) {
 val arguments = ctx.IDENTIFIER().asScala.map { name =>
-  UnresolvedAttribute.quoted(name.getText)
+  

[jira] [Commented] (SPARK-20890) Add min and max functions for dataset aggregation

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721387#comment-16721387
 ] 

ASF GitHub Bot commented on SPARK-20890:


srowen closed pull request #18113: [SPARK-20890][SQL] Added min and max typed 
aggregation functions
URL: https://github.com/apache/spark/pull/18113
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java 
b/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
index ec9c107b1c119..f426dd95cec27 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/expressions/javalang/typed.java
@@ -21,10 +21,7 @@
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.TypedColumn;
-import org.apache.spark.sql.execution.aggregate.TypedAverage;
-import org.apache.spark.sql.execution.aggregate.TypedCount;
-import org.apache.spark.sql.execution.aggregate.TypedSumDouble;
-import org.apache.spark.sql.execution.aggregate.TypedSumLong;
+import org.apache.spark.sql.execution.aggregate.*;
 
 /**
  * :: Experimental ::
@@ -74,4 +71,40 @@
   public static  TypedColumn sumLong(MapFunction f) {
 return new TypedSumLong(f).toColumnJava();
   }
+
+  /**
+   * Min aggregate function for floating point (double) type.
+   *
+   * @since 2.3.0
+   */
+  public static  TypedColumn min(MapFunction f) {
+return new JavaTypedMinDouble(f).toColumn();
+  }
+
+  /**
+   * Min aggregate function for integral (long, i.e. 64 bit integer) type.
+   *
+   * @since 2.3.0
+   */
+  public static  TypedColumn minLong(MapFunction f) {
+return new JavaTypedMinLong(f).toColumn();
+  }
+
+  /**
+   * Max aggregate function for floating point (double) type.
+   *
+   * @since 2.3.0
+   */
+  public static  TypedColumn max(MapFunction f) {
+return new JavaTypedMaxDouble(f).toColumn();
+  }
+
+  /**
+   * Max aggregate function for integral (long, i.e. 64 bit integer) type.
+   *
+   * @since 2.3.0
+   */
+  public static  TypedColumn maxLong(MapFunction f) {
+return new JavaTypedMaxLong(f).toColumn();
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
index b6550bf3e4aac..1d019e4f78f6d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.aggregate
 
 import org.apache.spark.api.java.function.MapFunction
-import org.apache.spark.sql.{Encoder, TypedColumn}
+import org.apache.spark.sql.{Encoder, Encoders, TypedColumn}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.expressions.Aggregator
 
@@ -38,13 +38,11 @@ class TypedSumDouble[IN](val f: IN => Double) extends 
Aggregator[IN, Double, Dou
 
   // Java api support
   def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => 
f.call(x).asInstanceOf[Double])
-
   def toColumnJava: TypedColumn[IN, java.lang.Double] = {
 toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
   }
 }
 
-
 class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
   override def zero: Long = 0L
   override def reduce(b: Long, a: IN): Long = b + f(a)
@@ -56,13 +54,11 @@ class TypedSumLong[IN](val f: IN => Long) extends 
Aggregator[IN, Long, Long] {
 
   // Java api support
   def this(f: MapFunction[IN, java.lang.Long]) = this((x: IN) => 
f.call(x).asInstanceOf[Long])
-
   def toColumnJava: TypedColumn[IN, java.lang.Long] = {
 toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
   }
 }
 
-
 class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
   override def zero: Long = 0
   override def reduce(b: Long, a: IN): Long = {
@@ -81,14 +77,13 @@ class TypedCount[IN](val f: IN => Any) extends 
Aggregator[IN, Long, Long] {
   }
 }
 
-
 class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, 
Long), Double] {
   override def zero: (Double, Long) = (0.0, 0L)
   override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + 
b._1, 1 + b._2)
-  override def finish(reduction: (Double, Long)): Double = reduction._1 / 
reduction._2
   override def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = 
{
 (b1._1 + b2._1, b1._2 + b2._2)
   }
+  override def finish(reduction: (Double, Long)): Double = 

[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2018-12-14 Thread Julian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721386#comment-16721386
 ] 

Julian commented on SPARK-21453:


Also, the issue is marked as Major on the other thread, but only as Minor here?

> Cached Kafka consumer may be closed too early
> -
>
> Key: SPARK-21453
> URL: https://issues.apache.org/jira/browse/SPARK-21453
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
> Environment: Spark 2.2.0 and kafka 0.10.2.0
>Reporter: Pablo Panero
>Priority: Minor
>
> On a streaming job using the built-in Kafka source and sink (over SSL), I am
> getting the following exception:
> Config of the source:
> {code:java}
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", config.bootstrapServers)
>   .option("failOnDataLoss", value = false)
>   .option("kafka.connections.max.idle.ms", 360)
>   //SSL: this only applies to communication between Spark and Kafka 
> brokers; you are still responsible for separately securing Spark inter-node 
> communication.
>   .option("kafka.security.protocol", "SASL_SSL")
>   .option("kafka.sasl.mechanism", "GSSAPI")
>   .option("kafka.sasl.kerberos.service.name", "kafka")
>   .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>   .option("kafka.ssl.truststore.password", "changeit")
>   .option("subscribe", config.topicConfigList.keys.mkString(","))
>   .load()
> {code}
> Config of the sink:
> {code:java}
> .writeStream
> .option("checkpointLocation", 
> s"${config.checkpointDir}/${topicConfig._1}/")
> .format("kafka")
> .option("kafka.bootstrap.servers", config.bootstrapServers)
> .option("kafka.connections.max.idle.ms", 360)
> //SSL: this only applies to communication between Spark and Kafka 
> brokers; you are still responsible for separately securing Spark inter-node 
> communication.
> .option("kafka.security.protocol", "SASL_SSL")
> .option("kafka.sasl.mechanism", "GSSAPI")
> .option("kafka.sasl.kerberos.service.name", "kafka")
> .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
> .option("kafka.ssl.truststore.password", "changeit")
> .start()
> {code}
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message 
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>   at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>   at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>   at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>   at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>   at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>   at org.apache.kafka.common.network.Selector.close(Selector.java:531)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
>   at 
> org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
>   at 
> org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
>   at 
> 

[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2018-12-14 Thread Julian (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721380#comment-16721380
 ] 

Julian commented on SPARK-21453:


I see this issue is also raised against Kafka under 
https://issues.apache.org/jira/browse/KAFKA-5649

 

> Cached Kafka consumer may be closed too early
> -
>
> Key: SPARK-21453
> URL: https://issues.apache.org/jira/browse/SPARK-21453
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
> Environment: Spark 2.2.0 and kafka 0.10.2.0
>Reporter: Pablo Panero
>Priority: Minor
>
> On a streaming job using the built-in Kafka source and sink (over SSL), I am
> getting the following exception:
> Config of the source:
> {code:java}
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", config.bootstrapServers)
>   .option("failOnDataLoss", value = false)
>   .option("kafka.connections.max.idle.ms", 360)
>   //SSL: this only applies to communication between Spark and Kafka 
> brokers; you are still responsible for separately securing Spark inter-node 
> communication.
>   .option("kafka.security.protocol", "SASL_SSL")
>   .option("kafka.sasl.mechanism", "GSSAPI")
>   .option("kafka.sasl.kerberos.service.name", "kafka")
>   .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>   .option("kafka.ssl.truststore.password", "changeit")
>   .option("subscribe", config.topicConfigList.keys.mkString(","))
>   .load()
> {code}
> Config of the sink:
> {code:java}
> .writeStream
> .option("checkpointLocation", 
> s"${config.checkpointDir}/${topicConfig._1}/")
> .format("kafka")
> .option("kafka.bootstrap.servers", config.bootstrapServers)
> .option("kafka.connections.max.idle.ms", 360)
> //SSL: this only applies to communication between Spark and Kafka 
> brokers; you are still responsible for separately securing Spark inter-node 
> communication.
> .option("kafka.security.protocol", "SASL_SSL")
> .option("kafka.sasl.mechanism", "GSSAPI")
> .option("kafka.sasl.kerberos.service.name", "kafka")
> .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
> .option("kafka.ssl.truststore.password", "changeit")
> .start()
> {code}
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message 
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>   at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>   at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>   at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>   at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>   at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>   at org.apache.kafka.common.network.Selector.close(Selector.java:531)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
>   at 
> org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
>   at 
> org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
>   at 

[jira] [Commented] (SPARK-25922) [K8] Spark Driver/Executor "spark-app-selector" label mismatch

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721346#comment-16721346
 ] 

ASF GitHub Bot commented on SPARK-25922:


suxingfate opened a new pull request #23322: [SPARK-25922][K8] Spark 
Driver/Executor spark-app-selector label mism…
URL: https://github.com/apache/spark/pull/23322
 
 
   ## What changes were proposed in this pull request?
   
   In K8S cluster mode, the algorithm used to generate the driver's
   spark-app-selector/spark.app.id differs from the one used for the executors.
   This patch consolidates the two so that the driver and the executors share a
   single algorithm, which helps when monitoring resource consumption from the
   K8S perspective. In K8S client mode, it also makes sure the same algorithm is
   used to generate spark-app-selector/spark.app.id for the executors.
   
   ## How was this patch tested?
   
   Manually run.
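For reference, the two ID-generation schemes being consolidated look roughly like this (paraphrased from the source files linked in the issue below; treat the exact expressions as an approximation of the 2.4.0 code, not an authoritative quote):

{code}
import java.util.UUID

object AppIdSketch extends App {
  // Roughly what the K8S submission client labels the driver pod with
  // (KubernetesClientApplication): spark-<uuid without dashes>.
  val driverLabelId = "spark-" + UUID.randomUUID().toString.replaceAll("-", "")

  // Roughly the default applicationId reported by the scheduler backend and
  // stamped on the executors (SchedulerBackend): spark-application-<timestamp>.
  val executorLabelId = "spark-application-" + System.currentTimeMillis

  println(driverLabelId)    // e.g. spark-b78bb10feebf4e2d98c11d7b6320e18f
  println(executorLabelId)  // e.g. spark-application-1541121829445
}
{code}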




> [K8] Spark Driver/Executor "spark-app-selector" label mismatch
> --
>
> Key: SPARK-25922
> URL: https://issues.apache.org/jira/browse/SPARK-25922
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.4.0
> Environment: Spark 2.4.0 RC4
>Reporter: Anmol Khurana
>Priority: Major
>
> Hi,
> I have been testing Spark 2.4.0 RC4 on Kubernetes  to run Python Spark 
> Applications and running into an issue where the AppId label on the driver 
> and executors mis-match. I am using the 
> [https://github.com/GoogleCloudPlatform/spark-on-k8s-operator] to run these 
> applications. 
> I see a spark.app.id of the form spark-* as  "spark-app-selector" label on 
> the driver as well as in the K8 config-map which gets created for the driver 
> via spark-submit . My guess is this is coming from 
> [https://github.com/apache/spark/blob/f6cc354d83c2c9a757f9b507aadd4dbdc5825cca/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L211]
>  
> But when the driver actually comes up and brings up executors etc. , I see 
> that the "spark-app-selector" label on the executors as well as the 
> spark.app.Id config within the user-code on the driver is something of the 
> form spark-application-* ( probably from 
> [https://github.com/apache/spark/blob/b19a28dea098c7d6188f8540429c50f42952d678/core/src/main/scala/org/apache/spark/SparkContext.scala#L511]
>  & 
> [https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26|https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26)]
>  )
> We were consuming this "spark-app-selector" label on the Driver Pod to get 
> the App Id and use it to look-up the app in SparkHistory server (among other 
> use-cases). but due to this mis-match, this logic no longer works. This was 
> working fine in Spark 2.2 fork for Kubernetes which i was using earlier. Is 
> this expected behavior and if yes, what's the correct way to fetch the 
> applicationId from outside the application ?  
> Let me know if I can provide any more details or if I am doing something 
> wrong. Here is an example run with different *spark-app-selector* label on 
> the driver/executor : 
>  
> {code:java}
> Name: pyfiles-driver
> Namespace: default
> Priority: 0
> PriorityClassName: 
> Start Time: Thu, 01 Nov 2018 18:19:46 -0700
> Labels: spark-app-selector=spark-b78bb10feebf4e2d98c11d7b6320e18f
>  spark-role=driver
>  sparkoperator.k8s.io/app-name=pyfiles
>  sparkoperator.k8s.io/launched-by-spark-operator=true
>  version=2.4.0
> Status: Running
> Name: pyfiles-1541121585642-exec-1
> Namespace: default
> Priority: 0
> PriorityClassName: 
> Start Time: Thu, 01 Nov 2018 18:24:02 -0700
> Labels: spark-app-selector=spark-application-1541121829445
>  spark-exec-id=1
>  spark-role=executor
>  sparkoperator.k8s.io/app-name=pyfiles
>  sparkoperator.k8s.io/launched-by-spark-operator=true
>  version=2.4.0
> Status: Pending
> {code}
>  
>  
>  






[jira] [Commented] (SPARK-25922) [K8] Spark Driver/Executor "spark-app-selector" label mismatch

2018-12-14 Thread Wang, Xinglong (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721349#comment-16721349
 ] 

Wang, Xinglong commented on SPARK-25922:


Hi [~liyinan926],

Recently, while evaluating Spark on K8s, we were setting up tools to monitor
resource usage from the K8s perspective and ran into this problem as well.

I created a pull request for this. Could you please help review it?

Thanks,

> [K8] Spark Driver/Executor "spark-app-selector" label mismatch
> --
>
> Key: SPARK-25922
> URL: https://issues.apache.org/jira/browse/SPARK-25922
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.4.0
> Environment: Spark 2.4.0 RC4
>Reporter: Anmol Khurana
>Priority: Major
>
> Hi,
> I have been testing Spark 2.4.0 RC4 on Kubernetes  to run Python Spark 
> Applications and running into an issue where the AppId label on the driver 
> and executors mis-match. I am using the 
> [https://github.com/GoogleCloudPlatform/spark-on-k8s-operator] to run these 
> applications. 
> I see a spark.app.id of the form spark-* as  "spark-app-selector" label on 
> the driver as well as in the K8 config-map which gets created for the driver 
> via spark-submit . My guess is this is coming from 
> [https://github.com/apache/spark/blob/f6cc354d83c2c9a757f9b507aadd4dbdc5825cca/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L211]
>  
> But when the driver actually comes up and brings up executors etc. , I see 
> that the "spark-app-selector" label on the executors as well as the 
> spark.app.Id config within the user-code on the driver is something of the 
> form spark-application-* ( probably from 
> [https://github.com/apache/spark/blob/b19a28dea098c7d6188f8540429c50f42952d678/core/src/main/scala/org/apache/spark/SparkContext.scala#L511]
>  & 
> [https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26|https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26)]
>  )
> We were consuming this "spark-app-selector" label on the Driver Pod to get 
> the App Id and use it to look-up the app in SparkHistory server (among other 
> use-cases). but due to this mis-match, this logic no longer works. This was 
> working fine in Spark 2.2 fork for Kubernetes which i was using earlier. Is 
> this expected behavior and if yes, what's the correct way to fetch the 
> applicationId from outside the application ?  
> Let me know if I can provide any more details or if I am doing something 
> wrong. Here is an example run with different *spark-app-selector* label on 
> the driver/executor : 
>  
> {code:java}
> Name: pyfiles-driver
> Namespace: default
> Priority: 0
> PriorityClassName: 
> Start Time: Thu, 01 Nov 2018 18:19:46 -0700
> Labels: spark-app-selector=spark-b78bb10feebf4e2d98c11d7b6320e18f
>  spark-role=driver
>  sparkoperator.k8s.io/app-name=pyfiles
>  sparkoperator.k8s.io/launched-by-spark-operator=true
>  version=2.4.0
> Status: Running
> Name: pyfiles-1541121585642-exec-1
> Namespace: default
> Priority: 0
> PriorityClassName: 
> Start Time: Thu, 01 Nov 2018 18:24:02 -0700
> Labels: spark-app-selector=spark-application-1541121829445
>  spark-exec-id=1
>  spark-role=executor
>  sparkoperator.k8s.io/app-name=pyfiles
>  sparkoperator.k8s.io/launched-by-spark-operator=true
>  version=2.4.0
> Status: Pending
> {code}
>  
>  
>  






[jira] [Assigned] (SPARK-25922) [K8] Spark Driver/Executor "spark-app-selector" label mismatch

2018-12-14 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-25922:


Assignee: (was: Apache Spark)

> [K8] Spark Driver/Executor "spark-app-selector" label mismatch
> --
>
> Key: SPARK-25922
> URL: https://issues.apache.org/jira/browse/SPARK-25922
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.4.0
> Environment: Spark 2.4.0 RC4
>Reporter: Anmol Khurana
>Priority: Major
>
> Hi,
> I have been testing Spark 2.4.0 RC4 on Kubernetes  to run Python Spark 
> Applications and running into an issue where the AppId label on the driver 
> and executors mis-match. I am using the 
> [https://github.com/GoogleCloudPlatform/spark-on-k8s-operator] to run these 
> applications. 
> I see a spark.app.id of the form spark-* as  "spark-app-selector" label on 
> the driver as well as in the K8 config-map which gets created for the driver 
> via spark-submit . My guess is this is coming from 
> [https://github.com/apache/spark/blob/f6cc354d83c2c9a757f9b507aadd4dbdc5825cca/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L211]
>  
> But when the driver actually comes up and brings up executors etc. , I see 
> that the "spark-app-selector" label on the executors as well as the 
> spark.app.Id config within the user-code on the driver is something of the 
> form spark-application-* ( probably from 
> [https://github.com/apache/spark/blob/b19a28dea098c7d6188f8540429c50f42952d678/core/src/main/scala/org/apache/spark/SparkContext.scala#L511]
>  & 
> [https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26|https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26)]
>  )
> We were consuming this "spark-app-selector" label on the Driver Pod to get 
> the App Id and use it to look-up the app in SparkHistory server (among other 
> use-cases). but due to this mis-match, this logic no longer works. This was 
> working fine in Spark 2.2 fork for Kubernetes which i was using earlier. Is 
> this expected behavior and if yes, what's the correct way to fetch the 
> applicationId from outside the application ?  
> Let me know if I can provide any more details or if I am doing something 
> wrong. Here is an example run with different *spark-app-selector* label on 
> the driver/executor : 
>  
> {code:java}
> Name: pyfiles-driver
> Namespace: default
> Priority: 0
> PriorityClassName: 
> Start Time: Thu, 01 Nov 2018 18:19:46 -0700
> Labels: spark-app-selector=spark-b78bb10feebf4e2d98c11d7b6320e18f
>  spark-role=driver
>  sparkoperator.k8s.io/app-name=pyfiles
>  sparkoperator.k8s.io/launched-by-spark-operator=true
>  version=2.4.0
> Status: Running
> Name: pyfiles-1541121585642-exec-1
> Namespace: default
> Priority: 0
> PriorityClassName: 
> Start Time: Thu, 01 Nov 2018 18:24:02 -0700
> Labels: spark-app-selector=spark-application-1541121829445
>  spark-exec-id=1
>  spark-role=executor
>  sparkoperator.k8s.io/app-name=pyfiles
>  sparkoperator.k8s.io/launched-by-spark-operator=true
>  version=2.4.0
> Status: Pending
> {code}
>  
>  
>  






[jira] [Assigned] (SPARK-25922) [K8] Spark Driver/Executor "spark-app-selector" label mismatch

2018-12-14 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-25922:


Assignee: Apache Spark

> [K8] Spark Driver/Executor "spark-app-selector" label mismatch
> --
>
> Key: SPARK-25922
> URL: https://issues.apache.org/jira/browse/SPARK-25922
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.4.0
> Environment: Spark 2.4.0 RC4
>Reporter: Anmol Khurana
>Assignee: Apache Spark
>Priority: Major
>
> Hi,
> I have been testing Spark 2.4.0 RC4 on Kubernetes to run Python Spark 
> applications and am running into an issue where the app-id label on the driver 
> and the executors does not match. I am using 
> [https://github.com/GoogleCloudPlatform/spark-on-k8s-operator] to run these 
> applications. 
> I see a spark.app.id of the form spark-* as the "spark-app-selector" label on 
> the driver, as well as in the K8s config-map which gets created for the driver 
> via spark-submit. My guess is this is coming from 
> [https://github.com/apache/spark/blob/f6cc354d83c2c9a757f9b507aadd4dbdc5825cca/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L211]
> But when the driver actually comes up and brings up executors, I see that the 
> "spark-app-selector" label on the executors, as well as the spark.app.id config 
> visible to user code on the driver, is of the form spark-application-* (probably 
> from 
> [https://github.com/apache/spark/blob/b19a28dea098c7d6188f8540429c50f42952d678/core/src/main/scala/org/apache/spark/SparkContext.scala#L511]
>  and 
> [https://github.com/apache/spark/blob/bfb74394a5513134ea1da9fcf4a1783b77dd64e4/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L26]
>  ).
> We were consuming the "spark-app-selector" label on the driver pod to get the 
> app id and use it to look up the app in the Spark History Server (among other 
> use cases), but due to this mismatch that logic no longer works. It was working 
> fine in the Spark 2.2 fork for Kubernetes which I was using earlier. Is this 
> expected behavior, and if so, what is the correct way to fetch the applicationId 
> from outside the application?
> Let me know if I can provide any more details or if I am doing something wrong. 
> Here is an example run with different *spark-app-selector* labels on the 
> driver/executor: 
>  
> {code:java}
> Name: pyfiles-driver
> Namespace: default
> Priority: 0
> PriorityClassName: 
> Start Time: Thu, 01 Nov 2018 18:19:46 -0700
> Labels: spark-app-selector=spark-b78bb10feebf4e2d98c11d7b6320e18f
>  spark-role=driver
>  sparkoperator.k8s.io/app-name=pyfiles
>  sparkoperator.k8s.io/launched-by-spark-operator=true
>  version=2.4.0
> Status: Running
> Name: pyfiles-1541121585642-exec-1
> Namespace: default
> Priority: 0
> PriorityClassName: 
> Start Time: Thu, 01 Nov 2018 18:24:02 -0700
> Labels: spark-app-selector=spark-application-1541121829445
>  spark-exec-id=1
>  spark-role=executor
>  sparkoperator.k8s.io/app-name=pyfiles
>  sparkoperator.k8s.io/launched-by-spark-operator=true
>  version=2.4.0
> Status: Pending
> {code}
>  
>  
>  






[jira] [Commented] (SPARK-19739) SparkHadoopUtil.appendS3AndSparkHadoopConfigurations to propagate full set of AWS env vars

2018-12-14 Thread Steve Loughran (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721258#comment-16721258
 ] 

Steve Loughran commented on SPARK-19739:


BTW: I'm not backporting HADOOP-14556 to any non-trunk version of Hadoop, as it's 
fairly dramatic. 

If you want to add a patch to all of Hadoop 2.8 to 3.2 to add the temp creds at 
the start of the list, I'll review and inevitably commit it. I don't think there 
was any deliberate intention not to add it as an option (unlike the special 
"no credentials" switch, or the assumed-role one). 
 Do read the [S3 test process 
doc|https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/testing.html#Policy_for_submitting_patches_which_affect_the_hadoop-aws_module.]
 before submitting, though: you're required to run the whole s3a test suite and 
tell us what endpoint you ran against. Talk to some colleagues to get set up 
here, if you haven't done it before.

> SparkHadoopUtil.appendS3AndSparkHadoopConfigurations to propagate full set of 
> AWS env vars
> --
>
> Key: SPARK-19739
> URL: https://issues.apache.org/jira/browse/SPARK-19739
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.1.0
>Reporter: Steve Loughran
>Assignee: Genmao Yu
>Priority: Minor
> Fix For: 2.2.0
>
>
> {{SparkHadoopUtil.appendS3AndSparkHadoopConfigurations()}} propagates the AWS 
> user and secret key to the s3n and s3a config options, so that secrets set by 
> the user reach the cluster.
> AWS also supports session authentication (env var {{AWS_SESSION_TOKEN}}) and 
> region endpoints ({{AWS_DEFAULT_REGION}}), the latter being critical if you 
> want to address V4-auth-only endpoints like Frankfurt and Seoul. 
> These env vars should be picked up and passed down to s3a too. It is 4+ lines 
> of code, though impossible to test unless the existing code is refactored to 
> take the env var Map[String, String] as a parameter, allowing a test suite to 
> set the values in its own map (see the sketch below).
> Side issue: what if only half the env vars are set and users are trying to 
> understand why auth is failing? It may be good to build up a string 
> identifying which env vars had their values propagated, and log that at debug 
> level, while not logging the values, obviously.
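A minimal sketch of the refactor suggested above, under the assumption that the
helper name and exact signature are illustrative rather than the real
SparkHadoopUtil method; taking the environment as a plain Map lets a test suite
inject values instead of mutating real process env vars. The three credential
keys are the standard s3a options; the region mapping is left out because the
target key depends on the Hadoop version.

{code:scala}
import org.apache.hadoop.conf.Configuration

object AwsEnvPropagation {
  // Returns the names (not the values) of the env vars that were propagated, so
  // the caller can log them at debug level as the side issue above suggests.
  def appendAwsCredentials(env: Map[String, String], hadoopConf: Configuration): Seq[String] = {
    val mappings = Seq(
      "AWS_ACCESS_KEY_ID"     -> "fs.s3a.access.key",
      "AWS_SECRET_ACCESS_KEY" -> "fs.s3a.secret.key",
      "AWS_SESSION_TOKEN"     -> "fs.s3a.session.token")
    for ((envKey, confKey) <- mappings; value <- env.get(envKey)) yield {
      hadoopConf.set(confKey, value)
      envKey
    }
  }
}

// Production call site: AwsEnvPropagation.appendAwsCredentials(sys.env, hadoopConf)
// Test call site:       AwsEnvPropagation.appendAwsCredentials(Map("AWS_SESSION_TOKEN" -> "t"), new Configuration())
{code}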






[jira] [Commented] (SPARK-26359) Spark checkpoint restore fails after query restart

2018-12-14 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721251#comment-16721251
 ] 

Gabor Somogyi commented on SPARK-26359:
---

There is also a possibility (apart from deleting the metadata) to rename the file 
from
{quote}s3://some.domain/spark/checkpoints/49/feedback/state/2/11/.36870.delta.02e75d1f-aa48-4c61-9257-a5928b32e22e.TID2469780.tmp
{quote}
to
{quote}s3://some.domain/spark/checkpoints/49/feedback/state/2/11/36870.delta
{quote}
As a general warning: manually editing the checkpoint directory could end up in 
data loss and is not advised! Use it only if the loss can be tolerated.
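A minimal sketch of that rename done with the Hadoop FileSystem API (pasteable
into spark-shell) rather than the S3 console, using the s3a scheme that appears
in the stack traces below; the same warning applies, so use it only if the
possible loss is acceptable.

{code:scala}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val base = "s3a://some.domain/spark/checkpoints/49/feedback/state/2/11/"
val src  = new Path(base + ".36870.delta.02e75d1f-aa48-4c61-9257-a5928b32e22e.TID2469780.tmp")
val dst  = new Path(base + "36870.delta")

val fs = FileSystem.get(new URI("s3a://some.domain/"), new Configuration())
// On s3a a rename is copy-then-delete, so check the result and verify the
// destination object exists before restarting the query.
if (fs.rename(src, dst)) println(s"renamed to $dst")
else println("rename failed; checkpoint left untouched")
{code}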

 

> Spark checkpoint restore fails after query restart
> --
>
> Key: SPARK-26359
> URL: https://issues.apache.org/jira/browse/SPARK-26359
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Submit, Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4.0 deployed in standalone-client mode
> Checkpointing is done to S3
> The Spark application in question is responsible for running 4 different 
> queries
> Queries are written using Structured Streaming
> We are using the following algorithm in hopes of better performance:
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # the 
> default is 1
>Reporter: Kaspar Tint
>Priority: Major
> Attachments: driver-redacted, metadata, redacted-offsets, 
> state-redacted, worker-redacted
>
>
> We had an incident where one of our structured streaming queries could not be 
> restarted after a usual S3 checkpointing failure. To clarify before everything 
> else: we are aware of the issues with S3 and are working towards moving to 
> HDFS, but this will take time. S3 causes queries to fail quite often during 
> peak hours, and we have separate logic to handle this that will attempt to 
> restart the failed queries if possible.
> In this particular case we could not restart one of the failed queries. It 
> seems that between detecting the failure in the query and starting it up 
> again, something went really wrong with Spark and the state in the checkpoint 
> folder got corrupted for some reason.
> The issue starts with the usual *FileNotFoundException* that happens with S3
> {code:java}
> 2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = 
> c074233a-2563-40fc-8036-b5e38e2e2c42, runId = 
> e607eb6e-8431-4269-acab-cc2c1f9f09dd]
> terminated with error
> java.io.FileNotFoundException: No such file or directory: 
> s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37
> 348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at 
> org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715)
> at 
> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
> at 
> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
> at 
> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965)
> at 
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
> at 
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataL
> og.scala:126)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at scala.Option.getOrElse(Option.scala:121)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)

[jira] [Commented] (SPARK-26371) Increase Kafka ConfigUpdater test coverage

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721240#comment-16721240
 ] 

ASF GitHub Bot commented on SPARK-26371:


gaborgsomogyi opened a new pull request #23321: [SPARK-26371][SS][TESTS] 
Increase kafka ConfigUpdater test coverage.
URL: https://github.com/apache/spark/pull/23321
 
 
   ## What changes were proposed in this pull request?
   
   As the Kafka delegation token work added logic to ConfigUpdater, it would be 
good to test it.
   This PR contains the following changes:
   * ConfigUpdater extracted to a separate file and renamed to KafkaConfigUpdater
   * mockito-core dependency added to kafka-0-10-sql
   * Unit tests added (a sketch of the general shape is given below)
   
   ## How was this patch tested?
   
   Existing + new unit tests + on cluster.
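A minimal, self-contained sketch of the shape such a unit test can take; the
class below is an illustrative stand-in, not the actual KafkaConfigUpdater API,
and no mockito is needed for this particular case.

{code:scala}
import org.scalatest.FunSuite

// Illustrative stand-in for the extracted helper: collects Kafka params and only
// fills in a key when the caller has not already set it.
class ConfigUpdaterStandIn(params: Map[String, String]) {
  private val map = scala.collection.mutable.Map(params.toSeq: _*)

  def setIfUnset(key: String, value: String): ConfigUpdaterStandIn = {
    if (!map.contains(key)) map += key -> value
    this
  }

  def build(): Map[String, String] = map.toMap
}

class ConfigUpdaterStandInSuite extends FunSuite {
  test("setIfUnset does not override a user-provided value") {
    val updated = new ConfigUpdaterStandIn(Map("bootstrap.servers" -> "user:9092"))
      .setIfUnset("bootstrap.servers", "default:9092")
      .setIfUnset("acks", "all")
      .build()
    assert(updated("bootstrap.servers") === "user:9092")
    assert(updated("acks") === "all")
  }
}
{code}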
   




> Increase Kafka ConfigUpdater test coverage
> --
>
> Key: SPARK-26371
> URL: https://issues.apache.org/jira/browse/SPARK-26371
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming, Tests
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Priority: Minor
>
> As the Kafka delegation token work added logic to ConfigUpdater, it would be 
> good to test it.






[jira] [Assigned] (SPARK-26371) Increase Kafka ConfigUpdater test coverage

2018-12-14 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26371:


Assignee: Apache Spark

> Increase Kafka ConfigUpdater test coverage
> --
>
> Key: SPARK-26371
> URL: https://issues.apache.org/jira/browse/SPARK-26371
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming, Tests
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Assignee: Apache Spark
>Priority: Minor
>
> As the Kafka delegation token work added logic to ConfigUpdater, it would be 
> good to test it.






[jira] [Assigned] (SPARK-26371) Increase Kafka ConfigUpdater test coverage

2018-12-14 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26371:


Assignee: (was: Apache Spark)

> Increase Kafka ConfigUpdater test coverage
> --
>
> Key: SPARK-26371
> URL: https://issues.apache.org/jira/browse/SPARK-26371
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming, Tests
>Affects Versions: 3.0.0
>Reporter: Gabor Somogyi
>Priority: Minor
>
> As the Kafka delegation token work added logic to ConfigUpdater, it would be 
> good to test it.






[jira] [Commented] (SPARK-26359) Spark checkpoint restore fails after query restart

2018-12-14 Thread Kaspar Tint (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721213#comment-16721213
 ] 

Kaspar Tint commented on SPARK-26359:
-

It means we changed the configuration *spark.sql.streaming.checkpointLocation: 
"s3a://some.domain/spark/checkpoints/49"* to 
*spark.sql.streaming.checkpointLocation: 
"s3a://some.domain/spark/checkpoints/50"* and restarted the whole Spark 
application. This obviously results in data loss and is not a preferred solution.

> Spark checkpoint restore fails after query restart
> --
>
> Key: SPARK-26359
> URL: https://issues.apache.org/jira/browse/SPARK-26359
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Submit, Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4.0 deployed in standalone-client mode
> Checkpointing is done to S3
> The Spark application in question is responsible for running 4 different 
> queries
> Queries are written using Structured Streaming
> We are using the following algorithm in hopes of better performance:
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # the 
> default is 1
>Reporter: Kaspar Tint
>Priority: Major
> Attachments: driver-redacted, metadata, redacted-offsets, 
> state-redacted, worker-redacted
>
>
> We had an incident where one of our structured streaming queries could not be 
> restarted after a usual S3 checkpointing failure. To clarify before everything 
> else: we are aware of the issues with S3 and are working towards moving to 
> HDFS, but this will take time. S3 causes queries to fail quite often during 
> peak hours, and we have separate logic to handle this that will attempt to 
> restart the failed queries if possible.
> In this particular case we could not restart one of the failed queries. It 
> seems that between detecting the failure in the query and starting it up 
> again, something went really wrong with Spark and the state in the checkpoint 
> folder got corrupted for some reason.
> The issue starts with the usual *FileNotFoundException* that happens with S3
> {code:java}
> 2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = 
> c074233a-2563-40fc-8036-b5e38e2e2c42, runId = 
> e607eb6e-8431-4269-acab-cc2c1f9f09dd]
> terminated with error
> java.io.FileNotFoundException: No such file or directory: 
> s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37
> 348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at 
> org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715)
> at 
> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
> at 
> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
> at 
> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965)
> at 
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
> at 
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataL
> og.scala:126)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at scala.Option.getOrElse(Option.scala:121)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
> at 
> 

[jira] [Created] (SPARK-26371) Increase Kafka ConfigUpdater test coverage

2018-12-14 Thread Gabor Somogyi (JIRA)
Gabor Somogyi created SPARK-26371:
-

 Summary: Increase Kafka ConfigUpdater test coverage
 Key: SPARK-26371
 URL: https://issues.apache.org/jira/browse/SPARK-26371
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming, Tests
Affects Versions: 3.0.0
Reporter: Gabor Somogyi


As the Kafka delegation token work added logic to ConfigUpdater, it would be good 
to test it.






[jira] [Commented] (SPARK-26359) Spark checkpoint restore fails after query restart

2018-12-14 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721195#comment-16721195
 ] 

Gabor Somogyi commented on SPARK-26359:
---

Yeah, after the recovery it's advised to switch that back.
The related recovery metadata can be deleted. BTW, when you said
{quote}an engineer stepped in and bumped the checkpoint version manually{quote}
which file was touched exactly, and how?
 

 

> Spark checkpoint restore fails after query restart
> --
>
> Key: SPARK-26359
> URL: https://issues.apache.org/jira/browse/SPARK-26359
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Submit, Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4.0 deployed in standalone-client mode
> Checkpointing is done to S3
> The Spark application in question is responsible for running 4 different 
> queries
> Queries are written using Structured Streaming
> We are using the following algorithm in hopes of better performance:
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # the 
> default is 1
>Reporter: Kaspar Tint
>Priority: Major
> Attachments: driver-redacted, metadata, redacted-offsets, 
> state-redacted, worker-redacted
>
>
> We had an incident where one of our structured streaming queries could not be 
> restarted after a usual S3 checkpointing failure. To clarify before everything 
> else: we are aware of the issues with S3 and are working towards moving to 
> HDFS, but this will take time. S3 causes queries to fail quite often during 
> peak hours, and we have separate logic to handle this that will attempt to 
> restart the failed queries if possible.
> In this particular case we could not restart one of the failed queries. It 
> seems that between detecting the failure in the query and starting it up 
> again, something went really wrong with Spark and the state in the checkpoint 
> folder got corrupted for some reason.
> The issue starts with the usual *FileNotFoundException* that happens with S3
> {code:java}
> 2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = 
> c074233a-2563-40fc-8036-b5e38e2e2c42, runId = 
> e607eb6e-8431-4269-acab-cc2c1f9f09dd]
> terminated with error
> java.io.FileNotFoundException: No such file or directory: 
> s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37
> 348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at 
> org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715)
> at 
> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
> at 
> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
> at 
> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965)
> at 
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
> at 
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataL
> og.scala:126)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at scala.Option.getOrElse(Option.scala:121)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
> at 
> 

[jira] [Commented] (SPARK-26359) Spark checkpoint restore fails after query restart

2018-12-14 Thread Kaspar Tint (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721107#comment-16721107
 ] 

Kaspar Tint commented on SPARK-26359:
-

This seems to be related to the fact that we decided to try 
*spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version* 2 instead of the 
default 1. It gives much better performance, but it looks like this issue is the 
tradeoff, then?

Do you maybe have any suggestions on how to recover from this in a sane way?
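For reference, a minimal sketch of the knob being discussed; the app name is
illustrative and the values shown are just the ones from this ticket's
environment, not a recommendation either way.

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("feedback")
  // v1 is the default; v2 commits task output directly to the destination,
  // avoiding the job-level rename pass, which is where the speed-up comes from.
  // Whether that interacts with the checkpoint failure is the open question above.
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .config("spark.sql.streaming.checkpointLocation", "s3a://some.domain/spark/checkpoints/49")
  .getOrCreate()
{code}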

> Spark checkpoint restore fails after query restart
> --
>
> Key: SPARK-26359
> URL: https://issues.apache.org/jira/browse/SPARK-26359
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Submit, Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark 2.4.0 deployed in standalone-client mode
> Checkpointing is done to S3
> The Spark application in question is responsible for running 4 different 
> queries
> Queries are written using Structured Streaming
> We are using the following algorithm in hopes of better performance:
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # the 
> default is 1
>Reporter: Kaspar Tint
>Priority: Major
> Attachments: driver-redacted, metadata, redacted-offsets, 
> state-redacted, worker-redacted
>
>
> We had an incident where one of our structured streaming queries could not be 
> restarted after a usual S3 checkpointing failure. To clarify before everything 
> else: we are aware of the issues with S3 and are working towards moving to 
> HDFS, but this will take time. S3 causes queries to fail quite often during 
> peak hours, and we have separate logic to handle this that will attempt to 
> restart the failed queries if possible.
> In this particular case we could not restart one of the failed queries. It 
> seems that between detecting the failure in the query and starting it up 
> again, something went really wrong with Spark and the state in the checkpoint 
> folder got corrupted for some reason.
> The issue starts with the usual *FileNotFoundException* that happens with S3
> {code:java}
> 2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = 
> c074233a-2563-40fc-8036-b5e38e2e2c42, runId = 
> e607eb6e-8431-4269-acab-cc2c1f9f09dd]
> terminated with error
> java.io.FileNotFoundException: No such file or directory: 
> s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37
> 348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at 
> org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715)
> at 
> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
> at 
> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
> at 
> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965)
> at 
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
> at 
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataL
> og.scala:126)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at scala.Option.getOrElse(Option.scala:121)
> at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
> at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
> at 
> 

[jira] [Commented] (SPARK-26353) Add typed aggregate functions:max&

2018-12-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721033#comment-16721033
 ] 

ASF GitHub Bot commented on SPARK-26353:


10110346 opened a new pull request #23304: [SPARK-26353][SQL]Add typed 
aggregate functions: max&
URL: https://github.com/apache/spark/pull/23304
 
 
   ## What changes were proposed in this pull request?
   
   For the Dataset API, the aggregate functions max and min are not implemented 
in a type-safe way at the moment.
   This pull request adds min and max aggregate functions to the 
`expressions.scalalang.typed` and `expressions.javalang.typed` packages.
   
   ## How was this patch tested?
   Added a unit test
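A minimal sketch of the kind of type-safe max the ticket asks for, hand-rolled
today with the generic Aggregator API; the PR would expose an equivalent helper
under the typed packages so users don't have to write this themselves. The Sale
case class and the sample data are made up for the example.

{code:scala}
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Sale(item: String, amount: Double)

// Hand-rolled typed max over Sale.amount.
val maxAmount = new Aggregator[Sale, Double, Double] {
  def zero: Double = Double.NegativeInfinity
  def reduce(b: Double, a: Sale): Double = math.max(b, a.amount)
  def merge(b1: Double, b2: Double): Double = math.max(b1, b2)
  def finish(r: Double): Double = r
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}.toColumn

val spark = SparkSession.builder().appName("typed-max-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val ds = Seq(Sale("a", 1.0), Sale("b", 3.5)).toDS()
ds.select(maxAmount).show()  // prints 3.5
{code}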




> Add typed aggregate functions:max&
> --
>
> Key: SPARK-26353
> URL: https://issues.apache.org/jira/browse/SPARK-26353
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: liuxian
>Priority: Minor
>
> For the Dataset API, the aggregate functions max and min are not implemented 
> in a type-safe way at the moment.


