[jira] [Assigned] (SPARK-24397) Add TaskContext.getLocalProperties in Python
[ https://issues.apache.org/jira/browse/SPARK-24397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24397: Assignee: Tathagata Das (was: Apache Spark) > Add TaskContext.getLocalProperties in Python > > > Key: SPARK-24397 > URL: https://issues.apache.org/jira/browse/SPARK-24397 > Project: Spark > Issue Type: New Feature > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Tathagata Das >Assignee: Tathagata Das >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24397) Add TaskContext.getLocalProperties in Python
[ https://issues.apache.org/jira/browse/SPARK-24397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491429#comment-16491429 ] Apache Spark commented on SPARK-24397: -- User 'tdas' has created a pull request for this issue: https://github.com/apache/spark/pull/21437 > Add TaskContext.getLocalProperties in Python > > > Key: SPARK-24397 > URL: https://issues.apache.org/jira/browse/SPARK-24397 > Project: Spark > Issue Type: New Feature > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Tathagata Das >Assignee: Tathagata Das >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24397) Add TaskContext.getLocalProperties in Python
[ https://issues.apache.org/jira/browse/SPARK-24397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24397: Assignee: Apache Spark (was: Tathagata Das) > Add TaskContext.getLocalProperties in Python > > > Key: SPARK-24397 > URL: https://issues.apache.org/jira/browse/SPARK-24397 > Project: Spark > Issue Type: New Feature > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Tathagata Das >Assignee: Apache Spark >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24250) support accessing SQLConf inside tasks
[ https://issues.apache.org/jira/browse/SPARK-24250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491419#comment-16491419 ] Apache Spark commented on SPARK-24250: -- User 'gatorsmile' has created a pull request for this issue: https://github.com/apache/spark/pull/21436 > support accessing SQLConf inside tasks > -- > > Key: SPARK-24250 > URL: https://issues.apache.org/jira/browse/SPARK-24250 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > Fix For: 2.4.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24396) Add Structured Streaming ForeachWriter for python
[ https://issues.apache.org/jira/browse/SPARK-24396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491415#comment-16491415 ] Tathagata Das commented on SPARK-24396: --- TaskContext.getLocalProperty in Python is needed for getting the batchId/epochId that is passed on by StreamExecution as Spark job local property. > Add Structured Streaming ForeachWriter for python > - > > Key: SPARK-24396 > URL: https://issues.apache.org/jira/browse/SPARK-24396 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Tathagata Das >Assignee: Tathagata Das >Priority: Major > > Users should be able to write ForeachWriter code in python, that is, they > should be able to use the partitionid and the version/batchId/epochId to > conditionally process rows. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24396) Add Structured Streaming ForeachWriter for python
[ https://issues.apache.org/jira/browse/SPARK-24396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491415#comment-16491415 ] Tathagata Das edited comment on SPARK-24396 at 5/26/18 12:07 AM: - TaskContext.getLocalProperty (SPARK-24397) in Python is needed for getting the batchId/epochId that is passed on by StreamExecution as Spark job local property. was (Author: tdas): TaskContext.getLocalProperty in Python is needed for getting the batchId/epochId that is passed on by StreamExecution as Spark job local property. > Add Structured Streaming ForeachWriter for python > - > > Key: SPARK-24396 > URL: https://issues.apache.org/jira/browse/SPARK-24396 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Tathagata Das >Assignee: Tathagata Das >Priority: Major > > Users should be able to write ForeachWriter code in python, that is, they > should be able to use the partitionid and the version/batchId/epochId to > conditionally process rows. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24397) Add TaskContext.getLocalProperties in Python
[ https://issues.apache.org/jira/browse/SPARK-24397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tathagata Das updated SPARK-24397: -- Issue Type: New Feature (was: Sub-task) Parent: (was: SPARK-24396) > Add TaskContext.getLocalProperties in Python > > > Key: SPARK-24397 > URL: https://issues.apache.org/jira/browse/SPARK-24397 > Project: Spark > Issue Type: New Feature > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Tathagata Das >Assignee: Tathagata Das >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24397) Add TaskContext.getLocalProperties in Python
Tathagata Das created SPARK-24397: - Summary: Add TaskContext.getLocalProperties in Python Key: SPARK-24397 URL: https://issues.apache.org/jira/browse/SPARK-24397 Project: Spark Issue Type: Sub-task Components: PySpark Affects Versions: 2.3.0 Reporter: Tathagata Das Assignee: Tathagata Das -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
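SPARK-24397 carries no description of its own, but the comment on SPARK-24396 above explains the motivation: Python tasks need access to Spark job local properties, such as the batchId/epochId that StreamExecution sets for streaming jobs. A minimal sketch of how such an API might be used, assuming a getLocalProperty method on the Python TaskContext that mirrors the Scala one (the method name and its semantics here are assumptions, not the merged change):
{code:python}
from pyspark import SparkContext, TaskContext

sc = SparkContext.getOrCreate()

# Local properties set on the driver thread are propagated to the tasks it
# schedules; StreamExecution uses the same mechanism to tag jobs with the
# current batch/epoch id.
sc.setLocalProperty("myapp.runId", "run-42")

def read_property(_):
    # Hypothetical Python mirror of Scala's TaskContext.getLocalProperty;
    # assumed to return None when the property was never set.
    return TaskContext.get().getLocalProperty("myapp.runId")

print(sc.parallelize(range(4), 2).map(read_property).collect())
# Expected under the assumption above: ['run-42', 'run-42', 'run-42', 'run-42']
{code}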
[jira] [Created] (SPARK-24396) Add Structured Streaming ForeachWriter for python
Tathagata Das created SPARK-24396: - Summary: Add Structured Streaming ForeachWriter for python Key: SPARK-24396 URL: https://issues.apache.org/jira/browse/SPARK-24396 Project: Spark Issue Type: New Feature Components: Structured Streaming Affects Versions: 2.3.0 Reporter: Tathagata Das Assignee: Tathagata Das Users should be able to write ForeachWriter code in python, that is, they should be able to use the partitionid and the version/batchId/epochId to conditionally process rows. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
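The description above asks for a Python ForeachWriter that can use the partition id and the version/batchId/epochId to conditionally process rows. A sketch of what such a writer could look like, modeled on the Scala ForeachWriter contract (open/process/close); the class shape, method names, and the writeStream.foreach registration shown here are assumptions about the proposed API, not a description of an implemented one:
{code:python}
class StdoutForeachWriter(object):
    """Illustrative Python counterpart of Scala's ForeachWriter[Row]."""

    def open(self, partition_id, epoch_id):
        # (partition_id, epoch_id) lets a sink decide whether this slice of
        # data was already written, e.g. to get effectively-once output.
        self.partition_id = partition_id
        self.epoch_id = epoch_id
        return True  # returning False would skip process() for this partition

    def process(self, row):
        print(self.partition_id, self.epoch_id, row)

    def close(self, error):
        if error is not None:
            print("partition %d of epoch %d failed: %s"
                  % (self.partition_id, self.epoch_id, error))

# Assumed registration point, mirroring Scala's DataStreamWriter.foreach:
# query = df.writeStream.foreach(StdoutForeachWriter()).start()
{code}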
[jira] [Commented] (SPARK-24359) SPIP: ML Pipelines in R
[ https://issues.apache.org/jira/browse/SPARK-24359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491412#comment-16491412 ] Joseph K. Bradley commented on SPARK-24359: --- Regarding separating repos: What's the conclusion? If feasible, I really hope this can be in the apache/spark repo to encourage contributors to add R wrappers whenever they add new MLlib APIs (just like it's pretty easy to add Python wrappers nowadays). Regarding CRAN releases: I'd expect it to be well worth it to say SparkML minor releases correspond to Spark minor releases. Users should not expect SparkML 2.4 to work with Spark 2.3 (since R would encounter missing Java APIs). I'm less sure about patch releases. (Ideally, this would all be solved by us following semantic versioning, but that would require that we never add Experimental APIs to SparkML.) If we can solve the maintainability issues with CRAN compatibility via integration tests, then I figure it'd be ideal to treat SparkML just like SparkR and PySpark, releasing in sync with the rest of Spark. Thoughts? > SPIP: ML Pipelines in R > --- > > Key: SPARK-24359 > URL: https://issues.apache.org/jira/browse/SPARK-24359 > Project: Spark > Issue Type: Improvement > Components: SparkR >Affects Versions: 3.0.0 >Reporter: Hossein Falaki >Priority: Major > Labels: SPIP > Attachments: SparkML_ ML Pipelines in R-v2.pdf, SparkML_ ML Pipelines > in R.pdf > > > h1. Background and motivation > SparkR supports calling MLlib functionality with an [R-friendly > API|https://docs.google.com/document/d/10NZNSEurN2EdWM31uFYsgayIPfCFHiuIu3pCWrUmP_c/]. > Since Spark 1.5 the (new) SparkML API which is based on [pipelines and > parameters|https://docs.google.com/document/d/1rVwXRjWKfIb-7PI6b86ipytwbUH7irSNLF1_6dLmh8o] > has matured significantly. It allows users build and maintain complicated > machine learning pipelines. A lot of this functionality is difficult to > expose using the simple formula-based API in SparkR. > We propose a new R package, _SparkML_, to be distributed along with SparkR as > part of Apache Spark. This new package will be built on top of SparkR’s APIs > to expose SparkML’s pipeline APIs and functionality. > *Why not SparkR?* > SparkR package contains ~300 functions. Many of these shadow functions in > base and other popular CRAN packages. We think adding more functions to > SparkR will degrade usability and make maintenance harder. > *Why not sparklyr?* > sparklyr is an R package developed by RStudio Inc. to expose Spark API to R > users. sparklyr includes MLlib API wrappers, but to the best of our knowledge > they are not comprehensive. Also we propose a code-gen approach for this > package to minimize work needed to expose future MLlib API, but sparklyr’s > API is manually written. > h1. Target Personas > * Existing SparkR users who need more flexible SparkML API > * R users (data scientists, statisticians) who wish to build Spark ML > pipelines in R > h1. Goals > * R users can install SparkML from CRAN > * R users will be able to import SparkML independent from SparkR > * After setting up a Spark session R users can > ** create a pipeline by chaining individual components and specifying their > parameters > ** tune a pipeline in parallel, taking advantage of Spark > ** inspect a pipeline’s parameters and evaluation metrics > ** repeatedly apply a pipeline > * MLlib contributors can easily add R wrappers for new MLlib Estimators and > Transformers > h1. 
Non-Goals > * Adding new algorithms to SparkML R package which do not exist in Scala > * Parallelizing existing CRAN packages > * Changing existing SparkR ML wrapping API > h1. Proposed API Changes > h2. Design goals > When encountering trade-offs in API, we will chose based on the following > list of priorities. The API choice that addresses a higher priority goal will > be chosen. > # *Comprehensive coverage of MLlib API:* Design choices that make R coverage > of future ML algorithms difficult will be ruled out. > * *Semantic clarity*: We attempt to minimize confusion with other packages. > Between consciousness and clarity, we will choose clarity. > * *Maintainability and testability:* API choices that require manual > maintenance or make testing difficult should be avoided. > * *Interoperability with rest of Spark components:* We will keep the R API > as thin as possible and keep all functionality implementation in JVM/Scala. > * *Being natural to R users:* Ultimate users of this package are R users and > they should find it easy and natural to use. > The API will follow familiar R function syntax, where the object is passed as > the first argument of the method: do_something(obj, arg1, arg2). All > functions are snake_case (e.g., {{spark_log
[jira] [Updated] (SPARK-24359) SPIP: ML Pipelines in R
[ https://issues.apache.org/jira/browse/SPARK-24359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley updated SPARK-24359: -- Description: h1. Background and motivation SparkR supports calling MLlib functionality with an [R-friendly API|https://docs.google.com/document/d/10NZNSEurN2EdWM31uFYsgayIPfCFHiuIu3pCWrUmP_c/]. Since Spark 1.5 the (new) SparkML API which is based on [pipelines and parameters|https://docs.google.com/document/d/1rVwXRjWKfIb-7PI6b86ipytwbUH7irSNLF1_6dLmh8o] has matured significantly. It allows users build and maintain complicated machine learning pipelines. A lot of this functionality is difficult to expose using the simple formula-based API in SparkR. We propose a new R package, _SparkML_, to be distributed along with SparkR as part of Apache Spark. This new package will be built on top of SparkR’s APIs to expose SparkML’s pipeline APIs and functionality. *Why not SparkR?* SparkR package contains ~300 functions. Many of these shadow functions in base and other popular CRAN packages. We think adding more functions to SparkR will degrade usability and make maintenance harder. *Why not sparklyr?* sparklyr is an R package developed by RStudio Inc. to expose Spark API to R users. sparklyr includes MLlib API wrappers, but to the best of our knowledge they are not comprehensive. Also we propose a code-gen approach for this package to minimize work needed to expose future MLlib API, but sparklyr’s API is manually written. h1. Target Personas * Existing SparkR users who need more flexible SparkML API * R users (data scientists, statisticians) who wish to build Spark ML pipelines in R h1. Goals * R users can install SparkML from CRAN * R users will be able to import SparkML independent from SparkR * After setting up a Spark session R users can ** create a pipeline by chaining individual components and specifying their parameters ** tune a pipeline in parallel, taking advantage of Spark ** inspect a pipeline’s parameters and evaluation metrics ** repeatedly apply a pipeline * MLlib contributors can easily add R wrappers for new MLlib Estimators and Transformers h1. Non-Goals * Adding new algorithms to SparkML R package which do not exist in Scala * Parallelizing existing CRAN packages * Changing existing SparkR ML wrapping API h1. Proposed API Changes h2. Design goals When encountering trade-offs in API, we will chose based on the following list of priorities. The API choice that addresses a higher priority goal will be chosen. # *Comprehensive coverage of MLlib API:* Design choices that make R coverage of future ML algorithms difficult will be ruled out. * *Semantic clarity*: We attempt to minimize confusion with other packages. Between consciousness and clarity, we will choose clarity. * *Maintainability and testability:* API choices that require manual maintenance or make testing difficult should be avoided. * *Interoperability with rest of Spark components:* We will keep the R API as thin as possible and keep all functionality implementation in JVM/Scala. * *Being natural to R users:* Ultimate users of this package are R users and they should find it easy and natural to use. The API will follow familiar R function syntax, where the object is passed as the first argument of the method: do_something(obj, arg1, arg2). All functions are snake_case (e.g., {{spark_logistic_regression()}} and {{set_max_iter()}}). If a constructor gets arguments, they will be named arguments. 
For example: {code:java} > lr <- set_reg_param(set_max_iter(spark.logistic.regression()), 10), 0.1){code} When calls need to be chained, like above example, syntax can nicely translate to a natural pipeline style with help from very popular[ magrittr package|https://cran.r-project.org/web/packages/magrittr/index.html]. For example: {code:java} > logistic_regression() %>% set_max_iter(10) %>% set_reg_param(0.01) -> lr{code} h2. Namespace All new API will be under a new CRAN package, named SparkML. The package should be usable without needing SparkR in the namespace. The package will introduce a number of S4 classes that inherit from four basic classes. Here we will list the basic types with a few examples. An object of any child class can be instantiated with a function call that starts with {{spark_}}. h2. Pipeline & PipelineStage A pipeline object contains one or more stages. {code:java} > pipeline <- spark_pipeline() %>% set_stages(stage1, stage2, stage3){code} Where stage1, stage2, etc are S4 objects of a PipelineStage and pipeline is an object of type Pipeline. h2. Transformers A Transformer is an algorithm that can transform one SparkDataFrame into another SparkDataFrame. *Example API:* {code:java} > tokenizer <- spark_tokenizer() %>% set_input_col(“text”) %>% set_output_col(“words”) > tokenized.df <- tokenizer %>% transform(df) {code} h
[jira] [Commented] (SPARK-24300) generateLDAData in ml.cluster.LDASuite didn't set seed correctly
[ https://issues.apache.org/jira/browse/SPARK-24300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491359#comment-16491359 ] Lu Wang commented on SPARK-24300: - I will fix this issue. > generateLDAData in ml.cluster.LDASuite didn't set seed correctly > > > Key: SPARK-24300 > URL: https://issues.apache.org/jira/browse/SPARK-24300 > Project: Spark > Issue Type: Bug > Components: ML >Affects Versions: 2.3.0 >Reporter: Xiangrui Meng >Assignee: Lu Wang >Priority: Minor > > [https://github.com/apache/spark/blob/0d63ebd17df747fb41d7ba254718bb7af3ae/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala] > > generateLDAData uses the same RNG in all partitions to generate random data. > This either causes duplicate rows in cluster mode or indeterministic behavior > in local mode: > {code:java} > scala> val rng = new java.util.Random(10) > rng: java.util.Random = java.util.Random@78c5ef58 > scala> sc.parallelize(1 to 10).map { i => Seq.fill(10)(rng.nextInt(10)) > }.collect().mkString("\n") > res12: String = > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 9, 1, 8, 5, 0, 6, 3, 3, 8) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 9, 1, 8, 5, 0, 6, 3, 3, 8){code} > We should create one RNG per partition to make it safe. > > cc: [~lu.DB] [~josephkb] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
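The fix itself belongs in the Scala test suite, but the "one RNG per partition" idea is easy to illustrate from PySpark: seed a fresh RNG inside each partition, deriving the seed from the global seed and the partition index, instead of capturing a single driver-side RNG in the closure. A sketch under those assumptions (this is not the LDASuite code):
{code:python}
import random
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
seed = 10

def generate(partition_index, rows):
    # One RNG per partition: results are deterministic per (seed, partition)
    # and no shared driver-side RNG is serialized into the closure.
    rng = random.Random(seed + partition_index)
    for _ in rows:
        yield [rng.randint(0, 9) for _ in range(10)]

for row in sc.parallelize(range(10), 10).mapPartitionsWithIndex(generate).collect():
    print(row)
{code}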
[jira] [Resolved] (SPARK-24366) Improve error message for Catalyst type converters
[ https://issues.apache.org/jira/browse/SPARK-24366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-24366. - Resolution: Fixed Assignee: Maxim Gekk Fix Version/s: 2.4.0 > Improve error message for Catalyst type converters > -- > > Key: SPARK-24366 > URL: https://issues.apache.org/jira/browse/SPARK-24366 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 1.6.3, 2.3.0 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Minor > Fix For: 2.4.0 > > > User have no way to drill down to understand which of the hundreds of fields > in millions records feeding into the job are causing the problem. We should > to show where in the schema the error is happening. > {code:java} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 4 in stage 344.0 failed 4 times, most recent failure: Lost task 4.3 in > stage 344.0 (TID 2673, ip-10-31-237-248.ec2.internal): scala.MatchError: > start (of class java.lang.String) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$1.apply(CatalystTypeConverters.scala:161) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:161) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) > at > 
org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$1.apply(CatalystTypeConverters.scala:161) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:161) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153) > at > org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) > at > org.apache.spark.sql.ca
[jira] [Commented] (SPARK-23455) Default Params in ML should be saved separately
[ https://issues.apache.org/jira/browse/SPARK-23455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491354#comment-16491354 ] Joseph K. Bradley commented on SPARK-23455: --- Yep, thanks [~viirya] for answering! It will affect R only if we add ways for people to write custom ML Transformers and Estimators in R. > Default Params in ML should be saved separately > --- > > Key: SPARK-23455 > URL: https://issues.apache.org/jira/browse/SPARK-23455 > Project: Spark > Issue Type: Improvement > Components: ML >Affects Versions: 2.4.0 >Reporter: Liang-Chi Hsieh >Assignee: Liang-Chi Hsieh >Priority: Major > Fix For: 2.4.0 > > > We save ML's user-supplied params and default params as one entity in JSON. > During loading the saved models, we set all the loaded params into created ML > model instances as user-supplied params. > It causes some problems, e.g., if we strictly disallow some params to be set > at the same time, a default param can fail the param check because it is > treated as user-supplied param after loading. > The loaded default params should not be set as user-supplied params. We > should save ML default params separately in JSON. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
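One way to observe the described behavior from Python, assuming the Python wrapper surfaces the same persistence semantics as the Scala side (the path and the exact isSet result after loading are assumptions, not statements from the ticket):
{code:python}
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()
print(lr.isSet(lr.maxIter))          # False: maxIter only carries its default
print(lr.getOrDefault(lr.maxIter))   # 100, the default value

lr.save("/tmp/lr-with-defaults")     # params are persisted as JSON metadata
reloaded = LogisticRegression.load("/tmp/lr-with-defaults")

# While defaults are saved together with user-supplied params, the loaded
# default may be reported as explicitly set, which is what this issue fixes:
print(reloaded.isSet(reloaded.maxIter))
{code}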
[jira] [Commented] (SPARK-24369) A bug when having multiple distinct aggregations
[ https://issues.apache.org/jira/browse/SPARK-24369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491342#comment-16491342 ] Xiao Li commented on SPARK-24369: - Thanks! > A bug when having multiple distinct aggregations > > > Key: SPARK-24369 > URL: https://issues.apache.org/jira/browse/SPARK-24369 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Xiao Li >Priority: Major > > {code} > SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*) FROM > (VALUES >(1, 1), >(2, 2), >(2, 2) > ) t(x, y) > {code} > It returns > {code} > java.lang.RuntimeException > You hit a query analyzer bug. Please report your query to Spark user mailing > list. > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24122) Allow automatic driver restarts on K8s
[ https://issues.apache.org/jira/browse/SPARK-24122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491338#comment-16491338 ] Yinan Li commented on SPARK-24122: -- The operator does cover automatic restart of an application with a configurable restart policy. For batch ETL jobs, this is probably sufficient for common needs to restart jobs on failures. For streaming jobs, checkpointing is needed. https://issues.apache.org/jira/browse/SPARK-23980 is also relevant. > Allow automatic driver restarts on K8s > -- > > Key: SPARK-24122 > URL: https://issues.apache.org/jira/browse/SPARK-24122 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 2.3.0 >Reporter: Oz Ben-Ami >Priority: Minor > > [~foxish] > Right now SparkSubmit creates the driver as a bare pod, rather than a managed > controller like a Deployment or a StatefulSet. This means there is no way to > guarantee automatic restarts, eg in case a node has an issue. Note Pod > RestartPolicy does not apply if a node fails. A StatefulSet would allow us to > guarantee that, and keep the ability for executors to find the driver using > DNS. > This is particularly helpful for long-running streaming workloads, where we > currently use {{yarn.resourcemanager.am.max-attempts}} with YARN. I can > confirm that Spark Streaming and Structured Streaming applications can be > made to recover from such a restart, with the help of checkpointing. The > executors will have to be started again by the driver, but this should not be > a problem. > For batch processing, we could alternatively use Kubernetes {{Job}} objects, > which restart pods on failure but not success. For example, note the > semantics provided by the {{kubectl run}} > [command|https://kubernetes.io/docs/reference/generated/kubectl/kubectl-commands#run] > * {{--restart=Never}}: bare Pod > * {{--restart=Always}}: Deployment > * {{--restart=OnFailure}}: Job > https://github.com/apache-spark-on-k8s/spark/issues/288 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24395) Fix Behavior of NOT IN with Literals Containing NULL
[ https://issues.apache.org/jira/browse/SPARK-24395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Miles Yucht updated SPARK-24395: Description: Spark does not return the correct answer when evaluating NOT IN in some cases. For example: {code:java} CREATE TEMPORARY VIEW m AS SELECT * FROM VALUES (null, null) AS m(a, b); SELECT * FROM m WHERE a IS NULL AND b IS NULL AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1;{code} According to the semantics of null-aware anti-join, this should return no rows. However, it actually returns the row {{NULL NULL}}. This was found by inspecting the unit tests added for SPARK-24381 ([https://github.com/apache/spark/pull/21425#pullrequestreview-123421822).] *Acceptance Criteria*: * We should be able to add the following test cases back to {{subquery/in-subquery/not-in-unit-test-multi-column-literal.sql}}: {code:java} -- Case 3 -- (probe-side columns are all null -> row not returned) SELECT * FROM m WHERE a IS NULL AND b IS NULL -- Matches only (null, null) AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1; -- Case 4 -- (one column null, other column matches a row in the subquery result -> row not returned) SELECT * FROM m WHERE b = 1.0 -- Matches (null, 1.0) AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1; {code} cc [~smilegator] [~juliuszsompolski] was: Spark does not return the correct answer when evaluating NOT IN in some cases. For example: {code:java} CREATE TEMPORARY VIEW m AS SELECT * FROM VALUES (null, null) AS m(a, b); SELECT * FROM m WHERE a IS NULL AND b IS NULL AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1;{code} According to the semantics of null-aware anti-join, this should return no rows. However, it actually returns the row {{NULL NULL}}. This was found by inspecting the unit tests added for SPARK-24381 ([https://github.com/apache/spark/pull/21425#pullrequestreview-123421822).] cc [~smilegator] [~juliuszsompolski] > Fix Behavior of NOT IN with Literals Containing NULL > > > Key: SPARK-24395 > URL: https://issues.apache.org/jira/browse/SPARK-24395 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.2 >Reporter: Miles Yucht >Priority: Major > > Spark does not return the correct answer when evaluating NOT IN in some > cases. For example: > {code:java} > CREATE TEMPORARY VIEW m AS SELECT * FROM VALUES > (null, null) > AS m(a, b); > SELECT * > FROM m > WHERE a IS NULL AND b IS NULL >AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, > 1;{code} > According to the semantics of null-aware anti-join, this should return no > rows. However, it actually returns the row {{NULL NULL}}. This was found by > inspecting the unit tests added for SPARK-24381 > ([https://github.com/apache/spark/pull/21425#pullrequestreview-123421822).] 
> *Acceptance Criteria*: > * We should be able to add the following test cases back to > {{subquery/in-subquery/not-in-unit-test-multi-column-literal.sql}}: > {code:java} > -- Case 3 > -- (probe-side columns are all null -> row not returned) > SELECT * > FROM m > WHERE a IS NULL AND b IS NULL -- Matches only (null, null) >AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, > 1))));
 > -- Case 4 > -- (one column null, other column matches a row in the subquery result -> > row not returned) > SELECT * > FROM m > WHERE b = 1.0 -- Matches (null, 1.0) >AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, > 1)))); > {code} > > cc [~smilegator] [~juliuszsompolski] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24300) generateLDAData in ml.cluster.LDASuite didn't set seed correctly
[ https://issues.apache.org/jira/browse/SPARK-24300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley updated SPARK-24300: -- Shepherd: Joseph K. Bradley > generateLDAData in ml.cluster.LDASuite didn't set seed correctly > > > Key: SPARK-24300 > URL: https://issues.apache.org/jira/browse/SPARK-24300 > Project: Spark > Issue Type: Bug > Components: ML >Affects Versions: 2.3.0 >Reporter: Xiangrui Meng >Assignee: Lu Wang >Priority: Minor > > [https://github.com/apache/spark/blob/0d63ebd17df747fb41d7ba254718bb7af3ae/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala] > > generateLDAData uses the same RNG in all partitions to generate random data. > This either causes duplicate rows in cluster mode or indeterministic behavior > in local mode: > {code:java} > scala> val rng = new java.util.Random(10) > rng: java.util.Random = java.util.Random@78c5ef58 > scala> sc.parallelize(1 to 10).map { i => Seq.fill(10)(rng.nextInt(10)) > }.collect().mkString("\n") > res12: String = > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 9, 1, 8, 5, 0, 6, 3, 3, 8) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 0, 3, 0, 6, 6, 7, 8, 1, 4) > List(3, 9, 1, 8, 5, 0, 6, 3, 3, 8){code} > We should create one RNG per partition to make it safe. > > cc: [~lu.DB] [~josephkb] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-6235) Address various 2G limits
[ https://issues.apache.org/jira/browse/SPARK-6235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin reassigned SPARK-6235: - Assignee: Marcelo Vanzin > Address various 2G limits > - > > Key: SPARK-6235 > URL: https://issues.apache.org/jira/browse/SPARK-6235 > Project: Spark > Issue Type: Umbrella > Components: Shuffle, Spark Core >Reporter: Reynold Xin >Assignee: Marcelo Vanzin >Priority: Major > Attachments: SPARK-6235_Design_V0.02.pdf > > > An umbrella ticket to track the various 2G limit we have in Spark, due to the > use of byte arrays and ByteBuffers. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-6235) Address various 2G limits
[ https://issues.apache.org/jira/browse/SPARK-6235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin reassigned SPARK-6235: - Assignee: (was: Marcelo Vanzin) > Address various 2G limits > - > > Key: SPARK-6235 > URL: https://issues.apache.org/jira/browse/SPARK-6235 > Project: Spark > Issue Type: Umbrella > Components: Shuffle, Spark Core >Reporter: Reynold Xin >Priority: Major > Attachments: SPARK-6235_Design_V0.02.pdf > > > An umbrella ticket to track the various 2G limit we have in Spark, due to the > use of byte arrays and ByteBuffers. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24395) Fix Behavior of NOT IN with Literals Containing NULL
Miles Yucht created SPARK-24395: --- Summary: Fix Behavior of NOT IN with Literals Containing NULL Key: SPARK-24395 URL: https://issues.apache.org/jira/browse/SPARK-24395 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.3.2 Reporter: Miles Yucht Spark does not return the correct answer when evaluating NOT IN in some cases. For example: {code:java} CREATE TEMPORARY VIEW m AS SELECT * FROM VALUES (null, null) AS m(a, b); SELECT * FROM m WHERE a IS NULL AND b IS NULL AND (a, b) NOT IN ((0, 1.0), (2, 3.0), (4, CAST(null AS DECIMAL(2, 1))));{code} According to the semantics of null-aware anti-join, this should return no rows. However, it actually returns the row {{NULL NULL}}. This was found by inspecting the unit tests added for SPARK-24381 ([https://github.com/apache/spark/pull/21425#pullrequestreview-123421822).] cc [~smilegator] [~juliuszsompolski] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24392) Mark pandas_udf as Experimental
[ https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin updated SPARK-24392: --- Target Version/s: 2.3.1 > Mark pandas_udf as Experimental > --- > > Key: SPARK-24392 > URL: https://issues.apache.org/jira/browse/SPARK-24392 > Project: Spark > Issue Type: Task > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Bryan Cutler >Priority: Blocker > Fix For: 2.3.1 > > > This functionality is still evolving and has introduced some bugs . It was > an oversight to not mark it as experimental before it was released in 2.3.0. > Not sure if it is a good idea to change this after the fact, but I'm opening > this to continue discussion from > https://github.com/apache/spark/pull/21427#issuecomment-391967423 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
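For context, the API being marked Experimental is the Arrow-backed pandas_udf added in 2.3.0. A minimal scalar example of the functionality in question (the column name and function are illustrative only):
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()

@pandas_udf("double", PandasUDFType.SCALAR)
def times_two(v):
    # v arrives as a pandas.Series backed by an Arrow batch; a Series of the
    # declared return type (double) is returned.
    return v * 2.0

df = spark.range(3).toDF("x")
df.select(times_two(df["x"]).alias("x_times_two")).show()
{code}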
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491309#comment-16491309 ] Li Jin commented on SPARK-24373: [~smilegator] do you mean that add AnalysisBarrier to RelationalGroupedDataset and KeyValueGroupedDataset could lead to new bugs? > "df.cache() df.count()" no longer eagerly caches data when the analyzed plans > are different after re-analyzing the plans > > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Blocker > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- 
*(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24392) Mark pandas_udf as Experimental
[ https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491305#comment-16491305 ] Bryan Cutler edited comment on SPARK-24392 at 5/25/18 9:53 PM: --- Targeting 2.3.1, will try to resolve today to not hold up the release. was (Author: bryanc): Targeting 2.3.1 > Mark pandas_udf as Experimental > --- > > Key: SPARK-24392 > URL: https://issues.apache.org/jira/browse/SPARK-24392 > Project: Spark > Issue Type: Task > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Bryan Cutler >Priority: Blocker > Fix For: 2.3.1 > > > This functionality is still evolving and has introduced some bugs . It was > an oversight to not mark it as experimental before it was released in 2.3.0. > Not sure if it is a good idea to change this after the fact, but I'm opening > this to continue discussion from > https://github.com/apache/spark/pull/21427#issuecomment-391967423 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24392) Mark pandas_udf as Experimental
[ https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491308#comment-16491308 ] Marcelo Vanzin commented on SPARK-24392: (There's a target version field for that, btw. Updating it...) > Mark pandas_udf as Experimental > --- > > Key: SPARK-24392 > URL: https://issues.apache.org/jira/browse/SPARK-24392 > Project: Spark > Issue Type: Task > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Bryan Cutler >Priority: Blocker > Fix For: 2.3.1 > > > This functionality is still evolving and has introduced some bugs . It was > an oversight to not mark it as experimental before it was released in 2.3.0. > Not sure if it is a good idea to change this after the fact, but I'm opening > this to continue discussion from > https://github.com/apache/spark/pull/21427#issuecomment-391967423 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24392) Mark pandas_udf as Experimental
[ https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491305#comment-16491305 ] Bryan Cutler commented on SPARK-24392: -- Targeting 2.3.1 > Mark pandas_udf as Experimental > --- > > Key: SPARK-24392 > URL: https://issues.apache.org/jira/browse/SPARK-24392 > Project: Spark > Issue Type: Task > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Bryan Cutler >Priority: Blocker > Fix For: 2.3.1 > > > This functionality is still evolving and has introduced some bugs . It was > an oversight to not mark it as experimental before it was released in 2.3.0. > Not sure if it is a good idea to change this after the fact, but I'm opening > this to continue discussion from > https://github.com/apache/spark/pull/21427#issuecomment-391967423 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24392) Mark pandas_udf as Experimental
[ https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bryan Cutler updated SPARK-24392: - Fix Version/s: 2.3.1 > Mark pandas_udf as Experimental > --- > > Key: SPARK-24392 > URL: https://issues.apache.org/jira/browse/SPARK-24392 > Project: Spark > Issue Type: Task > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Bryan Cutler >Priority: Blocker > Fix For: 2.3.1 > > > This functionality is still evolving and has introduced some bugs . It was > an oversight to not mark it as experimental before it was released in 2.3.0. > Not sure if it is a good idea to change this after the fact, but I'm opening > this to continue discussion from > https://github.com/apache/spark/pull/21427#issuecomment-391967423 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491304#comment-16491304 ] Xiao Li commented on SPARK-24373: - In the above example, each time when we re-analyze the plan that is recreated through the Dataset APIs count(), groupBy(), rollup(), cube(), rollup, pivot() and groupByKey(), the Analyzer rule HandleNullInputsForUDF will add the extra IF expression above the UDF in the previously resolved sub-plan. Note, this is not the only rule that could change the analyzed plans if we re-run the analyzer. This is a regression introduced by [https://github.com/apache/spark/pull/17770]. We replaced the original solution (based on the analyzed flag) by the AnalysisBarrier. However, we did not add the AnalysisBarrier on the APIs of RelationalGroupedDataset and KeyValueGroupedDataset. To fix it, we will changes the plan again. We might face some unknown issues. How about adding a temporary flag in Spark 2.3.1? If anything unexpected happens, our users still can change it back to the Spark 2.3.0 behavior? > "df.cache() df.count()" no longer eagerly caches data when the analyzed plans > are different after re-analyzing the plans > > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Blocker > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. 
You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apac
[jira] [Created] (SPARK-24394) Nodes in decision tree sometimes have negative impurity values
Barry Becker created SPARK-24394: Summary: Nodes in decision tree sometimes have negative impurity values Key: SPARK-24394 URL: https://issues.apache.org/jira/browse/SPARK-24394 Project: Spark Issue Type: Bug Components: ML Affects Versions: 2.3.0 Environment: Spark 2.3.0 ML linux Reporter: Barry Becker After doing some reading about gini- and entropy-based impurity (see [https://spark.apache.org/docs/2.2.0/mllib-decision-tree.html]) it seems that impurity values should always be bounded by 0 and 1. However, sometimes some leaf nodes (usually, but not always, those with the minimum number of records) have negative impurity values (usually -1, but not always). This seems like a bug in the impurity calculation, but I am not sure. This happens for both gini and entropy impurity at slightly different nodes. I can reproduce this with almost any dataset using pretty standard parameters like the following: new DecisionTreeClassifier() .setLabelCol(targetName) .setMaxBins(100) .setMaxDepth(5) .setMinInfoGain(0.01) .setMinInstancesPerNode(5) -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-23576) SparkSQL - Decimal data missing decimal point
[ https://issues.apache.org/jira/browse/SPARK-23576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491267#comment-16491267 ] Hafthor Stefansson edited comment on SPARK-23576 at 5/25/18 9:27 PM: - Here's an equivalent problem: spark.sql("select cast(1 as decimal(38,18)) as x").write.format("parquet").save("decimal.parq") spark.read.schema(spark.sql("select cast(1 as decimal) as x").schema).parquet("decimal.parq").show returns 100! It should throw, like it would if I specified a schema with x as float, or some other type. Or maybe do what double casting would have spark.sql("select cast(cast(1 as decimal(38,10)) as decimal(38,18)) as x").show returns 1.00 except, I'd be worried about getting nulls when exceeding the range spark.sql("select cast(cast(10 as decimal(2,0)) as decimal(2,1)) as x").show returns null! [https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c] was (Author: hafthor): Here's an equivalent problem: spark.sql("select cast(1 as decimal(38,18)) as x").write.format("parquet").save("decimal.parq") spark.read.schema(spark.sql("select cast(1 as decimal) as x").schema).parquet("decimal.parq").show returns 100! It should throw, like it would if I specified a schema with x as float, or some other type. Or maybe do what double casting would have spark.sql("select cast(cast(1 as decimal(38,10)) as decimal(38,18)) as x").show returns 1.00 spark.sql("select cast(cast(10 as decimal(2,0)) as decimal(2,1)) as x").show returns null! [https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c] > SparkSQL - Decimal data missing decimal point > - > > Key: SPARK-23576 > URL: https://issues.apache.org/jira/browse/SPARK-23576 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 > Environment: spark 2.3.0 > linux >Reporter: R >Priority: Major > > Integers like 3 stored as a decimal display in sparksql as 300 with > no decimal point. But hive displays fine as 3. > Repro steps: > # Create a .csv with the value 3 > # Use spark to read the csv, cast it as decimal(31,8) and output to an ORC > file > # Use spark to read the ORC, infer the schema (it will infer 38,18 > precision) and output to a Parquet file > # Create external hive table to read the parquet ( define the hive type as > decimal(31,8)) > # Use spark-sql to select from the external hive table. > # Notice how sparksql shows 300 !!! > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-23576) SparkSQL - Decimal data missing decimal point
[ https://issues.apache.org/jira/browse/SPARK-23576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491267#comment-16491267 ] Hafthor Stefansson edited comment on SPARK-23576 at 5/25/18 9:26 PM: - Here's an equivalent problem: spark.sql("select cast(1 as decimal(38,18)) as x").write.format("parquet").save("decimal.parq") spark.read.schema(spark.sql("select cast(1 as decimal) as x").schema).parquet("decimal.parq").show returns 100! It should throw, like it would if I specified a schema with x as float, or some other type. Or maybe do what double casting would have spark.sql("select cast(cast(1 as decimal(38,10)) as decimal(38,18)) as x").show returns 1.00 spark.sql("select cast(cast(10 as decimal(2,0)) as decimal(2,1)) as x").show returns null! [https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c] was (Author: hafthor): Here's an equivalent problem: spark.sql("select cast(1 as decimal(38,18)) as x").write.format("parquet").save("decimal.parq") spark.read.schema(spark.sql("select cast(1 as decimal) as x").schema).parquet("decimal.parq").show returns 100! It should throw, like it would if I specified a schema with x as float, or some other type. Or maybe do what double casting would have spark.sql("select cast(cast(1 as decimal(38,10)) as decimal(38,18)) as x").show return 1.00 [https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c] > SparkSQL - Decimal data missing decimal point > - > > Key: SPARK-23576 > URL: https://issues.apache.org/jira/browse/SPARK-23576 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 > Environment: spark 2.3.0 > linux >Reporter: R >Priority: Major > > Integers like 3 stored as a decimal display in sparksql as 300 with > no decimal point. But hive displays fine as 3. > Repro steps: > # Create a .csv with the value 3 > # Use spark to read the csv, cast it as decimal(31,8) and output to an ORC > file > # Use spark to read the ORC, infer the schema (it will infer 38,18 > precision) and output to a Parquet file > # Create external hive table to read the parquet ( define the hive type as > decimal(31,8)) > # Use spark-sql to select from the external hive table. > # Notice how sparksql shows 300 !!! > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24093) Make some fields of KafkaStreamWriter/InternalRowMicroBatchWriter visible to outside of the classes
[ https://issues.apache.org/jira/browse/SPARK-24093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491285#comment-16491285 ] Mingjie Tang commented on SPARK-24093: -- I can add a PR for this. > Make some fields of KafkaStreamWriter/InternalRowMicroBatchWriter visible to > outside of the classes > --- > > Key: SPARK-24093 > URL: https://issues.apache.org/jira/browse/SPARK-24093 > Project: Spark > Issue Type: Wish > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Weiqing Yang >Priority: Minor > > So that third parties can obtain information about the streaming writer, for > example the "writer" and the "topic" that streaming data are written into, this > JIRA proposes making the relevant fields of KafkaStreamWriter and > InternalRowMicroBatchWriter visible outside of the classes. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
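A purely hypothetical sketch of the kind of change this wish asks for; the class and field names below are illustrative and do not reflect the actual Spark source. The idea is simply to promote constructor parameters to public, read-only vals so that third-party code can inspect them.
{code:scala}
// Hypothetical sketch only; names are illustrative, not the real Spark classes.

// Before: constructor arguments are invisible outside the class.
class ExampleStreamWriter(topic: Option[String], queryId: String)

// After: the same arguments are exposed as public, read-only fields.
class ExampleStreamWriterExposed(val topic: Option[String], val queryId: String)

object VisibilitySketch {
  def main(args: Array[String]): Unit = {
    val writer = new ExampleStreamWriterExposed(Some("events"), "query-1")
    // Third-party monitoring code can now see where the data is being written.
    println(s"writing to topic ${writer.topic.getOrElse("<none>")} for ${writer.queryId}")
  }
}
{code}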
[jira] [Commented] (SPARK-23887) update query progress
[ https://issues.apache.org/jira/browse/SPARK-23887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491281#comment-16491281 ] Arun Mahadevan commented on SPARK-23887: We could probably invoke "ProgressReporter.finishTrigger" after each epoch. This would update the query execution stats and post StreamingQueryListener events. However, I am not clear on how the SQL metrics could be updated, since they rely on accumulators and the accumulators might not be updated until the task completes. We could probably post some special event to the DAGScheduler to update the accumulators, or we need to figure out some other mechanism. [~joseph.torres] [~tdas], what do you think? > update query progress > - > > Key: SPARK-23887 > URL: https://issues.apache.org/jira/browse/SPARK-23887 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
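For context on what posting StreamingQueryListener events means for downstream consumers, here is a sketch against the existing public listener API (the already-released API, not the continuous-processing change under discussion); `event.progress` is the StreamingQueryProgress that `finishTrigger` assembles. Assumes a spark-shell session so `spark` is in scope.
{code:scala}
// Sketch of a consumer of the progress events discussed above.
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // event.progress is the StreamingQueryProgress built when a trigger finishes.
    println(s"query ${event.progress.id}: ${event.progress.numInputRows} rows in last trigger")
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
{code}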
[jira] [Commented] (SPARK-24091) Internally used ConfigMap prevents use of user-specified ConfigMaps carrying Spark config files
[ https://issues.apache.org/jira/browse/SPARK-24091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491279#comment-16491279 ] Yinan Li commented on SPARK-24091: -- Thanks [~tmckay]! I think the first approach is a good way of handling override and customization. > Internally used ConfigMap prevents use of user-specified ConfigMaps carrying > Spark config files > > > Key: SPARK-24091 > URL: https://issues.apache.org/jira/browse/SPARK-24091 > Project: Spark > Issue Type: Brainstorming > Components: Kubernetes >Affects Versions: 2.4.0 >Reporter: Yinan Li >Priority: Major > > The recent PR [https://github.com/apache/spark/pull/20669] for removing the > init-container introduced an internally used ConfigMap carrying Spark > configuration properties in a file for the driver. This ConfigMap gets > mounted under {{$SPARK_HOME/conf}} and the environment variable > {{SPARK_CONF_DIR}} is set to point to the mount path. This pretty much > prevents users from mounting their own ConfigMaps that carry custom Spark > configuration files, e.g., {{log4j.properties}} and {{spark-env.sh}}, and > leaves users with only the option of building custom images. IMO, it is very > useful to support mounting user-specified ConfigMaps for custom Spark > configuration files. This warrants further discussion. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-23576) SparkSQL - Decimal data missing decimal point
[ https://issues.apache.org/jira/browse/SPARK-23576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491267#comment-16491267 ] Hafthor Stefansson edited comment on SPARK-23576 at 5/25/18 9:14 PM: - Here's an equivalent problem: spark.sql("select cast(1 as decimal(38,18)) as x").write.format("parquet").save("decimal.parq") spark.read.schema(spark.sql("select cast(1 as decimal) as x").schema).parquet("decimal.parq").show returns 100! It should throw, like it would if I specified a schema with x as float, or some other type. Or maybe do what double casting would have spark.sql("select cast(cast(1 as decimal(38,10)) as decimal(38,18)) as x").show return 1.00 [https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c] was (Author: hafthor): Here's an equivalent problem: spark.sql("select cast(1 as decimal(38,18)) as x").write.format("parquet").save("decimal.parq") spark.read.schema(spark.sql("select cast(1 as decimal) as x").schema).parquet("decimal.parq").show returns 100! It should throw, like it would if I specified a schema with x as float, or some other type. https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c > SparkSQL - Decimal data missing decimal point > - > > Key: SPARK-23576 > URL: https://issues.apache.org/jira/browse/SPARK-23576 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 > Environment: spark 2.3.0 > linux >Reporter: R >Priority: Major > > Integers like 3 stored as a decimal display in sparksql as 300 with > no decimal point. But hive displays fine as 3. > Repro steps: > # Create a .csv with the value 3 > # Use spark to read the csv, cast it as decimal(31,8) and output to an ORC > file > # Use spark to read the ORC, infer the schema (it will infer 38,18 > precision) and output to a Parquet file > # Create external hive table to read the parquet ( define the hive type as > decimal(31,8)) > # Use spark-sql to select from the external hive table. > # Notice how sparksql shows 300 !!! > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491271#comment-16491271 ] Marco Gaido commented on SPARK-24373: - [~smilegator] yes, you're right, the impact would be definitely lower. > "df.cache() df.count()" no longer eagerly caches data when the analyzed plans > are different after re-analyzing the plans > > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Blocker > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, 
bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-23576) SparkSQL - Decimal data missing decimal point
[ https://issues.apache.org/jira/browse/SPARK-23576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491267#comment-16491267 ] Hafthor Stefansson edited comment on SPARK-23576 at 5/25/18 9:06 PM: - Here's an equivalent problem: spark.sql("select cast(1 as decimal(38,18)) as x").write.format("parquet").save("decimal.parq") spark.read.schema(spark.sql("select cast(1 as decimal) as x").schema).parquet("decimal.parq").show returns 100! It should throw, like it would if I specified a schema with x as float, or some other type. https://gist.github.com/Hafthor/7f12bdfc41dc96676df03f366ef76f1c was (Author: hafthor): Here's an equivalent problem: spark.sql("select cast(1 as decimal(38,18)) as x").write.format("parquet").save("decimal.parq") spark.read.schema(spark.sql("select cast(1 as decimal) as x").schema).parquet("decimal.parq").show returns 100! It should throw, like it would if I specified a schema with x as float, or some other type. > SparkSQL - Decimal data missing decimal point > - > > Key: SPARK-23576 > URL: https://issues.apache.org/jira/browse/SPARK-23576 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 > Environment: spark 2.3.0 > linux >Reporter: R >Priority: Major > > Integers like 3 stored as a decimal display in sparksql as 300 with > no decimal point. But hive displays fine as 3. > Repro steps: > # Create a .csv with the value 3 > # Use spark to read the csv, cast it as decimal(31,8) and output to an ORC > file > # Use spark to read the ORC, infer the schema (it will infer 38,18 > precision) and output to a Parquet file > # Create external hive table to read the parquet ( define the hive type as > decimal(31,8)) > # Use spark-sql to select from the external hive table. > # Notice how sparksql shows 300 !!! > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23576) SparkSQL - Decimal data missing decimal point
[ https://issues.apache.org/jira/browse/SPARK-23576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491267#comment-16491267 ] Hafthor Stefansson commented on SPARK-23576: Here's an equivalent problem: spark.sql("select cast(1 as decimal(38,18)) as x").write.format("parquet").save("decimal.parq") spark.read.schema(spark.sql("select cast(1 as decimal) as x").schema).parquet("decimal.parq").show returns 100! It should throw, like it would if I specified a schema with x as float, or some other type. > SparkSQL - Decimal data missing decimal point > - > > Key: SPARK-23576 > URL: https://issues.apache.org/jira/browse/SPARK-23576 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 > Environment: spark 2.3.0 > linux >Reporter: R >Priority: Major > > Integers like 3 stored as a decimal display in sparksql as 300 with > no decimal point. But hive displays fine as 3. > Repro steps: > # Create a .csv with the value 3 > # Use spark to read the csv, cast it as decimal(31,8) and output to an ORC > file > # Use spark to read the ORC, infer the schema (it will infer 38,18 > precision) and output to a Parquet file > # Create external hive table to read the parquet ( define the hive type as > decimal(31,8)) > # Use spark-sql to select from the external hive table. > # Notice how sparksql shows 300 !!! > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-24373: Summary: "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans (was: "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after rerunning the analyzer) > "df.cache() df.count()" no longer eagerly caches data when the analyzed plans > are different after re-analyzing the plans > > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Blocker > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) 
HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23309) Spark 2.3 cached query performance 20-30% worse than spark 2.2
[ https://issues.apache.org/jira/browse/SPARK-23309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491233#comment-16491233 ] Xiao Li commented on SPARK-23309: - [~vanzin] https://issues.apache.org/jira/browse/SPARK-24373 is not related to this JIRA. This JIRA uses pure SQL and thus it will not hit the problem caused by AnalysisBarrier. > Spark 2.3 cached query performance 20-30% worse than spark 2.2 > -- > > Key: SPARK-23309 > URL: https://issues.apache.org/jira/browse/SPARK-23309 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Thomas Graves >Priority: Major > > I was testing spark 2.3 rc2 and I am seeing a performance regression in sql > queries on cached data. > The size of the data: 10.4GB input from hive orc files /18.8 GB cached/5592 > partitions > Here is the example query: > val dailycached = spark.sql("select something from table where dt = > '20170301' AND something IS NOT NULL") > dailycached.createOrReplaceTempView("dailycached") > spark.catalog.cacheTable("dailyCached") > spark.sql("SELECT COUNT(DISTINCT(something)) from dailycached").show() > > On spark 2.2 I see query times averaging 13 seconds > On the same nodes I see spark 2.3 query times averaging 17 seconds > Note these are times of queries after the initial caching, so just running > the last line again: > spark.sql("SELECT COUNT(DISTINCT(something)) from dailycached").show() > multiple times. > > I also ran a query over more data (335GB input/587.5 GB cached) and saw a > similar discrepancy in the performance of querying cached data between spark > 2.3 and spark 2.2, where 2.2 was better by about 20%. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
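A small measurement sketch along the lines of the report above; the table and column names are the reporter's placeholders, and this assumes a spark-shell session with the temp view already created. `spark.time` prints the wall-clock time of each repeated run against the cached view.
{code:scala}
// Timing sketch for the repeated cached query above; not a rigorous benchmark harness.
spark.catalog.cacheTable("dailycached")
spark.sql("SELECT COUNT(DISTINCT(something)) FROM dailycached").show()  // first run populates the cache

(1 to 5).foreach { _ =>
  spark.time {
    spark.sql("SELECT COUNT(DISTINCT(something)) FROM dailycached").show()
  }
}
{code}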
[jira] [Updated] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after rerunning the analyzer
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-24373: Summary: "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after rerunning the analyzer (was: "df.cache() df.count()" no longer eagerly caches data) > "df.cache() df.count()" no longer eagerly caches data when the analyzed plans > are different after rerunning the analyzer > > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Blocker > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > 
output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491224#comment-16491224 ] Xiao Li edited comment on SPARK-24373 at 5/25/18 8:24 PM: -- {code} def count(): Long = withAction("count", groupBy().count().queryExecution) { plan => plan.executeCollect().head.getLong(0) } {code} Many Spark users are using df.count() after df.cache() for achieving eager caching. Since our count() API is using `groupBy()`, the impact becomes much bigger. The count() API will not trigger the data materialization when the plans are different after multiple rounds of plan analysis. was (Author: smilegator): {code} def count(): Long = withAction("count", groupBy().count().queryExecution) { plan => plan.executeCollect().head.getLong(0) } {code} Many Spark users are using df.count() after df.cache() for achieving eager caching. Since our count() API is using `groupBy()`, the impact becomes much bigger if the plans are different after multiple rounds of plan analysis. > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Blocker > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. 
You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
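A workaround sketch rather than an official API: materialization can also be forced by running a job over the DataFrame's own plan, for example a no-op `foreach`, which sidesteps the plan mismatch that `groupBy().count()` introduces in the comment above. Assumes a spark-shell session.
{code:scala}
// Workaround sketch: populate the cache without relying on count(), whose
// groupBy().count() plan may no longer line up with the cached plan.
import org.apache.spark.sql.DataFrame

def cacheEagerly(df: DataFrame): DataFrame = {
  df.cache()
  df.foreach(_ => ())   // runs a job over df's own analyzed plan, filling the cache
  df
}

// Following the repro above:
// val df1 = df.withColumn("value1", myudf(col("value")))
// cacheEagerly(df1)
{code}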
[jira] [Comment Edited] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491224#comment-16491224 ] Xiao Li edited comment on SPARK-24373 at 5/25/18 8:23 PM: -- {code} def count(): Long = withAction("count", groupBy().count().queryExecution) { plan => plan.executeCollect().head.getLong(0) } {code} Many Spark users are using df.count() after df.cache() for achieving eager caching. Since our count() API is using `groupBy()`, the impact becomes much bigger if the plans are different after multiple rounds of plan analysis. was (Author: smilegator): {code} def count(): Long = withAction("count", groupBy().count().queryExecution) { plan => plan.executeCollect().head.getLong(0) } {code} Many Spark users are using df.count() after df.cache() for achieving eager caching. Since our count() API is using `groupBy()`, the impact becomes much bigger. > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Blocker > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. 
You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-24373: Target Version/s: 2.3.1 > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Blocker > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: 
issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li updated SPARK-24373: Priority: Blocker (was: Major) > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Blocker > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: 
issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-24004) Tests of from_json for MapType
[ https://issues.apache.org/jira/browse/SPARK-24004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maxim Gekk resolved SPARK-24004. Resolution: Won't Fix > Tests of from_json for MapType > -- > > Key: SPARK-24004 > URL: https://issues.apache.org/jira/browse/SPARK-24004 > Project: Spark > Issue Type: Test > Components: SQL >Affects Versions: 2.3.0 >Reporter: Maxim Gekk >Priority: Trivial > > There are no tests for *from_json* that check *MapType* as a value type of > struct fields. MapType should be supported as a non-root type according to the > current implementation of JacksonParser, but the functionality is not checked. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
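For reference, a sketch of the untested case the ticket describes: `from_json` with a schema in which a struct field's value type is a MapType, i.e. MapType used as a non-root type. Assumes a spark-shell session so `spark` and `spark.implicits._` are available.
{code:scala}
// Illustration of the case described above: MapType as a field type inside the
// struct passed to from_json.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val schema = new StructType()
  .add("id", IntegerType)
  .add("attrs", MapType(StringType, StringType))

val df = Seq("""{"id": 1, "attrs": {"a": "x", "b": "y"}}""").toDF("json")
df.select(from_json($"json", schema).as("parsed"))
  .select("parsed.id", "parsed.attrs")
  .show(false)
{code}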
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491224#comment-16491224 ] Xiao Li commented on SPARK-24373: - {code} def count(): Long = withAction("count", groupBy().count().queryExecution) { plan => plan.executeCollect().head.getLong(0) } {code} Many Spark users are using df.count() after df.cache() for achieving eager caching. Since our count() API is using `groupBy()`, the impact becomes much bigger. > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange 
SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-15125) CSV data source recognizes empty quoted strings in the input as null.
[ https://issues.apache.org/jira/browse/SPARK-15125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maxim Gekk resolved SPARK-15125. Resolution: Fixed Fix Version/s: 2.4.0 The issue has been fixed by https://github.com/apache/spark/commit/7a2d4895c75d4c232c377876b61c05a083eab3c8 > CSV data source recognizes empty quoted strings in the input as null. > -- > > Key: SPARK-15125 > URL: https://issues.apache.org/jira/browse/SPARK-15125 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Suresh Thalamati >Priority: Major > Fix For: 2.4.0 > > > The CSV data source does not differentiate between empty quoted strings and empty > fields; both are read as null. In some scenarios users would want to differentiate between > these values, especially in the context of SQL, where NULL and empty string > have different meanings. If the input data happens to be a dump from a traditional > relational data source, users will see different results for the SQL queries. > {code} > Repro: > Test Data: (test.csv) > year,make,model,comment,price > 2017,Tesla,Mode 3,looks nice.,35000.99 > 2016,Chevy,Bolt,"",29000.00 > 2015,Porsche,"",, > scala> val df= sqlContext.read.format("csv").option("header", > "true").option("inferSchema", "true").option("nullValue", > null).load("/tmp/test.csv") > df: org.apache.spark.sql.DataFrame = [year: int, make: string ... 3 more > fields] > scala> df.show > ++---+--+---++ > |year| make| model|comment| price| > ++---+--+---++ > |2017| Tesla|Mode 3|looks nice.|35000.99| > |2016| Chevy| Bolt| null| 29000.0| > |2015|Porsche| null| null|null| > ++---+--+---++ > Expected: > ++---+--+---++ > |year| make| model|comment| price| > ++---+--+---++ > |2017| Tesla|Mode 3|looks nice.|35000.99| > |2016| Chevy| Bolt| | 29000.0| > |2015|Porsche| | null|null| > ++---+--+---++ > {code} > Testing a fix for this issue. I will give a shot at submitting a PR for > this soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
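A quick verification sketch for the behaviour described above: after the fix, an empty quoted field should come back as an empty string while a genuinely missing field stays null, and the two columns below tell them apart. The file path is the reporter's example; assumes a spark-shell session.
{code:scala}
// Verification sketch: distinguish empty strings from nulls after reading the sample CSV.
import org.apache.spark.sql.functions._

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/test.csv")

df.select(
  col("model"),
  col("model").isNull.as("is_null"),
  (col("model") === "").as("is_empty_string")
).show()
{code}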
[jira] [Created] (SPARK-24393) SQL builtin: isinf
Henry Robinson created SPARK-24393: -- Summary: SQL builtin: isinf Key: SPARK-24393 URL: https://issues.apache.org/jira/browse/SPARK-24393 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 2.4.0 Reporter: Henry Robinson Along with the existing {{isnan}}, it would be helpful to have {{isinf}} to test if a float or double value is {{Infinity}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
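Until such a builtin exists, one workaround sketch is to compare against the two Double infinities directly, alongside the existing `isnan`. Assumes a spark-shell session so `spark.implicits._` is available.
{code:scala}
// Workaround sketch for the missing isinf: match against the Double infinities.
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import spark.implicits._

def isInf(c: Column): Column =
  c === Double.PositiveInfinity || c === Double.NegativeInfinity

val df = Seq(1.0, Double.PositiveInfinity, Double.NegativeInfinity, Double.NaN).toDF("x")
df.select($"x", isInf($"x").as("is_inf"), isnan($"x").as("is_nan")).show()
{code}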
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491153#comment-16491153 ] Tomasz Gawęda commented on SPARK-24373: --- [~LI,Xiao] That is a good idea :) Eager caching is useful, many times I see additional count just to cache eagerly > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS 
value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491124#comment-16491124 ] Marco Gaido commented on SPARK-24373: - [~smilegator] I think an eager API is not related to the problem experienced here, though. > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan 
ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24324) Pandas Grouped Map UserDefinedFunction mixes column labels
[ https://issues.apache.org/jira/browse/SPARK-24324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491125#comment-16491125 ] Li Jin commented on SPARK-24324: Moved under Spark-22216 for better ticket organization. > Pandas Grouped Map UserDefinedFunction mixes column labels > -- > > Key: SPARK-24324 > URL: https://issues.apache.org/jira/browse/SPARK-24324 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 2.3.0 > Environment: Python (using virtualenv): > {noformat} > $ python --version > Python 3.6.5 > {noformat} > Modules installed: > {noformat} > arrow==0.12.1 > backcall==0.1.0 > bleach==2.1.3 > chardet==3.0.4 > decorator==4.3.0 > entrypoints==0.2.3 > findspark==1.2.0 > html5lib==1.0.1 > ipdb==0.11 > ipykernel==4.8.2 > ipython==6.3.1 > ipython-genutils==0.2.0 > ipywidgets==7.2.1 > jedi==0.12.0 > Jinja2==2.10 > jsonschema==2.6.0 > jupyter==1.0.0 > jupyter-client==5.2.3 > jupyter-console==5.2.0 > jupyter-core==4.4.0 > MarkupSafe==1.0 > mistune==0.8.3 > nbconvert==5.3.1 > nbformat==4.4.0 > notebook==5.5.0 > numpy==1.14.3 > pandas==0.22.0 > pandocfilters==1.4.2 > parso==0.2.0 > pbr==3.1.1 > pexpect==4.5.0 > pickleshare==0.7.4 > progressbar2==3.37.1 > prompt-toolkit==1.0.15 > ptyprocess==0.5.2 > pyarrow==0.9.0 > Pygments==2.2.0 > python-dateutil==2.7.2 > python-utils==2.3.0 > pytz==2018.4 > pyzmq==17.0.0 > qtconsole==4.3.1 > Send2Trash==1.5.0 > simplegeneric==0.8.1 > six==1.11.0 > SQLAlchemy==1.2.7 > stevedore==1.28.0 > terminado==0.8.1 > testpath==0.3.1 > tornado==5.0.2 > traitlets==4.3.2 > virtualenv==15.1.0 > virtualenv-clone==0.2.6 > virtualenvwrapper==4.7.2 > wcwidth==0.1.7 > webencodings==0.5.1 > widgetsnbextension==3.2.1 > {noformat} > > Java: > {noformat} > $ java -version > java version "1.8.0_171" > Java(TM) SE Runtime Environment (build 1.8.0_171-b11) > Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode){noformat} > System: > {noformat} > $ lsb_release -a > No LSB modules are available. > Distributor ID: Ubuntu > Description: Ubuntu 16.04.4 LTS > Release: 16.04 > Codename: xenial > {noformat} >Reporter: Cristian Consonni >Priority: Major > > I am working on Wikipedia page views (see [task T188041 on Wikimedia's > Pharicator|https://phabricator.wikimedia.org/T188041]). For simplicity, let's > say that these are the data: > {noformat} > > {noformat} > For each combination of (lang, page, day(timestamp)) I need to transform the > views for each hour: > {noformat} > 00:00 -> A > 01:00 -> B > ... > {noformat} > and concatenate the number of views for that hour. So, if a page got 5 views > at 00:00 and 7 views at 01:00 it would become: > {noformat} > A5B7 > {noformat} > > I have written a UDF called {code:python}concat_hours{code} > However, the function is mixing the columns and I am not sure what is going > on. I wrote here a minimal complete example that reproduces the issue on my > system (the details of my environment are above). 
> {code:python} > #!/usr/bin/env python3 > # coding: utf-8 > input_data = b"""en Albert_Camus 20071210-00 150 > en Albert_Camus 20071210-01 148 > en Albert_Camus 20071210-02 197 > en Albert_Camus 20071211-20 145 > en Albert_Camus 20071211-21 131 > en Albert_Camus 20071211-22 154 > en Albert_Camus 20071211-230001 142 > en Albert_Caquot 20071210-02 1 > en Albert_Caquot 20071210-02 1 > en Albert_Caquot 20071210-040001 1 > en Albert_Caquot 20071211-06 1 > en Albert_Caquot 20071211-08 1 > en Albert_Caquot 20071211-15 3 > en Albert_Caquot 20071211-21 1""" > import tempfile > fp = tempfile.NamedTemporaryFile() > fp.write(input_data) > fp.seek(0) > import findspark > findspark.init() > import pyspark > from pyspark.sql.types import StructType, StructField > from pyspark.sql.types import StringType, IntegerType, TimestampType > from pyspark.sql import functions > sc = pyspark.SparkContext(appName="udf_example") > sqlctx = pyspark.SQLContext(sc) > schema = StructType([StructField("lang", StringType(), False), > StructField("page", StringType(), False), > StructField("timestamp", TimestampType(), False), > StructField("views", IntegerType(), False)]) > df = sqlctx.read.csv(fp.name, > header=False, > schema=schema, > timestampFormat="MMdd-HHmmss", > sep=' ') > df.count() > df.dtypes > df.show() > new_schema = StructType([StructField("lang", StringType(), False), > StructField("page", StringType(), False), > StructField("day", StringType(), False), > StructField("enc", StringType(), Fals
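The repro script is cut off above; rather than guessing at the rest of concat_hours, here is a deliberately simplified grouped-map sketch (illustrative names such as summarize and total_views are not from the ticket) showing the pattern under discussion, assuming Spark 2.3 with pyarrow installed. Keeping the returned pandas DataFrame's columns in the same order as the declared schema avoids depending on how output columns are matched to labels, which is the behavior this ticket questions:
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, LongType
import pandas as pd

spark = SparkSession.builder.appName("grouped_map_sketch").getOrCreate()

df = spark.createDataFrame(
    [("en", "Albert_Camus", 150), ("en", "Albert_Camus", 148), ("en", "Albert_Caquot", 1)],
    ["lang", "page", "views"])

out_schema = StructType([
    StructField("lang", StringType()),
    StructField("page", StringType()),
    StructField("total_views", LongType())])

@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def summarize(pdf):
    # pdf holds one (lang, page) group; return columns in the same order as out_schema
    return pd.DataFrame(
        [[pdf["lang"].iloc[0], pdf["page"].iloc[0], int(pdf["views"].sum())]],
        columns=["lang", "page", "total_views"])

df.groupBy("lang", "page").apply(summarize).show()
{code}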
[jira] [Updated] (SPARK-24324) Pandas Grouped Map UserDefinedFunction mixes column labels
[ https://issues.apache.org/jira/browse/SPARK-24324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Jin updated SPARK-24324: --- Issue Type: Sub-task (was: Bug) Parent: SPARK-22216 > Pandas Grouped Map UserDefinedFunction mixes column labels > -- > > Key: SPARK-24324 > URL: https://issues.apache.org/jira/browse/SPARK-24324 > Project: Spark > Issue Type: Sub-task > Components: PySpark >Affects Versions: 2.3.0 > Environment: Python (using virtualenv): > {noformat} > $ python --version > Python 3.6.5 > {noformat} > Modules installed: > {noformat} > arrow==0.12.1 > backcall==0.1.0 > bleach==2.1.3 > chardet==3.0.4 > decorator==4.3.0 > entrypoints==0.2.3 > findspark==1.2.0 > html5lib==1.0.1 > ipdb==0.11 > ipykernel==4.8.2 > ipython==6.3.1 > ipython-genutils==0.2.0 > ipywidgets==7.2.1 > jedi==0.12.0 > Jinja2==2.10 > jsonschema==2.6.0 > jupyter==1.0.0 > jupyter-client==5.2.3 > jupyter-console==5.2.0 > jupyter-core==4.4.0 > MarkupSafe==1.0 > mistune==0.8.3 > nbconvert==5.3.1 > nbformat==4.4.0 > notebook==5.5.0 > numpy==1.14.3 > pandas==0.22.0 > pandocfilters==1.4.2 > parso==0.2.0 > pbr==3.1.1 > pexpect==4.5.0 > pickleshare==0.7.4 > progressbar2==3.37.1 > prompt-toolkit==1.0.15 > ptyprocess==0.5.2 > pyarrow==0.9.0 > Pygments==2.2.0 > python-dateutil==2.7.2 > python-utils==2.3.0 > pytz==2018.4 > pyzmq==17.0.0 > qtconsole==4.3.1 > Send2Trash==1.5.0 > simplegeneric==0.8.1 > six==1.11.0 > SQLAlchemy==1.2.7 > stevedore==1.28.0 > terminado==0.8.1 > testpath==0.3.1 > tornado==5.0.2 > traitlets==4.3.2 > virtualenv==15.1.0 > virtualenv-clone==0.2.6 > virtualenvwrapper==4.7.2 > wcwidth==0.1.7 > webencodings==0.5.1 > widgetsnbextension==3.2.1 > {noformat} > > Java: > {noformat} > $ java -version > java version "1.8.0_171" > Java(TM) SE Runtime Environment (build 1.8.0_171-b11) > Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode){noformat} > System: > {noformat} > $ lsb_release -a > No LSB modules are available. > Distributor ID: Ubuntu > Description: Ubuntu 16.04.4 LTS > Release: 16.04 > Codename: xenial > {noformat} >Reporter: Cristian Consonni >Priority: Major > > I am working on Wikipedia page views (see [task T188041 on Wikimedia's > Pharicator|https://phabricator.wikimedia.org/T188041]). For simplicity, let's > say that these are the data: > {noformat} > > {noformat} > For each combination of (lang, page, day(timestamp)) I need to transform the > views for each hour: > {noformat} > 00:00 -> A > 01:00 -> B > ... > {noformat} > and concatenate the number of views for that hour. So, if a page got 5 views > at 00:00 and 7 views at 01:00 it would become: > {noformat} > A5B7 > {noformat} > > I have written a UDF called {code:python}concat_hours{code} > However, the function is mixing the columns and I am not sure what is going > on. I wrote here a minimal complete example that reproduces the issue on my > system (the details of my environment are above). 
> {code:python} > #!/usr/bin/env python3 > # coding: utf-8 > input_data = b"""en Albert_Camus 20071210-00 150 > en Albert_Camus 20071210-01 148 > en Albert_Camus 20071210-02 197 > en Albert_Camus 20071211-20 145 > en Albert_Camus 20071211-21 131 > en Albert_Camus 20071211-22 154 > en Albert_Camus 20071211-230001 142 > en Albert_Caquot 20071210-02 1 > en Albert_Caquot 20071210-02 1 > en Albert_Caquot 20071210-040001 1 > en Albert_Caquot 20071211-06 1 > en Albert_Caquot 20071211-08 1 > en Albert_Caquot 20071211-15 3 > en Albert_Caquot 20071211-21 1""" > import tempfile > fp = tempfile.NamedTemporaryFile() > fp.write(input_data) > fp.seek(0) > import findspark > findspark.init() > import pyspark > from pyspark.sql.types import StructType, StructField > from pyspark.sql.types import StringType, IntegerType, TimestampType > from pyspark.sql import functions > sc = pyspark.SparkContext(appName="udf_example") > sqlctx = pyspark.SQLContext(sc) > schema = StructType([StructField("lang", StringType(), False), > StructField("page", StringType(), False), > StructField("timestamp", TimestampType(), False), > StructField("views", IntegerType(), False)]) > df = sqlctx.read.csv(fp.name, > header=False, > schema=schema, > timestampFormat="MMdd-HHmmss", > sep=' ') > df.count() > df.dtypes > df.show() > new_schema = StructType([StructField("lang", StringType(), False), > StructField("page", StringType(), False), > StructField("day", StringType(), False), > StructField("enc", StringType(), False)]) > from pyspark.sql.functions import pandas_u
[jira] [Updated] (SPARK-22809) pyspark is sensitive to imports with dots
[ https://issues.apache.org/jira/browse/SPARK-22809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin updated SPARK-22809: --- Target Version/s: 2.4.0 (was: 2.3.1, 2.4.0) > pyspark is sensitive to imports with dots > - > > Key: SPARK-22809 > URL: https://issues.apache.org/jira/browse/SPARK-22809 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.2.0, 2.2.1 >Reporter: Cricket Temple >Assignee: holdenk >Priority: Major > > User code can fail with dotted imports. Here's a repro script. > {noformat} > import numpy as np > import pandas as pd > import pyspark > import scipy.interpolate > import scipy.interpolate as scipy_interpolate > import py4j > scipy_interpolate2 = scipy.interpolate > sc = pyspark.SparkContext() > spark_session = pyspark.SQLContext(sc) > ### > # The details of this dataset are irrelevant # > # Sorry if you'd have preferred something more boring # > ### > x__ = np.linspace(0,10,1000) > freq__ = np.arange(1,5) > x_, freq_ = np.ix_(x__, freq__) > y = np.sin(x_ * freq_).ravel() > x = (x_ * np.ones(freq_.shape)).ravel() > freq = (np.ones(x_.shape) * freq_).ravel() > df_pd = pd.DataFrame(np.stack([x,y,freq]).T, columns=['x','y','freq']) > df_sk = spark_session.createDataFrame(df_pd) > assert(df_sk.toPandas() == df_pd).all().all() > try: > import matplotlib.pyplot as plt > for f, data in df_pd.groupby("freq"): > plt.plot(*data[['x','y']].values.T) > plt.show() > except: > print("I guess we can't plot anything") > def mymap(x, interp_fn): > df = pd.DataFrame.from_records([row.asDict() for row in list(x)]) > return interp_fn(df.x.values, df.y.values)(np.pi) > df_by_freq = df_sk.rdd.keyBy(lambda x: x.freq).groupByKey() > result = df_by_freq.mapValues(lambda x: mymap(x, > scipy_interpolate.interp1d)).collect() > assert(np.allclose(np.array(zip(*result)[1]), np.zeros(len(freq__)), > atol=1e-6)) > try: > result = df_by_freq.mapValues(lambda x: mymap(x, > scipy.interpolate.interp1d)).collect() > raise Excpetion("Not going to reach this line") > except py4j.protocol.Py4JJavaError, e: > print("See?") > result = df_by_freq.mapValues(lambda x: mymap(x, > scipy_interpolate2.interp1d)).collect() > assert(np.allclose(np.array(zip(*result)[1]), np.zeros(len(freq__)), > atol=1e-6)) > # But now it works! > result = df_by_freq.mapValues(lambda x: mymap(x, > scipy.interpolate.interp1d)).collect() > assert(np.allclose(np.array(zip(*result)[1]), np.zeros(len(freq__)), > atol=1e-6)) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
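One workaround pattern that is often used for this class of failure (names below are illustrative, not from the ticket): defer the dotted import into the function that runs on the executors, so the driver-side closure never captures the scipy.interpolate attribute lookup. A minimal sketch, assuming the df_sk DataFrame from the repro script:
{code:python}
import numpy as np
import pandas as pd

def interp_at_pi(rows):
    # Deferred import: resolved on the executor, so the closure does not reference
    # the dotted module attribute that trips up the repro above.
    from scipy.interpolate import interp1d
    df = pd.DataFrame.from_records([r.asDict() for r in rows])
    return float(interp1d(df.x.values, df.y.values)(np.pi))

# usage, reusing df_sk from the repro script:
# result = df_sk.rdd.keyBy(lambda r: r.freq).groupByKey().mapValues(interp_at_pi).collect()
{code}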
[jira] [Commented] (SPARK-24383) spark on k8s: "driver-svc" are not getting deleted
[ https://issues.apache.org/jira/browse/SPARK-24383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491106#comment-16491106 ] Yinan Li commented on SPARK-24383: -- OK, then garbage collection should kick in and delete the service when the driver pod is gone unless there's some issue with the GC. > spark on k8s: "driver-svc" are not getting deleted > -- > > Key: SPARK-24383 > URL: https://issues.apache.org/jira/browse/SPARK-24383 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 2.3.0 >Reporter: Lenin >Priority: Major > > When the driver pod exits, the "*driver-svc" services created for the driver > are not cleaned up. This causes accumulation of services in the k8s layer, at > one point no more services can be created. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22809) pyspark is sensitive to imports with dots
[ https://issues.apache.org/jira/browse/SPARK-22809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491107#comment-16491107 ] Marcelo Vanzin commented on SPARK-22809: I'm removing 2.3.1 since it doesn't seem there's any activity here. Please re-add if you plan to work on this for that release. > pyspark is sensitive to imports with dots > - > > Key: SPARK-22809 > URL: https://issues.apache.org/jira/browse/SPARK-22809 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.2.0, 2.2.1 >Reporter: Cricket Temple >Assignee: holdenk >Priority: Major > > User code can fail with dotted imports. Here's a repro script. > {noformat} > import numpy as np > import pandas as pd > import pyspark > import scipy.interpolate > import scipy.interpolate as scipy_interpolate > import py4j > scipy_interpolate2 = scipy.interpolate > sc = pyspark.SparkContext() > spark_session = pyspark.SQLContext(sc) > ### > # The details of this dataset are irrelevant # > # Sorry if you'd have preferred something more boring # > ### > x__ = np.linspace(0,10,1000) > freq__ = np.arange(1,5) > x_, freq_ = np.ix_(x__, freq__) > y = np.sin(x_ * freq_).ravel() > x = (x_ * np.ones(freq_.shape)).ravel() > freq = (np.ones(x_.shape) * freq_).ravel() > df_pd = pd.DataFrame(np.stack([x,y,freq]).T, columns=['x','y','freq']) > df_sk = spark_session.createDataFrame(df_pd) > assert(df_sk.toPandas() == df_pd).all().all() > try: > import matplotlib.pyplot as plt > for f, data in df_pd.groupby("freq"): > plt.plot(*data[['x','y']].values.T) > plt.show() > except: > print("I guess we can't plot anything") > def mymap(x, interp_fn): > df = pd.DataFrame.from_records([row.asDict() for row in list(x)]) > return interp_fn(df.x.values, df.y.values)(np.pi) > df_by_freq = df_sk.rdd.keyBy(lambda x: x.freq).groupByKey() > result = df_by_freq.mapValues(lambda x: mymap(x, > scipy_interpolate.interp1d)).collect() > assert(np.allclose(np.array(zip(*result)[1]), np.zeros(len(freq__)), > atol=1e-6)) > try: > result = df_by_freq.mapValues(lambda x: mymap(x, > scipy.interpolate.interp1d)).collect() > raise Excpetion("Not going to reach this line") > except py4j.protocol.Py4JJavaError, e: > print("See?") > result = df_by_freq.mapValues(lambda x: mymap(x, > scipy_interpolate2.interp1d)).collect() > assert(np.allclose(np.array(zip(*result)[1]), np.zeros(len(freq__)), > atol=1e-6)) > # But now it works! > result = df_by_freq.mapValues(lambda x: mymap(x, > scipy.interpolate.interp1d)).collect() > assert(np.allclose(np.array(zip(*result)[1]), np.zeros(len(freq__)), > atol=1e-6)) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24392) Mark pandas_udf as Experimental
[ https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491099#comment-16491099 ] Marcelo Vanzin commented on SPARK-24392: What release is this supposed to block? > Mark pandas_udf as Experimental > --- > > Key: SPARK-24392 > URL: https://issues.apache.org/jira/browse/SPARK-24392 > Project: Spark > Issue Type: Task > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Bryan Cutler >Priority: Blocker > > This functionality is still evolving and has introduced some bugs . It was > an oversight to not mark it as experimental before it was released in 2.3.0. > Not sure if it is a good idea to change this after the fact, but I'm opening > this to continue discussion from > https://github.com/apache/spark/pull/21427#issuecomment-391967423 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491088#comment-16491088 ] Xiao Li commented on SPARK-24373: - BTW, I plan to continue my work of https://github.com/apache/spark/pull/18717, which will add an eager persist/cache API. > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS 
value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491081#comment-16491081 ] Xiao Li edited comment on SPARK-24373 at 5/25/18 5:57 PM: -- [~icexelloss] [~aweise] Are you also using the Dataset APIs groupBy(), rollup(), cube(), rollup, pivot() and groupByKey()? was (Author: smilegator): [~icexelloss] [~aweise] Are you also using the Dataset APIs groupBy(), rollup(), cube(), rollup, pivot()? > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- 
*(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491086#comment-16491086 ] Li Jin commented on SPARK-24373: We use groupby() and pivot() > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA 
(v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
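For context on the API combination being discussed, a tiny PySpark illustration of groupBy()/pivot() over a cached DataFrame (data and names are made up for the example):
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sales = spark.createDataFrame(
    [("2018", "US", 10), ("2018", "EU", 7), ("2019", "US", 12)],
    ["year", "region", "amount"])

sales.cache()
sales.groupBy("year").pivot("region").agg(F.sum("amount")).show()
{code}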
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491081#comment-16491081 ] Xiao Li commented on SPARK-24373: - [~icexelloss] [~aweise] Are you also using the Dataset APIs groupBy(), rollup(), cube(), rollup, pivot()? > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan 
ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24392) Mark pandas_udf as Experimental
[ https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24392: Assignee: Apache Spark > Mark pandas_udf as Experimental > --- > > Key: SPARK-24392 > URL: https://issues.apache.org/jira/browse/SPARK-24392 > Project: Spark > Issue Type: Task > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Bryan Cutler >Assignee: Apache Spark >Priority: Blocker > > This functionality is still evolving and has introduced some bugs . It was > an oversight to not mark it as experimental before it was released in 2.3.0. > Not sure if it is a good idea to change this after the fact, but I'm opening > this to continue discussion from > https://github.com/apache/spark/pull/21427#issuecomment-391967423 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24392) Mark pandas_udf as Experimental
[ https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491042#comment-16491042 ] Apache Spark commented on SPARK-24392: -- User 'BryanCutler' has created a pull request for this issue: https://github.com/apache/spark/pull/21435 > Mark pandas_udf as Experimental > --- > > Key: SPARK-24392 > URL: https://issues.apache.org/jira/browse/SPARK-24392 > Project: Spark > Issue Type: Task > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Bryan Cutler >Priority: Blocker > > This functionality is still evolving and has introduced some bugs . It was > an oversight to not mark it as experimental before it was released in 2.3.0. > Not sure if it is a good idea to change this after the fact, but I'm opening > this to continue discussion from > https://github.com/apache/spark/pull/21427#issuecomment-391967423 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24392) Mark pandas_udf as Experimental
[ https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24392: Assignee: (was: Apache Spark) > Mark pandas_udf as Experimental > --- > > Key: SPARK-24392 > URL: https://issues.apache.org/jira/browse/SPARK-24392 > Project: Spark > Issue Type: Task > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Bryan Cutler >Priority: Blocker > > This functionality is still evolving and has introduced some bugs . It was > an oversight to not mark it as experimental before it was released in 2.3.0. > Not sure if it is a good idea to change this after the fact, but I'm opening > this to continue discussion from > https://github.com/apache/spark/pull/21427#issuecomment-391967423 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24331) Add arrays_overlap / array_repeat / map_entries
[ https://issues.apache.org/jira/browse/SPARK-24331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490996#comment-16490996 ] Apache Spark commented on SPARK-24331: -- User 'mn-mikke' has created a pull request for this issue: https://github.com/apache/spark/pull/21434 > Add arrays_overlap / array_repeat / map_entries > - > > Key: SPARK-24331 > URL: https://issues.apache.org/jira/browse/SPARK-24331 > Project: Spark > Issue Type: Sub-task > Components: SparkR >Affects Versions: 2.4.0 >Reporter: Marek Novotny >Priority: Major > > Add SparkR equivalent to: > * arrays_overlap - SPARK-23922 > * array_repeat - SPARK-23925 > * map_entries - SPARK-23935 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
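The three functions already exist as Spark SQL expressions (added under the tickets linked above), so the SparkR work is about wrappers; a rough illustration of their semantics via SQL expressions in PySpark, assuming Spark 2.4:
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([([1, 2, 3], [3, 4], {"a": 1})], ["x", "y", "m"])
df.select(
    expr("arrays_overlap(x, y)").alias("overlap"),    # true, because 3 appears in both arrays
    expr("array_repeat('ab', 3)").alias("repeated"),  # ["ab", "ab", "ab"]
    expr("map_entries(m)").alias("entries"),          # [{"key": "a", "value": 1}]
).show(truncate=False)
{code}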
[jira] [Assigned] (SPARK-24331) Add arrays_overlap / array_repeat / map_entries
[ https://issues.apache.org/jira/browse/SPARK-24331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24331: Assignee: Apache Spark > Add arrays_overlap / array_repeat / map_entries > - > > Key: SPARK-24331 > URL: https://issues.apache.org/jira/browse/SPARK-24331 > Project: Spark > Issue Type: Sub-task > Components: SparkR >Affects Versions: 2.4.0 >Reporter: Marek Novotny >Assignee: Apache Spark >Priority: Major > > Add SparkR equivalent to: > * arrays_overlap - SPARK-23922 > * array_repeat - SPARK-23925 > * map_entries - SPARK-23935 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24331) Add arrays_overlap / array_repeat / map_entries
[ https://issues.apache.org/jira/browse/SPARK-24331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24331: Assignee: (was: Apache Spark) > Add arrays_overlap / array_repeat / map_entries > - > > Key: SPARK-24331 > URL: https://issues.apache.org/jira/browse/SPARK-24331 > Project: Spark > Issue Type: Sub-task > Components: SparkR >Affects Versions: 2.4.0 >Reporter: Marek Novotny >Priority: Major > > Add SparkR equivalent to: > * arrays_overlap - SPARK-23922 > * array_repeat - SPARK-23925 > * map_entries - SPARK-23935 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24392) Mark pandas_udf as Experimental
[ https://issues.apache.org/jira/browse/SPARK-24392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bryan Cutler updated SPARK-24392: - Priority: Blocker (was: Critical) > Mark pandas_udf as Experimental > --- > > Key: SPARK-24392 > URL: https://issues.apache.org/jira/browse/SPARK-24392 > Project: Spark > Issue Type: Task > Components: PySpark >Affects Versions: 2.3.0 >Reporter: Bryan Cutler >Priority: Blocker > > This functionality is still evolving and has introduced some bugs . It was > an oversight to not mark it as experimental before it was released in 2.3.0. > Not sure if it is a good idea to change this after the fact, but I'm opening > this to continue discussion from > https://github.com/apache/spark/pull/21427#issuecomment-391967423 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24392) Mark pandas_udf as Experimental
Bryan Cutler created SPARK-24392: Summary: Mark pandas_udf as Experimental Key: SPARK-24392 URL: https://issues.apache.org/jira/browse/SPARK-24392 Project: Spark Issue Type: Task Components: PySpark Affects Versions: 2.3.0 Reporter: Bryan Cutler This functionality is still evolving and has introduced some bugs . It was an oversight to not mark it as experimental before it was released in 2.3.0. Not sure if it is a good idea to change this after the fact, but I'm opening this to continue discussion from https://github.com/apache/spark/pull/21427#issuecomment-391967423 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24391) to_json/from_json should support arrays of primitives, and more generally all JSON
Sam Kitajima-Kimbrel created SPARK-24391: Summary: to_json/from_json should support arrays of primitives, and more generally all JSON Key: SPARK-24391 URL: https://issues.apache.org/jira/browse/SPARK-24391 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.0 Reporter: Sam Kitajima-Kimbrel https://issues.apache.org/jira/browse/SPARK-19849 and https://issues.apache.org/jira/browse/SPARK-21513 brought support for more column types to functions.to_json/from_json, but I also have cases where I'd like to simply (de)serialize an array of primitives to/from JSON when outputting to certain destinations, which does not work: {code:java} scala> import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._ scala> import spark.implicits._ import spark.implicits._ scala> val df = Seq("[1, 2, 3]").toDF("a") df: org.apache.spark.sql.DataFrame = [a: string] scala> val schema = new ArrayType(IntegerType, false) schema: org.apache.spark.sql.types.ArrayType = ArrayType(IntegerType,false) scala> df.select(from_json($"a", schema)) org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`a`)' due to data type mismatch: Input schema array must be a struct or an array of structs.;; 'Project [jsontostructs(ArrayType(IntegerType,false), a#3, Some(America/Los_Angeles)) AS jsontostructs(a)#10] scala> val arrayDf = Seq(Array(1, 2, 3)).toDF("arr") arrayDf: org.apache.spark.sql.DataFrame = [arr: array] scala> arrayDf.select(to_json($"arr")) org.apache.spark.sql.AnalysisException: cannot resolve 'structstojson(`arr`)' due to data type mismatch: Input type array must be a struct, array of structs or a map or array of map.;; 'Project [structstojson(arr#19, Some(America/Los_Angeles)) AS structstojson(arr)#26] {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
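Until the built-ins accept bare arrays of primitives, two possible workarounds are sketched below (assumed Spark 2.3; column names are illustrative): wrap the array in a struct before calling to_json, and parse bare JSON arrays with a plain Python UDF instead of from_json:
{code:python}
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, struct, udf, col
from pyspark.sql.types import ArrayType, IntegerType

spark = SparkSession.builder.getOrCreate()

# to_json: wrap the bare array in a struct first, then serialize.
array_df = spark.createDataFrame([([1, 2, 3],)], ["arr"])
array_df.select(to_json(struct(col("arr"))).alias("js")).show(truncate=False)
# expected output roughly: {"arr":[1,2,3]}

# from_json on a bare JSON array: fall back to a plain Python UDF until the
# built-in supports arrays of primitives.
parse_int_array = udf(lambda s: json.loads(s), ArrayType(IntegerType()))
str_df = spark.createDataFrame([("[1, 2, 3]",)], ["a"])
str_df.select(parse_int_array(col("a")).alias("parsed")).show()
{code}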
[jira] [Assigned] (SPARK-23820) Allow the long form of call sites to be recorded in the log
[ https://issues.apache.org/jira/browse/SPARK-23820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23820: Assignee: Apache Spark > Allow the long form of call sites to be recorded in the log > --- > > Key: SPARK-23820 > URL: https://issues.apache.org/jira/browse/SPARK-23820 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Michael Mior >Assignee: Apache Spark >Priority: Major > > It would be nice if the long form of the callsite information could be > included in the log. An example of what I'm proposing is here: > https://github.com/michaelmior/spark/commit/4b4076cfb1d51ceb20fd2b0a3b1b5be2aebb6416 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23820) Allow the long form of call sites to be recorded in the log
[ https://issues.apache.org/jira/browse/SPARK-23820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23820: Assignee: (was: Apache Spark) > Allow the long form of call sites to be recorded in the log > --- > > Key: SPARK-23820 > URL: https://issues.apache.org/jira/browse/SPARK-23820 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Michael Mior >Priority: Major > > It would be nice if the long form of the callsite information could be > included in the log. An example of what I'm proposing is here: > https://github.com/michaelmior/spark/commit/4b4076cfb1d51ceb20fd2b0a3b1b5be2aebb6416 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23820) Allow the long form of call sites to be recorded in the log
[ https://issues.apache.org/jira/browse/SPARK-23820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490905#comment-16490905 ] Apache Spark commented on SPARK-23820: -- User 'michaelmior' has created a pull request for this issue: https://github.com/apache/spark/pull/21433 > Allow the long form of call sites to be recorded in the log > --- > > Key: SPARK-23820 > URL: https://issues.apache.org/jira/browse/SPARK-23820 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Michael Mior >Priority: Major > > It would be nice if the long form of the callsite information could be > included in the log. An example of what I'm proposing is here: > https://github.com/michaelmior/spark/commit/4b4076cfb1d51ceb20fd2b0a3b1b5be2aebb6416 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24331) Add arrays_overlap / array_repeat / map_entries
[ https://issues.apache.org/jira/browse/SPARK-24331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marek Novotny updated SPARK-24331: -- Description: Add SparkR equivalent to: * arrays_overlap - SPARK-23922 * array_repeat - SPARK-23925 * map_entries - SPARK-23935 was: Add SparkR equivalent to: * cardinality - SPARK-23923 * arrays_overlap - SPARK-23922 * array_repeat - SPARK-23925 * map_entries - SPARK-23935 > Add arrays_overlap / array_repeat / map_entries > - > > Key: SPARK-24331 > URL: https://issues.apache.org/jira/browse/SPARK-24331 > Project: Spark > Issue Type: Sub-task > Components: SparkR >Affects Versions: 2.4.0 >Reporter: Marek Novotny >Priority: Major > > Add SparkR equivalent to: > * arrays_overlap - SPARK-23922 > * array_repeat - SPARK-23925 > * map_entries - SPARK-23935 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-24380) argument quoting/escaping broken in mesos cluster scheduler
[ https://issues.apache.org/jira/browse/SPARK-24380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] paul mackles closed SPARK-24380. > argument quoting/escaping broken in mesos cluster scheduler > --- > > Key: SPARK-24380 > URL: https://issues.apache.org/jira/browse/SPARK-24380 > Project: Spark > Issue Type: Bug > Components: Deploy, Mesos >Affects Versions: 2.2.0, 2.3.0 >Reporter: paul mackles >Priority: Critical > Fix For: 2.4.0 > > > When a configuration property contains shell characters that require quoting, > the Mesos cluster scheduler generates the spark-submit argument like so: > {code:java} > --conf "spark.mesos.executor.docker.parameters="label=logging=|foo|""{code} > Note the quotes around the property value as well as the key=value pair. When > using docker, this breaks the spark-submit command and causes the "|" to be > interpreted as an actual shell PIPE. Spaces, semi-colons, etc also cause > issues. > Although I haven't tried, I suspect this is also a potential security issue > in that someone could exploit it to run arbitrary code on the host. > My patch is pretty minimal and just removes the outer quotes around the > key=value pair, resulting in something like: > {code:java} > --conf spark.mesos.executor.docker.parameters="label=logging=|foo|"{code} > A more extensive fix might try wrapping the entire key=value pair in single > quotes but I was concerned about backwards compatibility with that change. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-24380) argument quoting/escaping broken in mesos cluster scheduler
[ https://issues.apache.org/jira/browse/SPARK-24380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] paul mackles resolved SPARK-24380. -- Resolution: Duplicate Dupe of SPARK-23941, just a different config > argument quoting/escaping broken in mesos cluster scheduler > --- > > Key: SPARK-24380 > URL: https://issues.apache.org/jira/browse/SPARK-24380 > Project: Spark > Issue Type: Bug > Components: Deploy, Mesos >Affects Versions: 2.2.0, 2.3.0 >Reporter: paul mackles >Priority: Critical > Fix For: 2.4.0 > > > When a configuration property contains shell characters that require quoting, > the Mesos cluster scheduler generates the spark-submit argument like so: > {code:java} > --conf "spark.mesos.executor.docker.parameters="label=logging=|foo|""{code} > Note the quotes around the property value as well as the key=value pair. When > using docker, this breaks the spark-submit command and causes the "|" to be > interpreted as an actual shell PIPE. Spaces, semi-colons, etc also cause > issues. > Although I haven't tried, I suspect this is also a potential security issue > in that someone could exploit it to run arbitrary code on the host. > My patch is pretty minimal and just removes the outer quotes around the > key=value pair, resulting in something like: > {code:java} > --conf spark.mesos.executor.docker.parameters="label=logging=|foo|"{code} > A more extensive fix might try wrapping the entire key=value pair in single > quotes but I was concerned about backwards compatibility with that change. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-24331) Add arrays_overlap / array_repeat / map_entries
[ https://issues.apache.org/jira/browse/SPARK-24331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marek Novotny updated SPARK-24331: -- Summary: Add arrays_overlap / array_repeat / map_entries(was: Add cardinality / arrays_overlap / array_repeat / map_entries ) > Add arrays_overlap / array_repeat / map_entries > - > > Key: SPARK-24331 > URL: https://issues.apache.org/jira/browse/SPARK-24331 > Project: Spark > Issue Type: Sub-task > Components: SparkR >Affects Versions: 2.4.0 >Reporter: Marek Novotny >Priority: Major > > Add SparkR equivalent to: > * cardinality - SPARK-23923 > * arrays_overlap - SPARK-23922 > * array_repeat - SPARK-23925 > * map_entries - SPARK-23935 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24390) confusion of columns in projection after WITH ROLLUP
Ryan Foss created SPARK-24390: - Summary: confusion of columns in projection after WITH ROLLUP Key: SPARK-24390 URL: https://issues.apache.org/jira/browse/SPARK-24390 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.0 Environment: Databricks runtime 4.0 Reporter: Ryan Foss Using two CTEs, where the first issues a WITH ROLLUP and the second is a projection of the first, when attempting to join the two CTEs, spark seems to consider the key column in each CTE to be the same column, resulting in a "Cannot resolve column" error. {noformat} CREATE TABLE IF NOT EXISTS test_rollup (key varchar(3), code varchar(3), stuff int); EXPLAIN WITH cte1 AS ( SELECT key, code, struct(code, avg(stuff)) AS stuff FROM test_rollup GROUP BY key, code WITH ROLLUP ), cte2 AS ( SELECT key, collect_list(stuff) AS stuff_details FROM cte1 WHERE code IS NOT NULL GROUP BY key ) -- join summary record from cte1 to cte2 SELECT c1.key, c1.stuff AS summary_stuff, c2.stuff_details AS detail_stuff FROM cte1 c1 JOIN cte2 c2 ON c2.key = c1.key WHERE c1.code IS NULL == Physical Plan == org.apache.spark.sql.AnalysisException: cannot resolve '`c2.key`' given input columns: [c1.key, c1.code, c1.stuff, c2.stuff_details]; line 22 pos 5; 'Project ['c1.key, 'c1.stuff AS summary_stuff#5415, 'c2.stuff_details AS detail_stuff#5416] +- 'Filter isnull('c1.code) +- 'Join Inner, ('c2.key = 'c1.key) :- SubqueryAlias c1 : +- SubqueryAlias cte1 : +- Aggregate [key#5429, code#5430, spark_grouping_id#5426], [key#5429, code#5430, named_struct(code, code#5430, col2, avg(cast(stuff#5424 as bigint))) AS stuff#5417] : +- Expand [List(key#5422, code#5423, stuff#5424, key#5427, code#5428, 0), List(key#5422, code#5423, stuff#5424, key#5427, null, 1), List(key#5422, code#5423, stuff#5424, null, null, 3)], [key#5422, code#5423, stuff#5424, key#5429, code#5430, spark_grouping_id#5426] : +- Project [key#5422, code#5423, stuff#5424, key#5422 AS key#5427, code#5423 AS code#5428]{noformat} Changing the cte2 query and adding a column alias "key AS key", will cause the columns to be considered unique, resolving the join issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
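A sketch of the workaround mentioned at the end of the report, i.e. aliasing the grouping column in the second CTE so the two key attributes are no longer conflated (assumes an active SparkSession and the test_rollup table created above):
{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

query = """
WITH cte1 AS (
  SELECT key, code, struct(code, avg(stuff)) AS stuff
  FROM test_rollup
  GROUP BY key, code WITH ROLLUP
),
cte2 AS (
  SELECT key AS key,                     -- the alias described in the report
         collect_list(stuff) AS stuff_details
  FROM cte1
  WHERE code IS NOT NULL
  GROUP BY key
)
SELECT c1.key, c1.stuff AS summary_stuff, c2.stuff_details AS detail_stuff
FROM cte1 c1
JOIN cte2 c2 ON c2.key = c1.key
WHERE c1.code IS NULL
"""
spark.sql(query).explain(True)
{code}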
[jira] [Commented] (SPARK-24389) describe() can't work on column that name contain dots
[ https://issues.apache.org/jira/browse/SPARK-24389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490797#comment-16490797 ] Marco Gaido commented on SPARK-24389: - I cannot reproduce on current master. Probably it has been fixed in SPARK-21100 (ie. 2.3.0). I think we can close this. > describe() can't work on column that name contain dots > -- > > Key: SPARK-24389 > URL: https://issues.apache.org/jira/browse/SPARK-24389 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: zhanggengxin >Priority: Major > > {code:scala} > val df = Seq((1, 1)).toDF("a_b", "a.c") > df.describe() // won't work > df.describe("`a.c`") // will work > {code} > Given that you can't use describe() on dataFrame that column contain dots -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
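For reference, the PySpark counterpart of the quoted Scala snippet (assuming a Spark 2.2/2.3 session): back-quoting the dotted column name when it is passed explicitly sidesteps the resolution error:
{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1)], ["a_b", "a.c"])

# quoting `a.c` avoids the dotted-name resolution error reported above;
# df.describe() with no arguments is the call that fails on 2.2 per the report.
df.describe("a_b", "`a.c`").show()
{code}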
[jira] [Assigned] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24373: Assignee: Apache Spark > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Assignee: Apache Spark >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To 
unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-24373: Assignee: (was: Apache Spark) > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: 
issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
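A minimal sketch, not part of the original report, of one way to confirm the symptom programmatically: search the executed physical plan for InMemoryTableScanExec (a Spark 2.x internal class, assumed here; the package may differ in other versions). With the df1 from the reproduction above, the plain count is expected to hit the cache while the groupBy().count() variant does not, which is exactly the regression being described.
{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

// Returns true if the physical plan of `df` reads from cached data.
// This only inspects the plan; it does not trigger a job.
def usesCache(df: DataFrame): Boolean =
  df.queryExecution.executedPlan.collect {
    case scan: InMemoryTableScanExec => scan
  }.nonEmpty

// usesCache(df1)                    // expected true once df1 is cached
// usesCache(df1.groupBy().count())  // false on 2.3.0 per this ticket
{code}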
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490772#comment-16490772 ] Apache Spark commented on SPARK-24373: -- User 'mgaido91' has created a pull request for this issue: https://github.com/apache/spark/pull/21432 > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- 
Scan ExternalRDDScan[obj#1L] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19112) add codec for ZStandard
[ https://issues.apache.org/jira/browse/SPARK-19112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490749#comment-16490749 ] Apache Spark commented on SPARK-19112: -- User 'wangyum' has created a pull request for this issue: https://github.com/apache/spark/pull/21431 > add codec for ZStandard > --- > > Key: SPARK-19112 > URL: https://issues.apache.org/jira/browse/SPARK-19112 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: Thomas Graves >Assignee: Sital Kedia >Priority: Minor > Fix For: 2.3.0 > > > ZStandard: https://github.com/facebook/zstd and > http://facebook.github.io/zstd/ has been in use for a while now. v1.0 was > recently released. Hadoop > (https://issues.apache.org/jira/browse/HADOOP-13578) and others > (https://issues.apache.org/jira/browse/KAFKA-4514) are adopting it. > Zstd seems to give great results => Gzip level Compression with Lz4 level CPU. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
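Since the codec shipped in 2.3.0, a minimal sketch of switching Spark's internal block compression (shuffle, spills, broadcasts) over to it might look like the following. The zstd level key is quoted from memory, so check the spark.io.compression.* documentation for your version; note this does not change file-format compression such as Parquet or ORC, which have their own settings.
{code:scala}
import org.apache.spark.sql.SparkSession

// Sketch only: select the zstd codec for Spark's internal I/O compression.
val spark = SparkSession.builder()
  .appName("zstd-io-compression")
  .config("spark.io.compression.codec", "zstd")
  // Assumed tuning knob; higher levels trade CPU for better ratios.
  .config("spark.io.compression.zstd.level", "3")
  .getOrCreate()
{code}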
[jira] [Assigned] (SPARK-23991) data loss when allocateBlocksToBatch
[ https://issues.apache.org/jira/browse/SPARK-23991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23991: Assignee: (was: Apache Spark) > data loss when allocateBlocksToBatch > > > Key: SPARK-23991 > URL: https://issues.apache.org/jira/browse/SPARK-23991 > Project: Spark > Issue Type: Bug > Components: DStreams, Input/Output >Affects Versions: 2.2.0 > Environment: spark 2.11 >Reporter: kevin fu >Priority: Major > > With checkpoint and WAL enabled, the driver writes the allocation of blocks > to a batch into HDFS. However, if that write fails as shown below, the blocks of this > batch can never be computed by the DAG, because they have already been dequeued > from the receivedBlockQueue and are lost. > {panel:title=error log} > 18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing > record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> > ArrayBuffer( to the WriteAheadLog. org.apache.spark.SparkException: > Exception thrown in awaitResult: at > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at > org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83) > at > org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234) > at > org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118) > at > org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) > at scala.util.Try$.apply(Try.scala:192) at > org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) > at > org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused > by: java.util.concurrent.TimeoutException: Futures timed out after [5000 > milliseconds] at > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at > scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at > scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) > at scala.concurrent.Await$.result(package.scala:190) at > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 > more 18/04/15 11:11:25 INFO ReceivedBlockTracker: Possibly processed batch > 152376548 ms needs to be processed again in WAL recovery{panel} > The relevant code is shown below: > {code} > /** >* Allocate all unallocated blocks to the given batch. >* This event will get written to the write ahead log (if enabled). 
>*/ > def allocateBlocksToBatch(batchTime: Time): Unit = synchronized { > if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) > { > val streamIdToBlocks = streamIds.map { streamId => > (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) > }.toMap > val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) > if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) { > timeToAllocatedBlocks.put(batchTime, allocatedBlocks) > lastAllocatedBatchTime = batchTime > } else { > logInfo(s"Possibly processed batch $batchTime needs to be processed > again in WAL recovery") > } > } else { > // This situation occurs when: > // 1. WAL is ended with BatchAllocationEvent, but without > BatchCleanupEvent, > // possibly processed batch job or half-processed batch job need to be > processed again, > // so the batchTime will be equal to lastAllocatedBatchTime. > // 2. Slow checkpointing makes recovered batch time older than WAL > recovered > // lastAllocatedBatchTime. > // This situation will only occurs in recovery time. > logInfo(s"Possibly processed batch $batchTime needs to be processed > again in WAL recovery") > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
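One way to avoid losing the dequeued blocks is sketched below in simplified form, reusing the members from the quoted ReceivedBlockTracker snippet (streamIds, getReceivedBlockQueue, writeToLog, and so on). This is an illustration of the idea only, not necessarily what the linked pull request does: if the WAL write fails, push the blocks back onto their queues instead of dropping them. The recovery-time else branch from the original is unchanged and omitted here.
{code:scala}
// Sketch only, relying on the surrounding ReceivedBlockTracker members shown above.
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      // WAL write failed: re-queue the blocks so a later batch can allocate
      // them, rather than letting them silently disappear from the tracker.
      streamIdToBlocks.foreach { case (streamId, blocks) =>
        getReceivedBlockQueue(streamId).enqueue(blocks: _*)
      }
      logWarning(s"Failed to persist allocation for batch $batchTime to the " +
        "WAL; its blocks have been re-queued")
    }
  }
  // (recovery-time else branch from the original omitted for brevity)
}
{code}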
[jira] [Commented] (SPARK-23991) data loss when allocateBlocksToBatch
[ https://issues.apache.org/jira/browse/SPARK-23991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490729#comment-16490729 ] Apache Spark commented on SPARK-23991: -- User 'gaborgsomogyi' has created a pull request for this issue: https://github.com/apache/spark/pull/21430 > data loss when allocateBlocksToBatch > > > Key: SPARK-23991 > URL: https://issues.apache.org/jira/browse/SPARK-23991 > Project: Spark > Issue Type: Bug > Components: DStreams, Input/Output >Affects Versions: 2.2.0 > Environment: spark 2.11 >Reporter: kevin fu >Priority: Major > > with checkpoint and WAL enabled, driver will write the allocation of blocks > to batch into hdfs. however, if it fails as following, the blocks of this > batch cannot be computed by the DAG. Because the blocks have been dequeued > from the receivedBlockQueue and get lost. > {panel:title=error log} > 18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing > record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> > ArrayBuffer( to the WriteAheadLog. org.apache.spark.SparkException: > Exception thrown in awaitResult: at > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at > org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83) > at > org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234) > at > org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118) > at > org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) > at scala.util.Try$.apply(Try.scala:192) at > org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) > at > org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused > by: java.util.concurrent.TimeoutException: Futures timed out after [5000 > milliseconds] at > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at > scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at > scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) > at scala.concurrent.Await$.result(package.scala:190) at > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 > more 18/04/15 11:11:25 INFO ReceivedBlockTracker: Possibly processed batch > 152376548 ms needs to be processed again in WAL recovery{panel} > the concerning codes are showed below: > {code} > /** >* Allocate all unallocated blocks to the given batch. >* This event will get written to the write ahead log (if enabled). 
>*/ > def allocateBlocksToBatch(batchTime: Time): Unit = synchronized { > if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) > { > val streamIdToBlocks = streamIds.map { streamId => > (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) > }.toMap > val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) > if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) { > timeToAllocatedBlocks.put(batchTime, allocatedBlocks) > lastAllocatedBatchTime = batchTime > } else { > logInfo(s"Possibly processed batch $batchTime needs to be processed > again in WAL recovery") > } > } else { > // This situation occurs when: > // 1. WAL is ended with BatchAllocationEvent, but without > BatchCleanupEvent, > // possibly processed batch job or half-processed batch job need to be > processed again, > // so the batchTime will be equal to lastAllocatedBatchTime. > // 2. Slow checkpointing makes recovered batch time older than WAL > recovered > // lastAllocatedBatchTime. > // This situation will only occurs in recovery time. > logInfo(s"Possibly processed batch $batchTime needs to be processed > again in WAL recovery") > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spar
[jira] [Assigned] (SPARK-23991) data loss when allocateBlocksToBatch
[ https://issues.apache.org/jira/browse/SPARK-23991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-23991: Assignee: Apache Spark > data loss when allocateBlocksToBatch > > > Key: SPARK-23991 > URL: https://issues.apache.org/jira/browse/SPARK-23991 > Project: Spark > Issue Type: Bug > Components: DStreams, Input/Output >Affects Versions: 2.2.0 > Environment: spark 2.11 >Reporter: kevin fu >Assignee: Apache Spark >Priority: Major > > with checkpoint and WAL enabled, driver will write the allocation of blocks > to batch into hdfs. however, if it fails as following, the blocks of this > batch cannot be computed by the DAG. Because the blocks have been dequeued > from the receivedBlockQueue and get lost. > {panel:title=error log} > 18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing > record: BatchAllocationEvent(152376548 ms,AllocatedBlocks(Map(0 -> > ArrayBuffer( to the WriteAheadLog. org.apache.spark.SparkException: > Exception thrown in awaitResult: at > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194) at > org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83) > at > org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234) > at > org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118) > at > org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247) > at scala.util.Try$.apply(Try.scala:192) at > org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247) > at > org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused > by: java.util.concurrent.TimeoutException: Futures timed out after [5000 > milliseconds] at > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at > scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at > scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) at > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) > at scala.concurrent.Await$.result(package.scala:190) at > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190) ... 12 > more 18/04/15 11:11:25 INFO ReceivedBlockTracker: Possibly processed batch > 152376548 ms needs to be processed again in WAL recovery{panel} > the concerning codes are showed below: > {code} > /** >* Allocate all unallocated blocks to the given batch. >* This event will get written to the write ahead log (if enabled). 
>*/ > def allocateBlocksToBatch(batchTime: Time): Unit = synchronized { > if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) > { > val streamIdToBlocks = streamIds.map { streamId => > (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) > }.toMap > val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) > if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) { > timeToAllocatedBlocks.put(batchTime, allocatedBlocks) > lastAllocatedBatchTime = batchTime > } else { > logInfo(s"Possibly processed batch $batchTime needs to be processed > again in WAL recovery") > } > } else { > // This situation occurs when: > // 1. WAL is ended with BatchAllocationEvent, but without > BatchCleanupEvent, > // possibly processed batch job or half-processed batch job need to be > processed again, > // so the batchTime will be equal to lastAllocatedBatchTime. > // 2. Slow checkpointing makes recovered batch time older than WAL > recovered > // lastAllocatedBatchTime. > // This situation will only occurs in recovery time. > logInfo(s"Possibly processed batch $batchTime needs to be processed > again in WAL recovery") > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24373) "df.cache() df.count()" no longer eagerly caches data
[ https://issues.apache.org/jira/browse/SPARK-24373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490551#comment-16490551 ] Marco Gaido commented on SPARK-24373: - [~wbzhao] yes, I do agree with you. That is the problem. > "df.cache() df.count()" no longer eagerly caches data > - > > Key: SPARK-24373 > URL: https://issues.apache.org/jira/browse/SPARK-24373 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Wenbo Zhao >Priority: Major > > Here is the code to reproduce in local mode > {code:java} > scala> val df = sc.range(1, 2).toDF > df: org.apache.spark.sql.DataFrame = [value: bigint] > scala> val myudf = udf({x: Long => println(""); x + 1}) > myudf: org.apache.spark.sql.expressions.UserDefinedFunction = > UserDefinedFunction(,LongType,Some(List(LongType))) > scala> val df1 = df.withColumn("value1", myudf(col("value"))) > df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint] > scala> df1.cache > res0: df1.type = [value: bigint, value1: bigint] > scala> df1.count > res1: Long = 1 > scala> df1.count > res2: Long = 1 > scala> df1.count > res3: Long = 1 > {code} > > in Spark 2.2, you could see it prints "". > In the above example, when you do explain. You could see > {code:java} > scala> df1.explain(true) > == Parsed Logical Plan == > 'Project [value#2L, UDF('value) AS value1#5] > +- AnalysisBarrier > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > value: bigint, value1: bigint > Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > == Physical Plan == > *(1) InMemoryTableScan [value#2L, value1#5L] > +- InMemoryRelation [value#2L, value1#5L], true, 1, StorageLevel(disk, > memory, deserialized, 1 replicas) > +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L] > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > but the ImMemoryTableScan is mising in the following explain() > {code:java} > scala> df1.groupBy().count().explain(true) > == Parsed Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS > value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Analyzed Logical Plan == > count: bigint > Aggregate [count(1) AS count#170L] > +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) > null else UDF(value#2L) AS value1#5L] > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Optimized Logical Plan == > Aggregate [count(1) AS count#170L] > +- Project > +- SerializeFromObject [input[0, bigint, false] AS value#2L] > +- ExternalRDD [obj#1L] > == Physical Plan == > *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L]) > +- Exchange SinglePartition > +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], > output=[count#175L]) > +- *(1) Project > +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L] > +- Scan ExternalRDDScan[obj#1L] > {code} > > -- This 
message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24389) describe() can't work on column that name contain dots
zhanggengxin created SPARK-24389: Summary: describe() can't work on column that name contain dots Key: SPARK-24389 URL: https://issues.apache.org/jira/browse/SPARK-24389 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0 Reporter: zhanggengxin {code:scala} val df = Seq((1, 1)).toDF("a_b", "a.c") df.describe() // won't work df.describe("`a.c`") // will work {code} In other words, describe() with no arguments can't be used on a DataFrame whose column names contain dots. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
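Building on the reporter's observation that backquoted names do work, a small workaround sketch (not from the ticket, and assuming a spark-shell session) that quotes every column before calling describe(); it assumes no column name itself contains a backtick.
{code:scala}
import org.apache.spark.sql.DataFrame

// Workaround sketch: backtick-quote each name so dots are taken literally
// instead of being parsed as struct-field access.
def describeAll(df: DataFrame): DataFrame =
  df.describe(df.columns.map(c => s"`$c`"): _*)

val df = Seq((1, 1)).toDF("a_b", "a.c")
describeAll(df).show()
{code}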
[jira] [Commented] (SPARK-24271) sc.hadoopConfigurations can not be overwritten in the same spark context
[ https://issues.apache.org/jira/browse/SPARK-24271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490499#comment-16490499 ] Jami Malikzade commented on SPARK-24271: [~ste...@apache.org] Thank you > sc.hadoopConfigurations can not be overwritten in the same spark context > > > Key: SPARK-24271 > URL: https://issues.apache.org/jira/browse/SPARK-24271 > Project: Spark > Issue Type: Bug > Components: Spark Shell >Affects Versions: 2.3.0 >Reporter: Jami Malikzade >Priority: Major > > If, for example, we pass the following configs to the Spark context: > sc.hadoopConfiguration.set("fs.s3a.access.key", "correctAK") > sc.hadoopConfiguration.set("fs.s3a.secret.key", "correctSK") > sc.hadoopConfiguration.set("fs.s3a.endpoint", "objectstorage:8773") // > sc.hadoopConfiguration.set("fs.s3a.impl", > "org.apache.hadoop.fs.s3a.S3AFileSystem") > sc.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false") > We are then able to read from the bucket, so the behavior is as expected. > If, in the same SparkContext, I then change the credentials to wrong ones and try to read > from the bucket, it will still work; > and vice versa, if the credentials were wrong, changing them to working ones will not help. > sc.hadoopConfiguration.set("fs.s3a.access.key", "wrongAK") // > sc.hadoopConfiguration.set("fs.s3a.secret.key", "wrongSK") // -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24271) sc.hadoopConfigurations can not be overwritten in the same spark context
[ https://issues.apache.org/jira/browse/SPARK-24271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490474#comment-16490474 ] Steve Loughran commented on SPARK-24271: Disabling the s3 cache can be pretty inefficient, as every worker talking to a bucket is going to create a new instance, with its own AWS thread pool & things. If you are trying to change perms for different buckets, you can use [per-bucket configuration instead|https://hadoop.apache.org/docs/r3.1.0/hadoop-aws/tools/hadoop-aws/index.html#Configuring_different_S3_buckets_with_Per-Bucket_Configuration] {code} fs.s3a.bucket.myfirstbucket.access.key=A fs.s3a.bucket.myfirstbucket.secret.key= fs.s3a.bucket.backups.access.key=B fs.s3a.bucket.backups.secret.key=Y {code} Same for things like the endpoint. These can all coexist in the same configuration file, where I'd recommend spark-defaults.conf rather than code, as with code it's all too easy to accidentally commit your secrets somewhere public like GitHub. > sc.hadoopConfigurations can not be overwritten in the same spark context > > > Key: SPARK-24271 > URL: https://issues.apache.org/jira/browse/SPARK-24271 > Project: Spark > Issue Type: Bug > Components: Spark Shell >Affects Versions: 2.3.0 >Reporter: Jami Malikzade >Priority: Major > > If, for example, we pass the following configs to the Spark context: > sc.hadoopConfiguration.set("fs.s3a.access.key", "correctAK") > sc.hadoopConfiguration.set("fs.s3a.secret.key", "correctSK") > sc.hadoopConfiguration.set("fs.s3a.endpoint", "objectstorage:8773") // > sc.hadoopConfiguration.set("fs.s3a.impl", > "org.apache.hadoop.fs.s3a.S3AFileSystem") > sc.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false") > We are then able to read from the bucket, so the behavior is as expected. > If, in the same SparkContext, I then change the credentials to wrong ones and try to read > from the bucket, it will still work; > and vice versa, if the credentials were wrong, changing them to working ones will not help. > sc.hadoopConfiguration.set("fs.s3a.access.key", "wrongAK") // > sc.hadoopConfiguration.set("fs.s3a.secret.key", "wrongSK") // -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
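As a follow-up illustration, not part of the comment above: when these Hadoop options are supplied through Spark rather than core-site.xml, each key is normally given the spark.hadoop. prefix so it is copied into the Hadoop Configuration the S3A connector reads. It is shown as code purely for readability; per the advice above, spark-defaults.conf is the safer home for real credentials. Bucket names and key values are placeholders.
{code:scala}
import org.apache.spark.sql.SparkSession

// Per-bucket S3A settings passed via the spark.hadoop. prefix (placeholders only).
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.bucket.myfirstbucket.access.key", "A")
  .config("spark.hadoop.fs.s3a.bucket.myfirstbucket.secret.key", "X")
  .config("spark.hadoop.fs.s3a.bucket.backups.access.key", "B")
  .config("spark.hadoop.fs.s3a.bucket.backups.secret.key", "Y")
  .getOrCreate()
{code}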
[jira] [Issue Comment Deleted] (SPARK-17592) SQL: CAST string as INT inconsistent with Hive
[ https://issues.apache.org/jira/browse/SPARK-17592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jorge Machado updated SPARK-17592: -- Comment: was deleted (was: I'm hitting the same issue, I'm afraid, but in a slightly different way. When I have a dataframe (that comes from an Oracle DB) saved as parquet I can see in the logs that a field is being saved as integer : { "type" : "struct", "fields" : [ \{ "name" : "project_id", "type" : "integer", "nullable" : true, "metadata" : { } },... on Hue (which reads from Hive) I see : !image-2018-05-24-17-10-24-515.png!) > SQL: CAST string as INT inconsistent with Hive > -- > > Key: SPARK-17592 > URL: https://issues.apache.org/jira/browse/SPARK-17592 > Project: Spark > Issue Type: Bug >Affects Versions: 2.0.0 >Reporter: Furcy Pin >Priority: Major > Attachments: image-2018-05-24-17-10-24-515.png > > > Hello, > there seems to be an inconsistency between Spark and Hive when casting a > string into an Int. > With Hive: > {code} > select cast("0.4" as INT) ; > > 0 > select cast("0.5" as INT) ; > > 0 > select cast("0.6" as INT) ; > > 0 > {code} > With Spark-SQL: > {code} > select cast("0.4" as INT) ; > > 0 > select cast("0.5" as INT) ; > > 1 > select cast("0.6" as INT) ; > > 1 > {code} > Hive seems to perform a floor(string.toDouble), while Spark seems to perform > a round(string.toDouble). > I'm not sure there is any ISO standard for this; MySQL has the same behavior > as Hive, while PostgreSQL performs a string.toInt and throws a > NumberFormatException. > Personally, I think Hive is right, hence my posting this here. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
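For code that has to behave the same way on both engines today, one defensive option (an illustration, not a recommendation from the ticket, assuming a spark-shell session where spark is in scope) is to make the intended truncation explicit instead of relying on the implicit string-to-int cast. Whether floor or truncation toward zero is wanted for negative inputs is a separate choice.
{code:scala}
// Explicit two-step conversion: parse as DOUBLE, then take the floor.
spark.sql("SELECT CAST(FLOOR(CAST('0.5' AS DOUBLE)) AS INT)").show()  // 0

// The implicit cast under discussion, whose result differs between engines.
spark.sql("SELECT CAST('0.5' AS INT)").show()
{code}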
[jira] [Commented] (SPARK-24388) EventLoop's run method don't handle fatal error, causes driver hang forever
[ https://issues.apache.org/jira/browse/SPARK-24388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490373#comment-16490373 ] Xianjin YE commented on SPARK-24388: I am working on this and will send a pr soon. > EventLoop's run method don't handle fatal error, causes driver hang forever > --- > > Key: SPARK-24388 > URL: https://issues.apache.org/jira/browse/SPARK-24388 > Project: Spark > Issue Type: Bug > Components: Scheduler, Spark Core >Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0 >Reporter: Xianjin YE >Priority: Major > > Once a fatal error (such as NoSuchMethodError) happens during > `onReceive(event)`, the eventThread thread will exit. However, the eventQueue > is still accepting events, so the whole Spark application will hang forever. > > cc [~zsxwing] [~XuanYuan] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-24388) EventLoop's run method don't handle fatal error, causes driver hang forever
Xianjin YE created SPARK-24388: -- Summary: EventLoop's run method don't handle fatal error, causes driver hang forever Key: SPARK-24388 URL: https://issues.apache.org/jira/browse/SPARK-24388 Project: Spark Issue Type: Bug Components: Scheduler, Spark Core Affects Versions: 2.3.0, 2.2.1, 2.2.0, 2.1.2, 2.1.1, 2.1.0 Reporter: Xianjin YE Once a fatal error (such as NoSuchMethodError) happens during `onReceive(event)`, the eventThread thread will exit. However, the eventQueue is still accepting events, so the whole Spark application will hang forever. cc [~zsxwing] [~XuanYuan] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
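To make the failure mode concrete, here is a simplified, self-contained event-loop sketch. It is not Spark's actual EventLoop and not the fix from the forthcoming PR; it only shows one way to avoid the hang: if the consumer thread dies from any Throwable, mark the loop stopped so that later post() calls fail fast instead of queueing into a dead loop.
{code:scala}
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Toy event loop illustrating the hazard described in this ticket.
abstract class ToyEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val consumer = new Thread(name) {
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = queue.take()
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case t: Throwable =>
          // A fatal error (e.g. NoSuchMethodError) reached here: shut the
          // loop down so producers fail fast rather than hang forever.
          stopped.set(true)
          throw t
      }
  }

  def start(): Unit = { consumer.setDaemon(true); consumer.start() }

  def post(event: E): Unit = {
    if (stopped.get) throw new IllegalStateException(s"$name is stopped")
    queue.put(event)
  }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}
{code}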