[jira] [Assigned] (SPARK-20368) Support Sentry on PySpark workers
[ https://issues.apache.org/jira/browse/SPARK-20368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20368:
------------------------------------

    Assignee: Apache Spark

> Support Sentry on PySpark workers
> ---------------------------------
>
>                 Key: SPARK-20368
>                 URL: https://issues.apache.org/jira/browse/SPARK-20368
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark
>    Affects Versions: 2.1.0
>            Reporter: Alexander Shorin
>            Assignee: Apache Spark
>
> [Sentry|https://sentry.io] is a system, well known among Python developers, for capturing, classifying, tracking and explaining tracebacks, helping people better understand what went wrong, how to reproduce an issue, and how to fix it.
> Any Python Spark application is really divided into two parts:
> 1. The part that runs on the driver side. Users control this part in any way they want, and providing reports to Sentry from here is easy.
> 2. The part that runs on the executors: Python UDFs and the rest of the transformation functions. Unfortunately, we cannot provide that kind of reporting here today, and this is the part this feature is about.
> To simplify the development experience, it would be nice to have optional Sentry support at the PySpark worker level.
> What could this feature look like?
> 1. PySpark gains a new extra named {{sentry}} which installs the Sentry client and anything else required. This is an optional install-time dependency.
> 2. The PySpark worker detects the presence of Sentry support and sends error reports there.
> 3. All Sentry configuration could and will be done via the standard Sentry environment variables.
> What will this feature give users?
> 1. Better exceptions in Sentry. From the driver-side application, all of them currently get recorded as `Py4JJavaError`, with the real executor exception buried in the traceback body.
> 2. A much clearer picture of the context in which things went wrong, and why.
> 3. Simpler debugging and reproduction of Python UDF issues.
-- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
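The worker-level hook proposed above could look roughly like the following sketch. This is an illustrative wrapper only, not the proposed PySpark implementation: `sentry_aware` and the `capture` parameter are hypothetical names, and the real integration would live inside the PySpark worker loop rather than around individual UDFs. (The modern client module name `sentry_sdk` is used for illustration; at the time of this ticket the Python client was `raven`.)

```python
import importlib.util


def sentry_aware(udf, capture=None):
    """Wrap a UDF so worker-side exceptions are reported before re-raising.

    `capture` is the reporting hook. By default we report only when the
    optional Sentry client is importable, matching the "optional extra"
    idea in the ticket; otherwise reporting is a no-op.
    """
    if capture is None:
        if importlib.util.find_spec("sentry_sdk") is not None:
            import sentry_sdk
            capture = sentry_sdk.capture_exception
        else:
            capture = lambda exc: None  # Sentry not installed: do nothing

    def wrapped(*args, **kwargs):
        try:
            return udf(*args, **kwargs)
        except Exception as exc:
            capture(exc)  # report with the full worker-side traceback
            raise         # still fail the task, exactly as today
    return wrapped
```

With this shape, the driver-side behaviour is unchanged (the task still fails with `Py4JJavaError`), but Sentry receives the original Python exception with its worker-side traceback.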
[jira] [Created] (SPARK-20369) pyspark: Dynamic configuration with SparkConf does not work
Matthew McClain created SPARK-20369:
---------------------------------------

             Summary: pyspark: Dynamic configuration with SparkConf does not work
                 Key: SPARK-20369
                 URL: https://issues.apache.org/jira/browse/SPARK-20369
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.1.0
         Environment: Ubuntu 14.04.1 LTS (GNU/Linux 3.13.0-40-generic x86_64) and Mac OS X 10.11.6
            Reporter: Matthew McClain
            Priority: Minor

Setting Spark properties dynamically in pyspark using a SparkConf object does not work. Here is the code that shows the bug:

---
from pyspark import SparkContext, SparkConf

def main():
    conf = SparkConf().setAppName("spark-conf-test") \
        .setMaster("local[2]") \
        .set('spark.python.worker.memory', "1g") \
        .set('spark.executor.memory', "3g") \
        .set("spark.driver.maxResultSize", "2g")
    print "Spark Config values in SparkConf:"
    print conf.toDebugString()
    sc = SparkContext(conf=conf)
    print "Actual Spark Config values:"
    print sc.getConf().toDebugString()

if __name__ == "__main__":
    main()
---

Here is the output; none of the config values set in SparkConf are used in the SparkContext configuration:

Spark Config values in SparkConf:
spark.master=local[2]
spark.executor.memory=3g
spark.python.worker.memory=1g
spark.app.name=spark-conf-test
spark.driver.maxResultSize=2g
17/04/18 10:21:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Actual Spark Config values:
spark.app.id=local-1492528885708
spark.app.name=sandbox.py
spark.driver.host=10.201.26.172
spark.driver.maxResultSize=4g
spark.driver.port=54657
spark.executor.id=driver
spark.files=file:/Users/matt.mcclain/dev/datascience-experiments/mmcclain/client_clusters/sandbox.py
spark.master=local[*]
spark.rdd.compress=True
spark.serializer.objectStreamReset=100
spark.submit.deployMode=client
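One plausible reading of the output above (the app name and master do not match the SparkConf at all) is that an already-running context was reused, in which case getOrCreate-style singleton logic silently keeps the original settings and ignores the new conf. A toy model of that reuse pattern, with hypothetical names and no claim to mirror Spark's actual implementation:

```python
class ToyContext:
    """Mimics a getOrCreate-style singleton: the first creation wins,
    and conf objects passed later are silently ignored. This is a toy
    model of the reported behaviour, not Spark's real code."""
    _active = None

    def __init__(self, conf):
        self.conf = dict(conf)

    @classmethod
    def get_or_create(cls, conf):
        if cls._active is None:
            cls._active = cls(conf)
        return cls._active  # later conf silently ignored


# First creation fixes the settings; the second call's conf is dropped.
first = ToyContext.get_or_create({"spark.executor.memory": "1g"})
second = ToyContext.get_or_create({"spark.executor.memory": "3g"})
```

If this is the cause, the settings have to reach the JVM before any context exists; setting them on a conf after that point cannot take effect.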
[jira] [Created] (SPARK-20368) Support Sentry on PySpark workers
Alexander Shorin created SPARK-20368:
----------------------------------------

             Summary: Support Sentry on PySpark workers
                 Key: SPARK-20368
                 URL: https://issues.apache.org/jira/browse/SPARK-20368
             Project: Spark
          Issue Type: New Feature
          Components: PySpark
    Affects Versions: 2.1.0
            Reporter: Alexander Shorin

[Sentry|https://sentry.io] is a system, well known among Python developers, for capturing, classifying, tracking and explaining tracebacks, helping people better understand what went wrong, how to reproduce an issue, and how to fix it.

Any Python Spark application is really divided into two parts:
1. The part that runs on the driver side. Users control this part in any way they want, and providing reports to Sentry from here is easy.
2. The part that runs on the executors: Python UDFs and the rest of the transformation functions. Unfortunately, we cannot provide that kind of reporting here today, and this is the part this feature is about.

To simplify the development experience, it would be nice to have optional Sentry support at the PySpark worker level.

What could this feature look like?
1. PySpark gains a new extra named {{sentry}} which installs the Sentry client and anything else required. This is an optional install-time dependency.
2. The PySpark worker detects the presence of Sentry support and sends error reports there.
3. All Sentry configuration could and will be done via the standard Sentry environment variables.

What will this feature give users?
1. Better exceptions in Sentry. From the driver-side application, all of them currently get recorded as `Py4JJavaError`, with the real executor exception buried in the traceback body.
2. A much clearer picture of the context in which things went wrong, and why.
3. Simpler debugging and reproduction of Python UDF issues.
[jira] [Commented] (SPARK-20367) Spark silently escapes partition column names
[ https://issues.apache.org/jira/browse/SPARK-20367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972871#comment-15972871 ] Hyukjin Kwon commented on SPARK-20367:
--------------------------------------

Doh. I rushed reading ...

> Spark silently escapes partition column names
> ---------------------------------------------
>
>                 Key: SPARK-20367
>                 URL: https://issues.apache.org/jira/browse/SPARK-20367
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Juliusz Sompolski
>            Priority: Minor
>
> CSV files can have arbitrary column names:
> {code}
> scala> spark.range(1).select(col("id").as("Column?"), col("id")).write.option("header", true).csv("/tmp/foo")
> scala> spark.read.option("header", true).csv("/tmp/foo").schema
> res1: org.apache.spark.sql.types.StructType = StructType(StructField(Column?,StringType,true), StructField(id,StringType,true))
> {code}
> However, once a column with characters like "?" in the name gets used in a partitioning column, the column name gets silently escaped, and reading the schema information back renders the column name with "?" turned into "%3F":
> {code}
> scala> spark.range(1).select(col("id").as("Column?"), col("id")).write.partitionBy("Column?").option("header", true).csv("/tmp/bar")
> scala> spark.read.option("header", true).csv("/tmp/bar").schema
> res3: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true), StructField(Column%3F,IntegerType,true))
> {code}
> The same happens for other formats, but I encountered it working with CSV, since these more often contain ugly schemas...
> Not sure if it's a bug or a feature, but it might be more intuitive to fail queries with invalid characters in the partitioning column name, rather than silently escaping the name?
[jira] [Commented] (SPARK-20367) Spark silently escapes partition column names
[ https://issues.apache.org/jira/browse/SPARK-20367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972867#comment-15972867 ] Juliusz Sompolski commented on SPARK-20367:
-------------------------------------------

Hi [~hyukjin.kwon]. I tested also with parquet, and it also happens there.

{quote}
The same happens for other formats, but I encountered it working with CSV, since these more often contain ugly schemas...
{quote}

> Spark silently escapes partition column names
> ---------------------------------------------
>
>                 Key: SPARK-20367
>                 URL: https://issues.apache.org/jira/browse/SPARK-20367
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Juliusz Sompolski
>            Priority: Minor
[jira] [Commented] (SPARK-20367) Spark silently escapes partition column names
[ https://issues.apache.org/jira/browse/SPARK-20367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972865#comment-15972865 ] Hyukjin Kwon commented on SPARK-20367:
--------------------------------------

Actually, I did while trying to reproduce this :)

{code}
scala> spark.range(1).select(col("id").as("Column?"), col("id")).write.partitionBy("Column?").parquet("/tmp/bar00")
scala> spark.read.parquet("/tmp/bar00").schema
res16: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,true), StructField(Column%3F,IntegerType,true))
{code}

{code}
scala> spark.range(1).select(col("id").as("Column?"), col("id")).write.partitionBy("Column?").json("/tmp/bar0")
scala> spark.read.json("/tmp/bar0").schema
res13: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,true), StructField(Column%3F,IntegerType,true))
{code}

> Spark silently escapes partition column names
> ---------------------------------------------
>
>                 Key: SPARK-20367
>                 URL: https://issues.apache.org/jira/browse/SPARK-20367
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Juliusz Sompolski
>            Priority: Minor
[jira] [Commented] (SPARK-20367) Spark silently escapes partition column names
[ https://issues.apache.org/jira/browse/SPARK-20367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972850#comment-15972850 ] Hyukjin Kwon commented on SPARK-20367:
--------------------------------------

I guess probably this is not a CSV datasource specific behaviour. Would you be able to test other sources such as text or json?

> Spark silently escapes partition column names
> ---------------------------------------------
>
>                 Key: SPARK-20367
>                 URL: https://issues.apache.org/jira/browse/SPARK-20367
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Juliusz Sompolski
>            Priority: Minor
[jira] [Commented] (SPARK-20343) SBT master build for Hadoop 2.6 in Jenkins fails due to Avro version resolution
[ https://issues.apache.org/jira/browse/SPARK-20343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972842#comment-15972842 ] Hyukjin Kwon commented on SPARK-20343:
--------------------------------------

Please let me know if anyone is able to reproduce this. I am having a hard time reproducing it.

> SBT master build for Hadoop 2.6 in Jenkins fails due to Avro version resolution
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-20343
>                 URL: https://issues.apache.org/jira/browse/SPARK-20343
>             Project: Spark
>          Issue Type: Bug
>          Components: Build
>    Affects Versions: 2.2.0
>            Reporter: Hyukjin Kwon
>            Assignee: Hyukjin Kwon
>             Fix For: 2.2.0
>
> Please refer to https://github.com/apache/spark/pull/17477#issuecomment-293942637
> {quote}
> [error] /home/jenkins/workspace/spark-master-test-sbt-hadoop-2.6/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala:123: value createDatumWriter is not a member of org.apache.avro.generic.GenericData
> [error] writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
> [error]
> {quote}
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/2770/consoleFull
> It seems sbt resolves the Avro version differently from Maven in some cases.
[jira] [Commented] (SPARK-6509) MDLP discretizer
[ https://issues.apache.org/jira/browse/SPARK-6509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972806#comment-15972806 ] Sergio Ramírez commented on SPARK-6509:
---------------------------------------

Thanks again Barry for your support. I hope this proof can serve to promote DMDLP to the main API in MLlib.

> MDLP discretizer
> ----------------
>
>                 Key: SPARK-6509
>                 URL: https://issues.apache.org/jira/browse/SPARK-6509
>             Project: Spark
>          Issue Type: New Feature
>          Components: MLlib
>            Reporter: Sergio Ramírez
>
> Minimum Description Length Discretizer
> This method implements Fayyad's discretizer [1], based on the Minimum Description Length Principle (MDLP), in order to treat non-discrete datasets from a distributed perspective. We have developed a distributed version from the original one, making some important changes.
> Associated paper:
> Ramírez-Gallego, S., García, S., Mouriño-Talín, H., Martínez-Rego, D., Bolón-Canedo, V., Alonso-Betanzos, A., Benítez, J. M. and Herrera, F. (2016), Data discretization: taxonomy and big data challenge. WIREs Data Mining and Knowledge Discovery, 6: 5–21. doi:10.1002/widm.1173
> URL: http://onlinelibrary.wiley.com/doi/10.1002/widm.1173/abstract
> Improvements on the discretizer:
> - Support for sparse data.
> - Multi-attribute processing. The whole process is carried out in a single step when the number of boundary points per attribute fits well in one partition (<= 100K boundary points per attribute).
> - Support for attributes with a huge number of boundary points (> 100K boundary points per attribute). A rare situation.
> This software has been tested with two large real-world datasets:
> - A dataset selected for the GECCO-2014 competition in Vancouver, July 13th, 2014, which comes from the Protein Structure Prediction field (http://cruncher.ncl.ac.uk/bdcomp/). The dataset has 32 million instances, 631 attributes, 2 classes, 98% negative examples, and occupies about 56GB of disk space when uncompressed.
> - Epsilon dataset: http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#epsilon. 400K instances and 2K attributes.
> We have demonstrated that our method performs 300 times faster than the sequential version on the first dataset, and also improves the accuracy of Naive Bayes.
> Publication: S. Ramírez-Gallego, S. García, H. Mouriño-Talín, D. Martínez-Rego, V. Bolón, A. Alonso-Betanzos, J.M. Benítez, F. Herrera. "Data Discretization: Taxonomy and Big Data Challenge", WIREs Data Mining and Knowledge Discovery. In press, 2015.
> Design doc: https://docs.google.com/document/d/1HOaPL_HJzTbL2tVdzbTjhr5wxVvPe9e-23S7rc2VcsY/edit?usp=sharing
> References
> [1] Fayyad, U., & Irani, K. (1993). "Multi-interval discretization of continuous-valued attributes for classification learning."
[jira] [Created] (SPARK-20367) Spark silently escapes partition column names
Juliusz Sompolski created SPARK-20367:
-----------------------------------------

             Summary: Spark silently escapes partition column names
                 Key: SPARK-20367
                 URL: https://issues.apache.org/jira/browse/SPARK-20367
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.1.0, 2.2.0
            Reporter: Juliusz Sompolski
            Priority: Minor

CSV files can have arbitrary column names:

{code}
scala> spark.range(1).select(col("id").as("Column?"), col("id")).write.option("header", true).csv("/tmp/foo")
scala> spark.read.option("header", true).csv("/tmp/foo").schema
res1: org.apache.spark.sql.types.StructType = StructType(StructField(Column?,StringType,true), StructField(id,StringType,true))
{code}

However, once a column with characters like "?" in the name gets used in a partitioning column, the column name gets silently escaped, and reading the schema information back renders the column name with "?" turned into "%3F":

{code}
scala> spark.range(1).select(col("id").as("Column?"), col("id")).write.partitionBy("Column?").option("header", true).csv("/tmp/bar")
scala> spark.read.option("header", true).csv("/tmp/bar").schema
res3: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true), StructField(Column%3F,IntegerType,true))
{code}

The same happens for other formats, but I encountered it working with CSV, since these more often contain ugly schemas...

Not sure if it's a bug or a feature, but it might be more intuitive to fail queries with invalid characters in the partitioning column name, rather than silently escaping the name?
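For context on the "%3F" rendering: partition columns become directory names on disk (paths like {{Column%3F=.../}}), so characters that are unsafe in file-system paths get percent-encoded, exactly like URL escaping. The mapping can be seen with the Python standard library (an illustration of the encoding scheme, not Spark's actual escaping code):

```python
from urllib.parse import quote, unquote

# '?' is 0x3F, so percent-encoding turns "Column?" into "Column%3F" --
# the same name the schema reports after a partitioned write.
escaped = quote("Column?", safe="")
print(escaped)            # Column%3F
print(unquote(escaped))   # Column?
```

The escaping is lossless in the path itself, but Spark reads the directory name back verbatim instead of decoding it, which is why the schema shows the escaped form.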
[jira] [Assigned] (SPARK-20281) Table-valued function range in SQL should use the same number of partitions as spark.range
[ https://issues.apache.org/jira/browse/SPARK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20281:
------------------------------------

    Assignee: Apache Spark

> Table-valued function range in SQL should use the same number of partitions as spark.range
> ------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20281
>                 URL: https://issues.apache.org/jira/browse/SPARK-20281
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Jacek Laskowski
>            Assignee: Apache Spark
>            Priority: Minor
>
> Note the different number of partitions in {{range}} in SQL and as operator.
> {code}
> scala> spark.range(4).explain
> == Physical Plan ==
> *Range (0, 4, step=1, splits=Some(8)) // <-- note Some(8)
>
> scala> sql("select * from range(4)").explain
> == Physical Plan ==
> *Range (0, 4, step=1, splits=None) // <-- note None
> {code}
> If I'm not mistaken, the change is to fix {{builtinFunctions}} in {{ResolveTableValuedFunctions}} (see [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala#L82-L93]) to use {{sparkContext.defaultParallelism}} as {{SparkSession.range}} does (see [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L517]).
> Please confirm to work on a fix if and as needed.
[jira] [Assigned] (SPARK-20281) Table-valued function range in SQL should use the same number of partitions as spark.range
[ https://issues.apache.org/jira/browse/SPARK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20281:
------------------------------------

    Assignee: (was: Apache Spark)

> Table-valued function range in SQL should use the same number of partitions as spark.range
> ------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20281
>                 URL: https://issues.apache.org/jira/browse/SPARK-20281
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Jacek Laskowski
>            Priority: Minor
[jira] [Commented] (SPARK-20281) Table-valued function range in SQL should use the same number of partitions as spark.range
[ https://issues.apache.org/jira/browse/SPARK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972758#comment-15972758 ] Apache Spark commented on SPARK-20281:
--------------------------------------

User 'maropu' has created a pull request for this issue:
https://github.com/apache/spark/pull/17670

> Table-valued function range in SQL should use the same number of partitions as spark.range
> ------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20281
>                 URL: https://issues.apache.org/jira/browse/SPARK-20281
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Jacek Laskowski
>            Priority: Minor
[jira] [Commented] (SPARK-20356) Spark sql group by returns incorrect results after join + distinct transformations
[ https://issues.apache.org/jira/browse/SPARK-20356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972753#comment-15972753 ] Herman van Hovell commented on SPARK-20356:
-------------------------------------------

Here is a reproduction in scala:

{noformat}
val df1 = Seq(("a", 1), ("b", 1), ("c", 2)).toDF("item", "group")
val df2 = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("item", "id")
val df3 = df1.join(df2, Seq("item")).select($"id", $"group".as("item")).distinct()
df3.unpersist()
val agg_without_cache = df3.groupBy($"item").count()
agg_without_cache.show()
df3.cache()
val agg_with_cache = df3.groupBy($"item").count()
agg_with_cache.show()
{noformat}

> Spark sql group by returns incorrect results after join + distinct transformations
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-20356
>                 URL: https://issues.apache.org/jira/browse/SPARK-20356
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>         Environment: Linux Mint 18, Python 3.5
>            Reporter: Chris Kipers
>
> I'm experiencing a bug with the head version of spark as of 4/17/2017. After joining two dataframes, renaming a column and invoking distinct, the result of the aggregation is incorrect after caching the dataframe. The following code snippet consistently reproduces the error:
>
> from pyspark.sql import SparkSession
> import pyspark.sql.functions as sf
> import pandas as pd
>
> spark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()
>
> mapping_sdf = spark.createDataFrame(pd.DataFrame([
>     {"ITEM": "a", "GROUP": 1},
>     {"ITEM": "b", "GROUP": 1},
>     {"ITEM": "c", "GROUP": 2}
> ]))
>
> items_sdf = spark.createDataFrame(pd.DataFrame([
>     {"ITEM": "a", "ID": 1},
>     {"ITEM": "b", "ID": 2},
>     {"ITEM": "c", "ID": 3}
> ]))
>
> mapped_sdf = \
>     items_sdf.join(mapping_sdf, on='ITEM').select("ID", sf.col("GROUP").alias('ITEM')).distinct()
>
> print(mapped_sdf.groupBy("ITEM").count().count())  # Prints 2, correct
> mapped_sdf.cache()
> print(mapped_sdf.groupBy("ITEM").count().count())  # Prints 3, incorrect
>
> The next code snippet is almost the same as the first, except I don't call distinct on the dataframe. This snippet performs as expected:
>
> mapped_sdf = \
>     items_sdf.join(mapping_sdf, on='ITEM').select("ID", sf.col("GROUP").alias('ITEM'))
>
> print(mapped_sdf.groupBy("ITEM").count().count())  # Prints 2, correct
> mapped_sdf.cache()
> print(mapped_sdf.groupBy("ITEM").count().count())  # Prints 2, correct
>
> I don't experience this bug with Spark 2.1 or even earlier 2.2 versions.
[jira] [Commented] (SPARK-20281) Table-valued function range in SQL should use the same number of partitions as spark.range
[ https://issues.apache.org/jira/browse/SPARK-20281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972756#comment-15972756 ] Takeshi Yamamuro commented on SPARK-20281:
------------------------------------------

IIUC they internally use the same value (that is, defaultParallelism) for splits by default. But, I feel this different printing makes users a bit confused.

> Table-valued function range in SQL should use the same number of partitions as spark.range
> ------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20281
>                 URL: https://issues.apache.org/jira/browse/SPARK-20281
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Jacek Laskowski
>            Priority: Minor
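The point in the comment above is that both plans should resolve to the same physical parallelism; only the printed plan differs, because one carries an explicit split count ({{Some(8)}}) and the other carries none ({{None}}). The resolution amounts to an optional-with-default lookup, sketched here with illustrative names (not Spark's actual code):

```python
DEFAULT_PARALLELISM = 8  # stand-in for sparkContext.defaultParallelism


def resolve_splits(splits):
    """None (the SQL range() path) and an explicit 8 (spark.range on an
    8-core machine) resolve to the same physical partition count."""
    return splits if splits is not None else DEFAULT_PARALLELISM
```

The proposed fix is simply to resolve the default at plan-construction time in the SQL path too, so both plans print the same split count.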
[jira] [Commented] (SPARK-6509) MDLP discretizer
[ https://issues.apache.org/jira/browse/SPARK-6509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972744#comment-15972744 ] Barry Becker commented on SPARK-6509: - As further proof of relevance, I will be giving a [presentation|https://spark-summit.org/2017/events/visualization-of-enhanced-spark-induced-naive-bayes-classifier/] at the Spark Summit 2017 of a version of Naive Bayes (and its visualization) which leverages the MDLP library that Sergio created. The MDLP library is used to do entropy based binning of continuous attributes to improve the accuracy of the model. > MDLP discretizer > > > Key: SPARK-6509 > URL: https://issues.apache.org/jira/browse/SPARK-6509 > Project: Spark > Issue Type: New Feature > Components: MLlib >Reporter: Sergio Ramírez > > Minimum Description Lenght Discretizer > This method implements Fayyad's discretizer [1] based on Minimum Description > Length Principle (MDLP) in order to treat non discrete datasets from a > distributed perspective. We have developed a distributed version from the > original one performing some important changes. > Associated paper: > Ramírez-Gallego, S., García, S., Mouriño-Talín, H., Martínez-Rego, D., > Bolón-Canedo, V., Alonso-Betanzos, A., Benítez, J. M. and Herrera, F. (2016), > Data discretization: taxonomy and big data challenge. WIREs Data Mining > Knowledge Discovery, 6: 5–21. doi:10.1002/widm.1173 > URL: http://onlinelibrary.wiley.com/doi/10.1002/widm.1173/abstract > -- Improvements on discretizer: > Support for sparse data. > Multi-attribute processing. The whole process is carried out in a single > step when the number of boundary points per attribute fits well in one > partition (<= 100K boundary points per attribute). > Support for attributes with a huge number of boundary points (> 100K > boundary points per attribute). Rare situation. 
> This software has been proven on two large real-world datasets: > A dataset selected for the GECCO-2014 competition in Vancouver, July 13th, 2014, > which comes from the Protein Structure Prediction field > (http://cruncher.ncl.ac.uk/bdcomp/). The dataset has 32 million instances, > 631 attributes, 2 classes, 98% negative examples and occupies, when > uncompressed, about 56GB of disk space. > Epsilon dataset: > http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#epsilon. > 400K instances and 2K attributes. > We have demonstrated that our method performs 300 times faster than the > sequential version for the first dataset, and also improves the accuracy for > Naive Bayes. > Publication: S. Ramírez-Gallego, S. García, H. Mouriño-Talin, D. > Martínez-Rego, V. Bolón, A. Alonso-Betanzos, J.M. Benitez, F. Herrera. "Data > Discretization: Taxonomy and Big Data Challenge", WIRES Data Mining and > Knowledge Discovery. In press, 2015. > Design doc: > https://docs.google.com/document/d/1HOaPL_HJzTbL2tVdzbTjhr5wxVvPe9e-23S7rc2VcsY/edit?usp=sharing > References > [1] Fayyad, U., & Irani, K. (1993). > "Multi-interval discretization of continuous-valued attributes for > classification learning."
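For readers unfamiliar with MDLP, the core step is choosing the boundary point that minimizes the weighted class entropy of the two resulting bins. A self-contained plain-Python sketch of that single step (a toy illustration, not the distributed implementation described above):

```python
import math

def entropy(labels):
    """Shannon entropy of a list of class labels."""
    n = len(labels)
    if n == 0:
        return 0.0
    counts = {}
    for y in labels:
        counts[y] = counts.get(y, 0) + 1
    return -sum(c / n * math.log2(c / n) for c in counts.values())

def best_cut_point(values, labels):
    """Pick the boundary that minimizes weighted class entropy (one MDLP step)."""
    pairs = sorted(zip(values, labels))
    n = len(pairs)
    best = (float("inf"), None)
    for i in range(1, n):
        left = [y for _, y in pairs[:i]]
        right = [y for _, y in pairs[i:]]
        cost = len(left) / n * entropy(left) + len(right) / n * entropy(right)
        cut = (pairs[i - 1][0] + pairs[i][0]) / 2  # midpoint boundary
        best = min(best, (cost, cut))
    return best[1]

# A perfectly separable attribute: the cut lands between the two classes.
cut = best_cut_point([1.0, 2.0, 3.0, 10.0, 11.0], ["a", "a", "a", "b", "b"])
assert 3.0 < cut < 10.0
```

The full MDLP criterion also applies a description-length stopping test before accepting a cut; this sketch shows only the entropy-minimization step.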
[jira] [Assigned] (SPARK-20366) Fix recursive join reordering: inside joins are not reordered
[ https://issues.apache.org/jira/browse/SPARK-20366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-20366: --- Assignee: Zhenhua Wang > Fix recursive join reordering: inside joins are not reordered > - > > Key: SPARK-20366 > URL: https://issues.apache.org/jira/browse/SPARK-20366 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Zhenhua Wang >Assignee: Zhenhua Wang > Fix For: 2.2.0 > > > If a plan has multi-level successive joins, e.g.: > {noformat} > Join > / \ > Union t5 > / \ > Join t4 > /\ > Join t3 > / \ > t1 t2 > {noformat} > Currently we fail to reorder the inside joins, i.e. t1, t2, t3. > In join reorder, we use `OrderedJoin` to indicate a join has been ordered, > such that when transforming down the plan, these joins don't need to be > reordered again. > But there's a problem in the definition of `OrderedJoin`: > The real join node is a parameter, not its child. This breaks the > transform procedure because `mapChildren` applies the transform function to > parameters that should be children.
[jira] [Resolved] (SPARK-20366) Fix recursive join reordering: inside joins are not reordered
[ https://issues.apache.org/jira/browse/SPARK-20366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-20366. - Resolution: Fixed Fix Version/s: 2.2.0 Issue resolved by pull request 17668 [https://github.com/apache/spark/pull/17668] > Fix recursive join reordering: inside joins are not reordered > - > > Key: SPARK-20366 > URL: https://issues.apache.org/jira/browse/SPARK-20366 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Zhenhua Wang > Fix For: 2.2.0 > > > If a plan has multi-level successive joins, e.g.: > {noformat} > Join > / \ > Union t5 > / \ > Join t4 > /\ > Join t3 > / \ > t1 t2 > {noformat} > Currently we fail to reorder the inside joins, i.e. t1, t2, t3. > In join reorder, we use `OrderedJoin` to indicate a join has been ordered, > such that when transforming down the plan, these joins don't need to be > reordered again. > But there's a problem in the definition of `OrderedJoin`: > The real join node is a parameter, not its child. This breaks the > transform procedure because `mapChildren` applies the transform function to > parameters that should be children.
[jira] [Commented] (SPARK-20356) Spark sql group by returns incorrect results after join + distinct transformations
[ https://issues.apache.org/jira/browse/SPARK-20356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972544#comment-15972544 ] Ed Lee commented on SPARK-20356: really quite a dangerous bug > Spark sql group by returns incorrect results after join + distinct > transformations > -- > > Key: SPARK-20356 > URL: https://issues.apache.org/jira/browse/SPARK-20356 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 > Environment: Linux mint 18 > Python 3.5 >Reporter: Chris Kipers > > I'm experiencing a bug with the head version of spark as of 4/17/2017. After > joining two dataframes, renaming a column and invoking distinct, the results > of the aggregation are incorrect after caching the dataframe. The following > code snippet consistently reproduces the error. > from pyspark.sql import SparkSession > import pyspark.sql.functions as sf > import pandas as pd > spark = SparkSession.builder.master("local").appName("Word > Count").getOrCreate() > mapping_sdf = spark.createDataFrame(pd.DataFrame([ > {"ITEM": "a", "GROUP": 1}, > {"ITEM": "b", "GROUP": 1}, > {"ITEM": "c", "GROUP": 2} > ])) > items_sdf = spark.createDataFrame(pd.DataFrame([ > {"ITEM": "a", "ID": 1}, > {"ITEM": "b", "ID": 2}, > {"ITEM": "c", "ID": 3} > ])) > mapped_sdf = \ > items_sdf.join(mapping_sdf, on='ITEM').select("ID", > sf.col("GROUP").alias('ITEM')).distinct() > print(mapped_sdf.groupBy("ITEM").count().count()) # Prints 2, correct > mapped_sdf.cache() > print(mapped_sdf.groupBy("ITEM").count().count()) # Prints 3, incorrect > The next code snippet is almost the same as the first, except I don't call > distinct on the dataframe. 
This snippet performs as expected: > mapped_sdf = \ > items_sdf.join(mapping_sdf, on='ITEM').select("ID", > sf.col("GROUP").alias('ITEM')) > print(mapped_sdf.groupBy("ITEM").count().count()) # Prints 2, correct > mapped_sdf.cache() > print(mapped_sdf.groupBy("ITEM").count().count()) # Prints 2, correct > I don't experience this bug with spark 2.1 or even earlier versions of 2.2
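To make the expected answer concrete, the same join/rename/distinct/group-count pipeline can be computed in plain Python with no Spark involved. This is a reference computation showing the correct result, not a fix:

```python
# Reference computation of the snippet above: join items to mapping on ITEM,
# rename GROUP to ITEM, take distinct rows, then count the distinct groups.
mapping = [{"ITEM": "a", "GROUP": 1}, {"ITEM": "b", "GROUP": 1}, {"ITEM": "c", "GROUP": 2}]
items = [{"ITEM": "a", "ID": 1}, {"ITEM": "b", "ID": 2}, {"ITEM": "c", "ID": 3}]

joined = [
    {"ID": i["ID"], "ITEM": m["GROUP"]}  # GROUP renamed to ITEM
    for i in items
    for m in mapping
    if i["ITEM"] == m["ITEM"]
]
distinct = {tuple(sorted(row.items())) for row in joined}
groups = {dict(row)["ITEM"] for row in distinct}

# Two distinct groups (1 and 2), so groupBy("ITEM").count().count() must be 2
# both before and after caching; the reported 3 after cache() is incorrect.
assert len(groups) == 2
```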
[jira] [Commented] (SPARK-20343) SBT master build for Hadoop 2.6 in Jenkins fails due to Avro version resolution
[ https://issues.apache.org/jira/browse/SPARK-20343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972490#comment-15972490 ] Apache Spark commented on SPARK-20343: -- User 'HyukjinKwon' has created a pull request for this issue: https://github.com/apache/spark/pull/17669 > SBT master build for Hadoop 2.6 in Jenkins fails due to Avro version > resolution > > > Key: SPARK-20343 > URL: https://issues.apache.org/jira/browse/SPARK-20343 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 2.2.0 >Reporter: Hyukjin Kwon >Assignee: Hyukjin Kwon > Fix For: 2.2.0 > > > Please refer https://github.com/apache/spark/pull/17477#issuecomment-293942637 > {quote} > [error] > /home/jenkins/workspace/spark-master-test-sbt-hadoop-2.6/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala:123: > value createDatumWriter is not a member of > org.apache.avro.generic.GenericData > [error] writerCache.getOrElseUpdate(schema, > GenericData.get.createDatumWriter(schema)) > [error] > {quote} > https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/2770/consoleFull > It seems sbt has a different resolution for Avro differently with Maven in > some cases. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-1548) Add Partial Random Forest algorithm to MLlib
[ https://issues.apache.org/jira/browse/SPARK-1548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972474#comment-15972474 ] Mohamed Baddar commented on SPARK-1548: --- [~srowen] [~josephkb] any updates on the possibility of proceeding with this issue ? > Add Partial Random Forest algorithm to MLlib > > > Key: SPARK-1548 > URL: https://issues.apache.org/jira/browse/SPARK-1548 > Project: Spark > Issue Type: New Feature > Components: MLlib >Affects Versions: 1.0.0 >Reporter: Manish Amde > > This task involves creating an alternate approximate random forest > implementation where each tree is constructed per partition. > The tasks involves: > - Justifying with theory and experimental results why this algorithm is a > good choice. > - Comparing the various tradeoffs and finalizing the algorithm before > implementation > - Code implementation > - Unit tests > - Functional tests > - Performance tests > - Documentation -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
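To illustrate the structure of the proposed algorithm, here is a deliberately tiny plain-Python sketch: one trivial model per partition (majority-class "stumps" standing in for real decision trees), combined by majority vote. All names are hypothetical, and this conveys only the shape of the approach, not an MLlib implementation:

```python
from collections import Counter

def train_stump(part):
    """A deliberately trivial per-partition 'tree': predict the majority class."""
    labels = [y for _, y in part]
    return Counter(labels).most_common(1)[0][0]

def partial_forest(data, num_partitions):
    """Train one model per partition, mirroring one tree per Spark partition."""
    size = max(1, len(data) // num_partitions)
    parts = [data[i:i + size] for i in range(0, len(data), size)]
    return [train_stump(p) for p in parts]

def predict(models, x):
    """Combine per-partition models by majority vote (stumps ignore x here)."""
    return Counter(models).most_common(1)[0][0]

data = [(0, "neg"), (1, "neg"), (2, "pos"), (3, "pos"), (4, "neg"), (5, "neg")]
models = partial_forest(data, 3)
assert predict(models, None) == "neg"
```

The approximation at the heart of the JIRA is visible even here: each model sees only its partition's data, which is why the task list asks for theoretical and experimental justification of the tradeoff.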
[jira] [Assigned] (SPARK-20344) Duplicate call in FairSchedulableBuilder.addTaskSetManager
[ https://issues.apache.org/jira/browse/SPARK-20344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-20344: - Assignee: Robert Stupp > Duplicate call in FairSchedulableBuilder.addTaskSetManager > -- > > Key: SPARK-20344 > URL: https://issues.apache.org/jira/browse/SPARK-20344 > Project: Spark > Issue Type: Improvement > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Robert Stupp >Assignee: Robert Stupp >Priority: Trivial > Fix For: 2.2.0 > > > {{org.apache.spark.scheduler.FairSchedulableBuilder#addTaskSetManager}} > contains the code snippet: > {code} > override def addTaskSetManager(manager: Schedulable, properties: > Properties) { > var poolName = DEFAULT_POOL_NAME > var parentPool = rootPool.getSchedulableByName(poolName) > if (properties != null) { > poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, > DEFAULT_POOL_NAME) > parentPool = rootPool.getSchedulableByName(poolName) > if (parentPool == null) { > {code} > {{parentPool = rootPool.getSchedulableByName(poolName)}} is called twice if > {{properties != null}}. > I'm not sure whether this is an oversight or there's something else missing. > This piece of the code hasn't been modified since 2013, so I doubt that this > is a serious issue. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-20344) Duplicate call in FairSchedulableBuilder.addTaskSetManager
[ https://issues.apache.org/jira/browse/SPARK-20344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-20344. --- Resolution: Fixed Fix Version/s: 2.2.0 Issue resolved by pull request 17647 [https://github.com/apache/spark/pull/17647] > Duplicate call in FairSchedulableBuilder.addTaskSetManager > -- > > Key: SPARK-20344 > URL: https://issues.apache.org/jira/browse/SPARK-20344 > Project: Spark > Issue Type: Improvement > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Robert Stupp >Priority: Trivial > Fix For: 2.2.0 > > > {{org.apache.spark.scheduler.FairSchedulableBuilder#addTaskSetManager}} > contains the code snippet: > {code} > override def addTaskSetManager(manager: Schedulable, properties: > Properties) { > var poolName = DEFAULT_POOL_NAME > var parentPool = rootPool.getSchedulableByName(poolName) > if (properties != null) { > poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, > DEFAULT_POOL_NAME) > parentPool = rootPool.getSchedulableByName(poolName) > if (parentPool == null) { > {code} > {{parentPool = rootPool.getSchedulableByName(poolName)}} is called twice if > {{properties != null}}. > I'm not sure whether this is an oversight or there's something else missing. > This piece of the code hasn't been modified since 2013, so I doubt that this > is a serious issue. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
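The fix is simply to resolve the pool name first and then perform the lookup exactly once, instead of looking up the default pool and immediately overwriting the result when properties are present. A plain-Python sketch of the restructured logic (a dict stands in for the root pool; names mirror the Scala snippet but are otherwise hypothetical):

```python
# Sketch of the refactor: compute pool_name first, then one lookup.
DEFAULT_POOL_NAME = "default"
FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"  # assumed property key

def find_parent_pool(root_pool, properties):
    """Resolve the pool name, then look it up a single time."""
    pool_name = DEFAULT_POOL_NAME
    if properties is not None:
        pool_name = properties.get(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
    return pool_name, root_pool.get(pool_name)  # single lookup replaces the duplicate

root = {"default": "default-pool", "fast": "fast-pool"}
assert find_parent_pool(root, None) == ("default", "default-pool")
assert find_parent_pool(root, {"spark.scheduler.pool": "fast"}) == ("fast", "fast-pool")
```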
[jira] [Resolved] (SPARK-20361) JVM locale affects SQL type names
[ https://issues.apache.org/jira/browse/SPARK-20361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-20361. --- Resolution: Duplicate > JVM locale affects SQL type names > -- > > Key: SPARK-20361 > URL: https://issues.apache.org/jira/browse/SPARK-20361 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 2.1.0 >Reporter: Maciej Szymkiewicz > > Steps to reproduce: > {code} > from pyspark.sql.types import IntegerType > locale = sc._jvm.java.util.Locale > locale.setDefault(locale.forLanguageTag("tr-TR")) > spark.createDataFrame([1, 2, 3], IntegerType()) > {code} > {code} > Py4JJavaError: An error occurred while calling o24.applySchemaToPythonRDD. > : java.util.NoSuchElementException: key not found: integer > at scala.collection.MapLike$class.default(MapLike.scala:228) > ... > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Reopened] (SPARK-20361) JVM locale affects SQL type names
[ https://issues.apache.org/jira/browse/SPARK-20361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reopened SPARK-20361: --- > JVM locale affects SQL type names > -- > > Key: SPARK-20361 > URL: https://issues.apache.org/jira/browse/SPARK-20361 > Project: Spark > Issue Type: Bug > Components: PySpark, SQL >Affects Versions: 2.1.0 >Reporter: Maciej Szymkiewicz > > Steps to reproduce: > {code} > from pyspark.sql.types import IntegerType > locale = sc._jvm.java.util.Locale > locale.setDefault(locale.forLanguageTag("tr-TR")) > spark.createDataFrame([1, 2, 3], IntegerType()) > {code} > {code} > Py4JJavaError: An error occurred while calling o24.applySchemaToPythonRDD. > : java.util.NoSuchElementException: key not found: integer > at scala.collection.MapLike$class.default(MapLike.scala:228) > ... > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
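The underlying failure mode is Java's locale-sensitive case mapping: under tr-TR, 'I' lowercases to the dotless 'ı', so a map keyed on "integer" ends up probed with "ınteger". A plain-Python toy that mimics the Turkish mapping (Python's own str.lower() is not locale-sensitive, so the relevant mapping is spelled out by hand):

```python
# Toy reproduction of the failure mode, not Spark code: simulate Java's
# String.toLowerCase() under the tr-TR locale, where 'I' -> dotless 'ı'.
TURKISH_LOWER = {"I": "\u0131"}  # 'ı', the Turkish dotless i

def to_lower_tr(s):
    return "".join(TURKISH_LOWER.get(c, c.lower()) for c in s)

type_registry = {"integer": int}  # keys built under the root locale
key = to_lower_tr("INTEGER")      # 'ınteger' under the Turkish locale

assert key not in type_registry   # the "key not found: integer"-style failure
```

The fix direction this suggests is to use a locale-independent lowercase (e.g. Locale.ROOT in Java terms) when building or probing type-name maps.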
[jira] [Commented] (SPARK-20364) Parquet predicate pushdown on columns with dots return empty results
[ https://issues.apache.org/jira/browse/SPARK-20364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972400#comment-15972400 ] Robert Kruszewski commented on SPARK-20364: --- Looks like parquet doesn't differentiate between a column named "a.b" and field "b" in column "a". We could always treat them the same, but that might lead to some surprises down the road. Ideally parquet would let us construct a filter given either an exact column name or dot-separated accessors. I think what [~aash] proposes is roughly how we should fix it, with the caveat that eventually this should live in parquet imho. > Parquet predicate pushdown on columns with dots return empty results > > > Key: SPARK-20364 > URL: https://issues.apache.org/jira/browse/SPARK-20364 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Hyukjin Kwon > > Currently, if there are dots in the column name, predicate pushdown seems > to fail in Parquet. > **With dots** > {code} > val path = "/tmp/abcde" > Seq(Some(1), None).toDF("col.dots").write.parquet(path) > spark.read.parquet(path).where("`col.dots` IS NOT NULL").show() > {code} > {code} > ++ > |col.dots| > ++ > ++ > {code} > **Without dots** > {code} > val path = "/tmp/abcde2" > Seq(Some(1), None).toDF("coldots").write.parquet(path) > spark.read.parquet(path).where("`coldots` IS NOT NULL").show() > {code} > {code} > +---+ > |coldots| > +---+ > | 1| > +---+ > {code} > It seems {{FilterApi}} splits a dotted column name into multiple path > segments (a {{ColumnPath}} with multiple components), whereas the > actual column name is {{col.dots}}. (See [FilterApi.java#L71 > |https://github.com/apache/parquet-mr/blob/apache-parquet-1.8.2/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java#L71] > and it calls > [ColumnPath.java#L44|https://github.com/apache/parquet-mr/blob/apache-parquet-1.8.2/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/ColumnPath.java#L44]. 
> I just tried to come up with ways to resolve it, and I came up with the two > below: > One is simply not to push down filters when there are dots in column names, > so that Spark reads everything and filters on its side. > The other is to create Spark's own {{FilterApi}} for those columns (the > original seems final) so that a single column path is always used on the > Spark side (this seems hacky), as we are not pushing down nested columns > currently. So, it looks like we can get the field name via {{ColumnPath.get}} > rather than {{ColumnPath.fromDotString}} in this way. > I just made a rough version of the latter. > {code} > private[parquet] object ParquetColumns { > def intColumn(columnPath: String): Column[Integer] with SupportsLtGt = { > new Column[Integer] (ColumnPath.get(columnPath), classOf[Integer]) with > SupportsLtGt > } > def longColumn(columnPath: String): Column[java.lang.Long] with > SupportsLtGt = { > new Column[java.lang.Long] ( > ColumnPath.get(columnPath), classOf[java.lang.Long]) with SupportsLtGt > } > def floatColumn(columnPath: String): Column[java.lang.Float] with > SupportsLtGt = { > new Column[java.lang.Float] ( > ColumnPath.get(columnPath), classOf[java.lang.Float]) with SupportsLtGt > } > def doubleColumn(columnPath: String): Column[java.lang.Double] with > SupportsLtGt = { > new Column[java.lang.Double] ( > ColumnPath.get(columnPath), classOf[java.lang.Double]) with SupportsLtGt > } > def booleanColumn(columnPath: String): Column[java.lang.Boolean] with > SupportsEqNotEq = { > new Column[java.lang.Boolean] ( > ColumnPath.get(columnPath), classOf[java.lang.Boolean]) with > SupportsEqNotEq > } > def binaryColumn(columnPath: String): Column[Binary] with SupportsLtGt = { > new Column[Binary] (ColumnPath.get(columnPath), classOf[Binary]) with > SupportsLtGt > } > } > {code} > Neither sounds ideal. Please let me know if anyone has a better idea. 
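The distinction between the two ColumnPath constructions can be modeled in a few lines of plain Python (function names are illustrative, not the parquet-mr API):

```python
# Plain-Python model of the two lookups: fromDotString splits on dots and
# treats "col.dots" as a nested field path, while get keeps the literal
# segment, which is what a flat column literally named "col.dots" needs.
def from_dot_string(name):
    return tuple(name.split("."))

def get_path(name):
    return (name,)

parquet_schema_columns = {("col.dots",)}  # the file really has one flat column

assert from_dot_string("col.dots") == ("col", "dots")       # wrong path
assert from_dot_string("col.dots") not in parquet_schema_columns  # filter misses
assert get_path("col.dots") in parquet_schema_columns       # filter matches
```

This is why the filter built via dot-splitting matches nothing and the query silently returns empty results instead of erroring.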
[jira] [Commented] (SPARK-20169) Groupby Bug with Sparksql
[ https://issues.apache.org/jira/browse/SPARK-20169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972401#comment-15972401 ] Takeshi Yamamuro commented on SPARK-20169: -- I could also reproduce this on /bin/pyspark in v2.1 and master. But when I rewrote this query in Scala, I couldn't reproduce it. Does this seem to be a pyspark-only issue? {code} scala> val e = spark.read.option("header", true).csv("graph.csv").toDF("src", "dst") scala> val r = Seq((1), (2), (3), (4)).toDF("src") scala> val r1 = e.join(r, "src" :: Nil).groupBy("dst").count().withColumnRenamed("dst","src") scala> val jr = e.join(r1, "src" :: Nil) scala> val r2 = jr.groupBy("dst").count scala> jr.show +---+---+-+ |src|dst|count| +---+---+-+ | 1| 2|3| | 1| 3|3| | 1| 4|3| | 2| 1|1| | 3| 1|1| | 4| 1|1| +---+---+-+ scala> r2.show +---+-+ |dst|count| +---+-+ | 3|1| | 1|3| | 4|1| | 2|1| +---+-+ {code} > Groupby Bug with Sparksql > - > > Key: SPARK-20169 > URL: https://issues.apache.org/jira/browse/SPARK-20169 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.1.0 >Reporter: Bin Wu > > We found a potential bug in the Catalyst optimizer: it cannot correctly > process "groupBy". You can reproduce it with the following simple example: > = > from pyspark.sql.functions import * > #e=sc.parallelize([(1,2),(1,3),(1,4),(2,1),(3,1),(4,1)]).toDF(["src","dst"]) > e = spark.read.csv("graph.csv", header=True) > r = sc.parallelize([(1,),(2,),(3,),(4,)]).toDF(['src']) > r1 = e.join(r, 'src').groupBy('dst').count().withColumnRenamed('dst','src') > jr = e.join(r1, 'src') > jr.show() > r2 = jr.groupBy('dst').count() > r2.show() > = > FYI, "graph.csv" contains exactly the same data as the commented line. 
> You can find that jr is: > |src|dst|count| > | 3| 1|1| > | 1| 4|3| > | 1| 3|3| > | 1| 2|3| > | 4| 1|1| > | 2| 1|1| > But, after the last groupBy, the 3 rows with dst = 1 are not grouped together: > |dst|count| > | 1|1| > | 4|1| > | 3|1| > | 2|1| > | 1|1| > | 1|1| > If we build jr directly from raw data (commented line), this error will not > show up. So > we suspect that there is a bug in the Catalyst optimizer when multiple joins > and groupBy's > are being optimized. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
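The expected result of the pipeline can be checked with a plain-Python reference computation (no Spark), using the same data as the commented line:

```python
# Reference computation: after the second groupBy('dst'), the three rows
# with dst == 1 must collapse into a single group with count 3.
edges = [(1, 2), (1, 3), (1, 4), (2, 1), (3, 1), (4, 1)]
r = {1, 2, 3, 4}

# r1 = e.join(r, 'src').groupBy('dst').count(), then rename dst -> src
r1 = {}
for src, dst in edges:
    if src in r:
        r1[dst] = r1.get(dst, 0) + 1

# jr = e.join(r1, 'src'); r2 = jr.groupBy('dst').count()
r2 = {}
for src, dst in edges:
    if src in r1:
        r2[dst] = r2.get(dst, 0) + 1

# Correct answer: four groups, with dst 1 appearing exactly once (count 3),
# unlike the buggy output above where dst = 1 shows up in three separate rows.
assert r2 == {1: 3, 2: 1, 3: 1, 4: 1}
```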
[jira] [Comment Edited] (SPARK-20364) Parquet predicate pushdown on columns with dots return empty results
[ https://issues.apache.org/jira/browse/SPARK-20364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972400#comment-15972400 ] Robert Kruszewski edited comment on SPARK-20364 at 4/18/17 9:36 AM: Looks like parquet doesn't differentiate between column named "a.b" and field "b" in column "a". We could always treat them the same but that might lead to some surprises down the road. Ideally parquet would let us construct filter given exact column name or dot separated accessors. I think what [~aash] proposes is roughly how we should fix it with caveat that eventually this should live in parquet imho. was (Author: robert3005): Looks like parquet doesn't differentiate between column named "a.b" and field b in column "a". We could always treat them the same but that might lead to some surprises down the road. Ideally parquet would let us construct filter given exact column name or dot separated accessors. I think what [~aash] proposes is roughly how we should fix it with caveat that eventually this should live in parquet imho. > Parquet predicate pushdown on columns with dots return empty results > > > Key: SPARK-20364 > URL: https://issues.apache.org/jira/browse/SPARK-20364 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Hyukjin Kwon > > Currently, if there are dots in the column name, predicate pushdown seems > being failed in Parquet. 
> **With dots** > {code} > val path = "/tmp/abcde" > Seq(Some(1), None).toDF("col.dots").write.parquet(path) > spark.read.parquet(path).where("`col.dots` IS NOT NULL").show() > {code} > {code} > ++ > |col.dots| > ++ > ++ > {code} > **Without dots** > {code} > val path = "/tmp/abcde2" > Seq(Some(1), None).toDF("coldots").write.parquet(path) > spark.read.parquet(path).where("`coldots` IS NOT NULL").show() > {code} > {code} > +---+ > |coldots| > +---+ > | 1| > +---+ > {code} > It seems dot in the column names via {{FilterApi}} tries to separate the > field name with dot ({{ColumnPath}} with multiple column paths) whereas the > actual column name is {{col.dots}}. (See [FilterApi.java#L71 > |https://github.com/apache/parquet-mr/blob/apache-parquet-1.8.2/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java#L71] > and it calls > [ColumnPath.java#L44|https://github.com/apache/parquet-mr/blob/apache-parquet-1.8.2/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/ColumnPath.java#L44]. > I just tried to come up with ways to resolve it and I came up with two as > below: > One is simply to don't push down filters when there are dots in column names > so that it reads all and filters in Spark-side. > The other way creates Spark's {{FilterApi}} for those columns (it seems > final) to get always use single column path it in Spark-side (this seems > hacky) as we are not pushing down nested columns currently. So, it looks we > can get a field name via {{ColumnPath.get}} not {{ColumnPath.fromDotString}} > in this way. > I just made a rough version of the latter. 
> {code} > private[parquet] object ParquetColumns { > def intColumn(columnPath: String): Column[Integer] with SupportsLtGt = { > new Column[Integer] (ColumnPath.get(columnPath), classOf[Integer]) with > SupportsLtGt > } > def longColumn(columnPath: String): Column[java.lang.Long] with > SupportsLtGt = { > new Column[java.lang.Long] ( > ColumnPath.get(columnPath), classOf[java.lang.Long]) with SupportsLtGt > } > def floatColumn(columnPath: String): Column[java.lang.Float] with > SupportsLtGt = { > new Column[java.lang.Float] ( > ColumnPath.get(columnPath), classOf[java.lang.Float]) with SupportsLtGt > } > def doubleColumn(columnPath: String): Column[java.lang.Double] with > SupportsLtGt = { > new Column[java.lang.Double] ( > ColumnPath.get(columnPath), classOf[java.lang.Double]) with SupportsLtGt > } > def booleanColumn(columnPath: String): Column[java.lang.Boolean] with > SupportsEqNotEq = { > new Column[java.lang.Boolean] ( > ColumnPath.get(columnPath), classOf[java.lang.Boolean]) with > SupportsEqNotEq > } > def binaryColumn(columnPath: String): Column[Binary] with SupportsLtGt = { > new Column[Binary] (ColumnPath.get(columnPath), classOf[Binary]) with > SupportsLtGt > } > } > {code} > Both sound not the best. Please let me know if anyone has a better idea. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-20363) sessionstate.get is get the same object in hive project, when I use spark-beeline
[ https://issues.apache.org/jira/browse/SPARK-20363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-20363. --- Resolution: Invalid This is very unclear. Add a comment if you can significantly clarify what this is about. > sessionstate.get is get the same object in hive project, when I use > spark-beeline > -- > > Key: SPARK-20363 > URL: https://issues.apache.org/jira/browse/SPARK-20363 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.2 >Reporter: QQShu1 > > sessionstate.get is get the same object in hive project, when I use > spark-beeline,but when I use hive beeline,sessionstate.get is get the > different object in hive project -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19995) Using real user to connect HiveMetastore in HiveClientImpl
[ https://issues.apache.org/jira/browse/SPARK-19995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972373#comment-15972373 ] meiyoula commented on SPARK-19995: -- Will the token be expired? > Using real user to connect HiveMetastore in HiveClientImpl > -- > > Key: SPARK-19995 > URL: https://issues.apache.org/jira/browse/SPARK-19995 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Saisai Shao >Assignee: Saisai Shao > Fix For: 2.1.1, 2.2.0 > > > If user specify "--proxy-user" in kerberized environment with Hive catalog > implementation, HiveClientImpl will try to connect hive metastore with > current user. While we use real user to do kinit, this will make connection > failure. We should change like what we did before in yarn code to use real > user. > {noformat} > ERROR TSaslTransport: SASL negotiation failure > javax.security.sasl.SaslException: GSS initiate failed [Caused by > GSSException: No valid credentials provided (Mechanism level: Failed to find > any Kerberos tgt)] > at > com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211) > at > org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94) > at > org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271) > at > org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37) > at > org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52) > at > org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866) > at > org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49) > at > 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:420) > at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:236) > at > org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.(SessionHiveMetaStoreClient.java:74) > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521) > at > org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:86) > at > org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132) > at > org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104) > at > org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005) > at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024) > at > org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1234) > at > org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174) > at org.apache.hadoop.hive.ql.metadata.Hive.(Hive.java:166) > at > org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503) > at > org.apache.spark.sql.hive.client.HiveClientImpl.(HiveClientImpl.scala:188) > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > 
org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:264) > at > org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:366) > at > org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:270) > at > org.apache.spark.sql.hive.HiveExternalCatalog.(HiveExternalCatalog.scala:65) > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at >
[jira] [Comment Edited] (SPARK-20174) Analyzer gives mysterious AnalysisException when posexplode used in withColumn
[ https://issues.apache.org/jira/browse/SPARK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972359#comment-15972359 ] Takeshi Yamamuro edited comment on SPARK-20174 at 4/18/17 9:09 AM: --- You could fix this like https://github.com/apache/spark/compare/master...maropu:SPARK-20174. But, since we have workaround(df.withColumn("p" :: "c" :: Nil, posexplode($"a"))) and the addition of APIs in Dataset is much arguable, I think also the priority seems to be low. was (Author: maropu): You could fix this like https://github.com/apache/spark/compare/master...maropu:SPARK-20174. But, since we have workaround(df.withColumn("p" :: "c" :: Nil, posexplode($"a"))), I think also the priority seems to be low. > Analyzer gives mysterious AnalysisException when posexplode used in withColumn > -- > > Key: SPARK-20174 > URL: https://issues.apache.org/jira/browse/SPARK-20174 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Jacek Laskowski >Priority: Minor > > Wish I knew how to even describe the issue. It appears that {{posexplode}} > cannot be used in {{withColumn}}, but the error message does not seem to say > it. > [The scaladoc of > posexplode|http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$@posexplode(e:org.apache.spark.sql.Column):org.apache.spark.sql.Column] > is silent about this "limitation", too. 
> {code} > scala> codes.printSchema > root > |-- id: integer (nullable = false) > |-- rate_plan_code: array (nullable = true) > ||-- element: string (containsNull = true) > scala> codes.withColumn("code", posexplode($"rate_plan_code")).show > org.apache.spark.sql.AnalysisException: The number of aliases supplied in the > AS clause does not match the number of columns output by the UDTF expected 2 > aliases but got code ; > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40) > at > org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:90) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$.makeGeneratorOutput(Analyzer.scala:1744) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20$$anonfun$56.apply(Analyzer.scala:1691) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20$$anonfun$56.apply(Analyzer.scala:1679) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20.applyOrElse(Analyzer.scala:1679) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20.applyOrElse(Analyzer.scala:1664) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62) > at > 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$.apply(Analyzer.scala:1664) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$.apply(Analyzer.scala:1629) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82) > at > scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) > at scala.collection.immutable.List.foldLeft(List.scala:84) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74) > at scala.collection.immutable.List.foreach(List.scala:381) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74) > at > org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:70) > at >
[jira] [Commented] (SPARK-20174) Analyzer gives mysterious AnalysisException when posexplode used in withColumn
[ https://issues.apache.org/jira/browse/SPARK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972359#comment-15972359 ] Takeshi Yamamuro commented on SPARK-20174: -- You could fix this like https://github.com/apache/spark/compare/master...maropu:SPARK-20174. But, since we have a workaround ({{df.withColumn("p" :: "c" :: Nil, posexplode($"a"))}}), I think the priority is low. > Analyzer gives mysterious AnalysisException when posexplode used in withColumn > -- > > Key: SPARK-20174 > URL: https://issues.apache.org/jira/browse/SPARK-20174 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Jacek Laskowski >Priority: Minor > > Wish I knew how to even describe the issue. It appears that {{posexplode}} > cannot be used in {{withColumn}}, but the error message does not seem to say > it. > [The scaladoc of > posexplode|http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$@posexplode(e:org.apache.spark.sql.Column):org.apache.spark.sql.Column] > is silent about this "limitation", too. 
> {code} > scala> codes.printSchema > root > |-- id: integer (nullable = false) > |-- rate_plan_code: array (nullable = true) > ||-- element: string (containsNull = true) > scala> codes.withColumn("code", posexplode($"rate_plan_code")).show > org.apache.spark.sql.AnalysisException: The number of aliases supplied in the > AS clause does not match the number of columns output by the UDTF expected 2 > aliases but got code ; > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40) > at > org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:90) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$.makeGeneratorOutput(Analyzer.scala:1744) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20$$anonfun$56.apply(Analyzer.scala:1691) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20$$anonfun$56.apply(Analyzer.scala:1679) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20.applyOrElse(Analyzer.scala:1679) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20.applyOrElse(Analyzer.scala:1664) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62) > at > 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$.apply(Analyzer.scala:1664) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$.apply(Analyzer.scala:1629) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82) > at > scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) > at scala.collection.immutable.List.foldLeft(List.scala:84) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74) > at scala.collection.immutable.List.foreach(List.scala:381) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74) > at > org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:70) > at > org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:68) > at > org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:51) > at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:67) > at > org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2832) > at
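Some context on the error above: {{posexplode}} is a generator that produces two output columns (a position and a value), so a single alias supplied via {{withColumn("code", ...)}} can never match its output. The sketch below illustrates those semantics in plain Python (an illustration only, not Spark code; the dict-based {{posexplode}} helper is hypothetical):

```python
# Plain-Python sketch of posexplode semantics: one input row with an array
# column becomes one output row per array element, carrying TWO extra
# fields ("pos" and "col") -- which is why a single alias cannot match it.
def posexplode(rows, array_field):
    """Yield one row per array element, adding pos/col fields."""
    for row in rows:
        for pos, value in enumerate(row[array_field]):
            out = dict(row)       # keep the original columns
            out["pos"] = pos      # position within the array
            out["col"] = value    # the element itself
            yield out

codes = [{"id": 1, "rate_plan_code": ["AAA", "BBB"]}]
exploded = list(posexplode(codes, "rate_plan_code"))
```

In Spark itself, the usual escape hatch should be a {{select}} with two aliases rather than {{withColumn}}, e.g. {{codes.select($"*", posexplode($"rate_plan_code").as(Seq("pos", "code")))}} in Scala, since {{Column.as}} accepts a sequence of alias names.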
[jira] [Commented] (SPARK-20169) Groupby Bug with Sparksql
[ https://issues.apache.org/jira/browse/SPARK-20169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972300#comment-15972300 ] Takeshi Yamamuro commented on SPARK-20169: -- oh, ... good work. > Groupby Bug with Sparksql > - > > Key: SPARK-20169 > URL: https://issues.apache.org/jira/browse/SPARK-20169 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.1.0 >Reporter: Bin Wu > > We find a potential bug in Catalyst optimizer which cannot correctly > process "groupby". You can reproduce it by following simple example: > = > from pyspark.sql.functions import * > #e=sc.parallelize([(1,2),(1,3),(1,4),(2,1),(3,1),(4,1)]).toDF(["src","dst"]) > e = spark.read.csv("graph.csv", header=True) > r = sc.parallelize([(1,),(2,),(3,),(4,)]).toDF(['src']) > r1 = e.join(r, 'src').groupBy('dst').count().withColumnRenamed('dst','src') > jr = e.join(r1, 'src') > jr.show() > r2 = jr.groupBy('dst').count() > r2.show() > = > FYI, "graph.csv" contains exactly the same data as the commented line. > You can find that jr is: > |src|dst|count| > | 3| 1|1| > | 1| 4|3| > | 1| 3|3| > | 1| 2|3| > | 4| 1|1| > | 2| 1|1| > But, after the last groupBy, the 3 rows with dst = 1 are not grouped together: > |dst|count| > | 1|1| > | 4|1| > | 3|1| > | 2|1| > | 1|1| > | 1|1| > If we build jr directly from raw data (commented line), this error will not > show up. So > we suspect that there is a bug in the Catalyst optimizer when multiple joins > and groupBy's > are being optimized. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-20169) Groupby Bug with Sparksql
[ https://issues.apache.org/jira/browse/SPARK-20169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972270#comment-15972270 ] Hyukjin Kwon edited comment on SPARK-20169 at 4/18/17 7:50 AM: --- Yea, I was confused too when I tried to reproduce this before. Probably, let me rephrase the steps to reproduce as below:
*source A*
{code}
from pyspark.sql.functions import *
e=sc.parallelize([(1,2),(1,3),(1,4),(2,1),(3,1),(4,1)]).toDF(["src","dst"])
e.show()
{code}
{code}
+---+---+
|src|dst|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  1|
|  3|  1|
|  4|  1|
+---+---+
{code}
*source B* with a {{graph.csv}} file as below:
{code}
src,dst
1,2
1,3
1,4
2,1
3,1
4,1
{code}
{code}
from pyspark.sql.functions import *
e = spark.read.csv("graph.csv", header=True)
e.show()
{code}
{code}
+---+---+
|src|dst|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  1|
|  3|  1|
|  4|  1|
+---+---+
{code}
*Reproducer*
{code}
r = sc.parallelize([(1,),(2,),(3,),(4,)]).toDF(['src'])
r1 = e.join(r, 'src').groupBy('dst').count().withColumnRenamed('dst','src')
jr = e.join(r1, 'src')
r2 = jr.groupBy('dst').count()
r2.show()
{code}
*Results from source A and the reproducer*
{code}
+---+-----+
|dst|count|
+---+-----+
|  1|    3|
|  3|    1|
|  2|    1|
|  4|    1|
+---+-----+
{code}
*Results from source B and the reproducer*
{code}
+---+-----+
|dst|count|
+---+-----+
|  1|    1|
|  4|    1|
|  3|    1|
|  2|    1|
|  1|    1|
|  1|    1|
+---+-----+
{code}
was (Author: hyukjin.kwon): Yea, I was confused too when I tried to reproduce this before. 
Probably, let me rephrase the steps to reproduce as below:
*source A* with a {{graph.csv}} file as below:
{code}
src,dst
1,2
1,3
1,4
2,1
3,1
4,1
{code}
{code}
from pyspark.sql.functions import *
e=sc.parallelize([(1,2),(1,3),(1,4),(2,1),(3,1),(4,1)]).toDF(["src","dst"])
e.show()
{code}
{code}
+---+---+
|src|dst|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  1|
|  3|  1|
|  4|  1|
+---+---+
{code}
*source B*
{code}
from pyspark.sql.functions import *
e = spark.read.csv("graph.csv", header=True)
e.show()
{code}
{code}
+---+---+
|src|dst|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  1|
|  3|  1|
|  4|  1|
+---+---+
{code}
*Reproducer*
{code}
r = sc.parallelize([(1,),(2,),(3,),(4,)]).toDF(['src'])
r1 = e.join(r, 'src').groupBy('dst').count().withColumnRenamed('dst','src')
jr = e.join(r1, 'src')
r2 = jr.groupBy('dst').count()
r2.show()
{code}
*Results from source A and the reproducer*
{code}
+---+-----+
|dst|count|
+---+-----+
|  1|    3|
|  3|    1|
|  2|    1|
|  4|    1|
+---+-----+
{code}
*Results from source B and the reproducer*
{code}
+---+-----+
|dst|count|
+---+-----+
|  1|    1|
|  4|    1|
|  3|    1|
|  2|    1|
|  1|    1|
|  1|    1|
+---+-----+
{code}
> Groupby Bug with Sparksql > - > > Key: SPARK-20169 > URL: https://issues.apache.org/jira/browse/SPARK-20169 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.1.0 >Reporter: Bin Wu > > We find a potential bug in Catalyst optimizer which cannot correctly > process "groupby". You can reproduce it by following simple example: > = > from pyspark.sql.functions import * > #e=sc.parallelize([(1,2),(1,3),(1,4),(2,1),(3,1),(4,1)]).toDF(["src","dst"]) > e = spark.read.csv("graph.csv", header=True) > r = sc.parallelize([(1,),(2,),(3,),(4,)]).toDF(['src']) > r1 = e.join(r, 'src').groupBy('dst').count().withColumnRenamed('dst','src') > jr = e.join(r1, 'src') > jr.show() > r2 = jr.groupBy('dst').count() > r2.show() > = > FYI, "graph.csv" contains exactly the same data as the commented line. 
> You can find that jr is: > |src|dst|count| > | 3| 1|1| > | 1| 4|3| > | 1| 3|3| > | 1| 2|3| > | 4| 1|1| > | 2| 1|1| > But, after the last groupBy, the 3 rows with dst = 1 are not grouped together: > |dst|count| > | 1|1| > | 4|1| > | 3|1| > | 2|1| > | 1|1| > | 1|1| > If we build jr directly from raw data (commented line), this error will not > show up. So > we suspect that there is a bug in the Catalyst optimizer when multiple joins > and groupBy's > are being optimized. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20169) Groupby Bug with Sparksql
[ https://issues.apache.org/jira/browse/SPARK-20169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972270#comment-15972270 ] Hyukjin Kwon commented on SPARK-20169: -- Yea, I was confused too when I tried to reproduce this before. Probably, let me rephrase the steps to reproduce as below:
*source A* with a {{graph.csv}} file as below:
{code}
src,dst
1,2
1,3
1,4
2,1
3,1
4,1
{code}
{code}
from pyspark.sql.functions import *
e=sc.parallelize([(1,2),(1,3),(1,4),(2,1),(3,1),(4,1)]).toDF(["src","dst"])
e.show()
{code}
{code}
+---+---+
|src|dst|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  1|
|  3|  1|
|  4|  1|
+---+---+
{code}
*source B*
{code}
from pyspark.sql.functions import *
e = spark.read.csv("graph.csv", header=True)
e.show()
{code}
{code}
+---+---+
|src|dst|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  1|
|  3|  1|
|  4|  1|
+---+---+
{code}
*Reproducer*
{code}
r = sc.parallelize([(1,),(2,),(3,),(4,)]).toDF(['src'])
r1 = e.join(r, 'src').groupBy('dst').count().withColumnRenamed('dst','src')
jr = e.join(r1, 'src')
r2 = jr.groupBy('dst').count()
r2.show()
{code}
*Results from source A and the reproducer*
{code}
+---+-----+
|dst|count|
+---+-----+
|  1|    3|
|  3|    1|
|  2|    1|
|  4|    1|
+---+-----+
{code}
*Results from source B and the reproducer*
{code}
+---+-----+
|dst|count|
+---+-----+
|  1|    1|
|  4|    1|
|  3|    1|
|  2|    1|
|  1|    1|
|  1|    1|
+---+-----+
{code}
> Groupby Bug with Sparksql > - > > Key: SPARK-20169 > URL: https://issues.apache.org/jira/browse/SPARK-20169 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0, 2.1.0 >Reporter: Bin Wu > > We find a potential bug in Catalyst optimizer which cannot correctly > process "groupby". 
You can reproduce it by following simple example: > = > from pyspark.sql.functions import * > #e=sc.parallelize([(1,2),(1,3),(1,4),(2,1),(3,1),(4,1)]).toDF(["src","dst"]) > e = spark.read.csv("graph.csv", header=True) > r = sc.parallelize([(1,),(2,),(3,),(4,)]).toDF(['src']) > r1 = e.join(r, 'src').groupBy('dst').count().withColumnRenamed('dst','src') > jr = e.join(r1, 'src') > jr.show() > r2 = jr.groupBy('dst').count() > r2.show() > = > FYI, "graph.csv" contains exactly the same data as the commented line. > You can find that jr is: > |src|dst|count| > | 3| 1|1| > | 1| 4|3| > | 1| 3|3| > | 1| 2|3| > | 4| 1|1| > | 2| 1|1| > But, after the last groupBy, the 3 rows with dst = 1 are not grouped together: > |dst|count| > | 1|1| > | 4|1| > | 3|1| > | 2|1| > | 1|1| > | 1|1| > If we build jr directly from raw data (commented line), this error will not > show up. So > we suspect that there is a bug in the Catalyst optimizer when multiple joins > and groupBy's > are being optimized. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
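To make the expected answer explicit, the same joins and aggregations from the reproducer can be recomputed in plain Python over the six edges (no Spark involved); any correct plan must collapse the three dst = 1 rows into a single row with count 3, which is what source A produces and source B does not:

```python
from collections import Counter

# The six edges from the ticket (same data as graph.csv / the commented line).
edges = [(1, 2), (1, 3), (1, 4), (2, 1), (3, 1), (4, 1)]
vertices = [1, 2, 3, 4]

# r1 = e.join(r, 'src').groupBy('dst').count(), then dst renamed to src.
joined = [(s, d) for (s, d) in edges if s in vertices]
r1 = Counter(d for (_, d) in joined)          # dst -> count

# jr = e.join(r1, 'src'): attach each edge's src-count.
jr = [(s, d, r1[s]) for (s, d) in edges if s in r1]

# r2 = jr.groupBy('dst').count(): dst = 1 must collapse into one row.
r2 = Counter(d for (_, d, _) in jr)
```

Here {{jr}} matches the table in the ticket ({{(1,2,3), (1,3,3), (1,4,3), (2,1,1), (3,1,1), (4,1,1)}}), and {{r2}} gives dst = 1 a single count of 3.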
[jira] [Updated] (SPARK-20366) Fix recursive join reordering: inside joins are not reordered
[ https://issues.apache.org/jira/browse/SPARK-20366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenhua Wang updated SPARK-20366: - Description: If a plan has multi-level successive joins, e.g.:
{noformat}
            Join
           /    \
      Union      t5
      /   \
   Join     t4
   /  \
 Join   t3
 /  \
t1    t2
{noformat}
Currently we fail to reorder the inside joins, i.e. t1, t2, t3. In join reorder, we use `OrderedJoin` to indicate a join has been ordered, such that when transforming down the plan, these joins don't need to be reordered again. But there's a problem in the definition of `OrderedJoin`: The real join node is a parameter, but not its child. This breaks the transform procedure because `mapChildren` applies transform function on parameters which should be children. was: If a plan has multi-level successive joins, e.g.: Join / \ Union t5 /\ Join t4 /\ Join t3 / \ t1 t2 Currently we fail to reorder the inside joins, i.e. t1, t2, t3. In join reorder, we use `OrderedJoin` to indicate a join has been ordered, such that when transforming down the plan, these joins don't need to be reordered again. But there's a problem in the definition of `OrderedJoin`: The real join node is a parameter, but not its child. This breaks the transform procedure because `mapChildren` applies transform function on parameters which should be children. > Fix recursive join reordering: inside joins are not reordered > - > > Key: SPARK-20366 > URL: https://issues.apache.org/jira/browse/SPARK-20366 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Zhenhua Wang > > If a plan has multi-level successive joins, e.g.: > {noformat} > Join > / \ > Union t5 > / \ > Join t4 > /\ > Join t3 > / \ > t1 t2 > {noformat} > Currently we fail to reorder the inside joins, i.e. t1, t2, t3. > In join reorder, we use `OrderedJoin` to indicate a join has been ordered, > such that when transforming down the plan, these joins don't need to be > reordered again. 
> But there's a problem in the definition of `OrderedJoin`: > The real join node is a parameter, but not its child. This breaks the > transform procedure because `mapChildren` applies transform function on > parameters which should be children. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
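The `OrderedJoin` problem described above is an instance of a general tree-transform pitfall, and it can be illustrated without Catalyst. In the toy sketch below (plain Python, with hypothetical class names), a marker node that keeps its subtree in an ordinary field instead of in its children list is invisible to any traversal that recurses via children, which is essentially what happens when `mapChildren` never reaches the wrapped join:

```python
# Toy illustration (NOT Catalyst code) of the OrderedJoin pitfall: a
# child-recursing traversal never descends into a wrapper that stores
# its subtree in a plain field instead of in `children`.
class Node:
    def __init__(self, name, children=()):
        self.name = name
        self.children = list(children)

class OrderedMarker(Node):
    """Broken wrapper: the wrapped subtree is a field, NOT a child."""
    def __init__(self, wrapped):
        super().__init__("ordered", children=())  # bug: wrapped is hidden
        self.wrapped = wrapped

class FixedMarker(Node):
    """Fix: the wrapped subtree is a real child, so traversals reach it."""
    def __init__(self, wrapped):
        super().__init__("ordered", children=[wrapped])

def collect_names(node):
    """mapChildren-style recursion: visit a node, then its children."""
    names = [node.name]
    for child in node.children:
        names += collect_names(child)
    return names

inner = Node("join", [Node("t1"), Node("t2")])

missed = collect_names(Node("root", [OrderedMarker(inner)]))
reached = collect_names(Node("root", [FixedMarker(inner)]))
```

With the broken wrapper the traversal sees only the root and the marker; with the fixed one it reaches the inner join and its leaves, mirroring why the inside joins were never reordered.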
[jira] [Commented] (SPARK-20286) dynamicAllocation.executorIdleTimeout is ignored after unpersist
[ https://issues.apache.org/jira/browse/SPARK-20286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972253#comment-15972253 ] Miguel Pérez commented on SPARK-20286: -- Thank you! I'll check it again and close the issue if I cannot reproduce the problem in 2.1.0. > dynamicAllocation.executorIdleTimeout is ignored after unpersist > > > Key: SPARK-20286 > URL: https://issues.apache.org/jira/browse/SPARK-20286 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.1 >Reporter: Miguel Pérez > > With dynamic allocation enabled, it seems that executors with cached data > which are unpersisted are still being killed using the > {{dynamicAllocation.cachedExecutorIdleTimeout}} configuration, instead of > {{dynamicAllocation.executorIdleTimeout}}. Assuming the default configuration > ({{dynamicAllocation.cachedExecutorIdleTimeout = Infinity}}), an executor > with unpersisted data won't be released until the job ends. > *How to reproduce* > - Set different values for {{dynamicAllocation.executorIdleTimeout}} and > {{dynamicAllocation.cachedExecutorIdleTimeout}} > - Load a file into an RDD and persist it > - Execute an action on the RDD (like a count) so some executors are activated. > - When the action has finished, unpersist the RDD > - The application UI correctly removes the persisted data from the *Storage* > tab, but if you look in the *Executors* tab, you will find that the executors > remain *active* until {{dynamicAllocation.cachedExecutorIdleTimeout}} is > reached. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
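The first reproduction step above (setting the two idle timeouts to clearly different values) can be sketched as a submit command; the timeout values and the application name here are arbitrary examples, not recommendations, and dynamic allocation also needs the external shuffle service enabled:

```shell
# Hypothetical example values: make the two timeouts clearly different so
# the reproduction shows which one is actually applied after unpersist.
spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.executorIdleTimeout=60s \
  --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=600s \
  app.py
```

If executors with unpersisted (no longer cached) data only disappear after ~600s rather than ~60s, the behavior the ticket describes is confirmed.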
[jira] [Commented] (SPARK-20320) AnalysisException: Columns of grouping_id (count(value#17L)) does not match grouping columns (count(value#17L))
[ https://issues.apache.org/jira/browse/SPARK-20320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972249#comment-15972249 ] Jacek Laskowski commented on SPARK-20320: - I'm playing with Spark SQL and multi-dimensional aggregates (like {{cube}}) and don't think much about whether a given query is semantically correct or not. I used {{count("id")}} to match what's in {{agg(grouping_id(count("value")))}} as otherwise I would have gotten the error you face. Your query is incorrect; the fix is to add the grouping expression, like {{count("id")}} (in your particular case). > AnalysisException: Columns of grouping_id (count(value#17L)) does not match > grouping columns (count(value#17L)) > --- > > Key: SPARK-20320 > URL: https://issues.apache.org/jira/browse/SPARK-20320 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Jacek Laskowski >Priority: Minor > > I'm not questioning the {{AnalysisException}} (which I don't know whether > should be reported or not), but the exception message that tells...nothing > helpful. 
> {code} > val records = spark.range(5).flatMap(n => Seq.fill(n.toInt)(n)) > scala> > records.cube(count("value")).agg(grouping_id(count("value"))).queryExecution.logical > org.apache.spark.sql.AnalysisException: Columns of grouping_id > (count(value#17L)) does not match grouping columns (count(value#17L)); > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveGroupingAnalytics$$replaceGroupingFunc$1.applyOrElse(Analyzer.scala:313) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveGroupingAnalytics$$replaceGroupingFunc$1.applyOrElse(Analyzer.scala:308) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
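For context on why the analyzer complains at all: {{grouping_id}} is defined over the grouping columns, not over aggregate expressions like {{count("value")}}, and it evaluates to a bitmask with one bit per grouping column (set when that column is aggregated away in a given cube group). The relationship can be sketched in plain Python (an illustration of the semantics, not Spark code):

```python
from itertools import combinations

# Sketch of cube() + grouping_id() semantics: for n grouping columns, cube
# emits every subset of the columns, and grouping_id is a bitmask whose
# bit i is set when grouping column i is aggregated away (leftmost column
# maps to the most significant bit).
def cube_grouping_ids(group_cols):
    results = []
    n = len(group_cols)
    for r in range(n, -1, -1):
        for kept in combinations(range(n), r):
            gid = 0
            for i in range(n):
                if i not in kept:
                    gid |= 1 << (n - 1 - i)
            results.append((tuple(group_cols[i] for i in kept), gid))
    return results
```

In Spark itself, calling {{grouping_id()}} with no arguments (or with the same columns passed to {{cube}}) should avoid the mismatch, since the function must refer to grouping columns.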
[jira] [Updated] (SPARK-20366) Fix recursive join reordering: inside joins are not reordered
[ https://issues.apache.org/jira/browse/SPARK-20366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenhua Wang updated SPARK-20366: - Description: If a plan has multi-level successive joins, e.g.: Join / \ Union t5 /\ Join t4 /\ Join t3 / \ t1 t2 Currently we fail to reorder the inside joins, i.e. t1, t2, t3. In join reorder, we use `OrderedJoin` to indicate a join has been ordered, such that when transforming down the plan, these joins don't need to be reordered again. But there's a problem in the definition of `OrderedJoin`: The real join node is a parameter, but not its child. This breaks the transform procedure because `mapChildren` applies transform function on parameters which should be children. was: If a plan has multi-level successive joins, e.g.: ``` Join / \ Union t5 /\ Join t4 /\ Join t3 / \ t1 t2 ``` Currently we fail to reorder the inside joins, i.e. t1, t2, t3. In join reorder, we use `OrderedJoin` to indicate a join has been ordered, such that when transforming down the plan, these joins don't need to be reordered again. But there's a problem in the definition of `OrderedJoin`: The real join node is a parameter, but not its child. This breaks the transform procedure because `mapChildren` applies transform function on parameters which should be children. > Fix recursive join reordering: inside joins are not reordered > - > > Key: SPARK-20366 > URL: https://issues.apache.org/jira/browse/SPARK-20366 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Zhenhua Wang > > If a plan has multi-level successive joins, e.g.: > Join > / \ > Union t5 > /\ > Join t4 > /\ > Join t3 > / \ > t1 t2 > Currently we fail to reorder the inside joins, i.e. t1, t2, t3. > In join reorder, we use `OrderedJoin` to indicate a join has been ordered, > such that when transforming down the plan, these joins don't need to be > reordered again. 
> But there's a problem in the definition of `OrderedJoin`: > The real join node is a parameter, but not its child. This breaks the > transform procedure because `mapChildren` applies transform function on > parameters which should be children. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20366) Fix recursive join reordering: inside joins are not reordered
[ https://issues.apache.org/jira/browse/SPARK-20366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenhua Wang updated SPARK-20366: - Description: If a plan has multi-level successive joins, e.g.: ``` Join / \ Union t5 /\ Join t4 /\ Join t3 / \ t1 t2 ``` Currently we fail to reorder the inside joins, i.e. t1, t2, t3. In join reorder, we use `OrderedJoin` to indicate a join has been ordered, such that when transforming down the plan, these joins don't need to be reordered again. But there's a problem in the definition of `OrderedJoin`: The real join node is a parameter, but not its child. This breaks the transform procedure because `mapChildren` applies transform function on parameters which should be children. was: If a plan has multi-level successive joins, e.g.: ``` Join / \ Union t5 /\ Join t4 /\ Join t3 / \ t1 t2 ``` Currently we fail to reorder the inside joins, i.e. t1, t2, t3. In join reorder, we use `OrderedJoin` to indicate a join has been ordered, such that when transforming down the plan, these joins don't need to be reordered again. But there's a problem in the definition of `OrderedJoin`: The real join node is a parameter, rather than a child. This breaks the transform procedure because `mapChildren` applies transform on parameters which should be children. > Fix recursive join reordering: inside joins are not reordered > - > > Key: SPARK-20366 > URL: https://issues.apache.org/jira/browse/SPARK-20366 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Zhenhua Wang > > If a plan has multi-level successive joins, e.g.: > ``` > Join > / \ > Union t5 > /\ > Join t4 > /\ > Join t3 > / \ > t1 t2 > ``` > Currently we fail to reorder the inside joins, i.e. t1, t2, t3. > In join reorder, we use `OrderedJoin` to indicate a join has been ordered, > such that when transforming down the plan, these joins don't need to be > reordered again. 
> But there's a problem in the definition of `OrderedJoin`: > The real join node is a parameter, but not its child. This breaks the > transform procedure because `mapChildren` applies transform function on > parameters which should be children. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20366) Fix recursive join reordering: inside joins are not reordered
[ https://issues.apache.org/jira/browse/SPARK-20366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20366: Assignee: (was: Apache Spark) > Fix recursive join reordering: inside joins are not reordered > - > > Key: SPARK-20366 > URL: https://issues.apache.org/jira/browse/SPARK-20366 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Zhenhua Wang > > If a plan has multi-level successive joins, e.g.: > ``` > Join > / \ > Union t5 > /\ > Join t4 > /\ > Join t3 > / \ > t1 t2 > ``` > Currently we fail to reorder the inside joins, i.e. t1, t2, t3. > In join reorder, we use `OrderedJoin` to indicate a join has been ordered, > such that when transforming down the plan, these joins don't need to be > reordered again. > But there's a problem in the definition of `OrderedJoin`: > The real join node is a parameter, rather than a child. This breaks the > transform procedure because `mapChildren` applies transform on parameters > which should be children. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20366) Fix recursive join reordering: inside joins are not reordered
[ https://issues.apache.org/jira/browse/SPARK-20366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972243#comment-15972243 ] Apache Spark commented on SPARK-20366: -- User 'wzhfy' has created a pull request for this issue: https://github.com/apache/spark/pull/17668 > Fix recursive join reordering: inside joins are not reordered > - > > Key: SPARK-20366 > URL: https://issues.apache.org/jira/browse/SPARK-20366 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Zhenhua Wang > > If a plan has multi-level successive joins, e.g.: > ``` > Join > / \ > Union t5 > /\ > Join t4 > /\ > Join t3 > / \ > t1 t2 > ``` > Currently we fail to reorder the inside joins, i.e. t1, t2, t3. > In join reorder, we use `OrderedJoin` to indicate a join has been ordered, > such that when transforming down the plan, these joins don't need to be > reordered again. > But there's a problem in the definition of `OrderedJoin`: > The real join node is a parameter, rather than a child. This breaks the > transform procedure because `mapChildren` applies transform on parameters > which should be children. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20366) Fix recursive join reordering: inside joins are not reordered
[ https://issues.apache.org/jira/browse/SPARK-20366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20366: Assignee: Apache Spark > Fix recursive join reordering: inside joins are not reordered > - > > Key: SPARK-20366 > URL: https://issues.apache.org/jira/browse/SPARK-20366 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Zhenhua Wang >Assignee: Apache Spark > > If a plan has multi-level successive joins, e.g.: > ``` > Join > / \ > Union t5 > /\ > Join t4 > /\ > Join t3 > / \ > t1 t2 > ``` > Currently we fail to reorder the inside joins, i.e. t1, t2, t3. > In join reorder, we use `OrderedJoin` to indicate a join has been ordered, > such that when transforming down the plan, these joins don't need to be > reordered again. > But there's a problem in the definition of `OrderedJoin`: > The real join node is a parameter, rather than a child. This breaks the > transform procedure because `mapChildren` applies transform on parameters > which should be children. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20366) Fix recursive join reordering: inside joins are not reordered
Zhenhua Wang created SPARK-20366:
---------------------------------

Summary: Fix recursive join reordering: inside joins are not reordered
Key: SPARK-20366
URL: https://issues.apache.org/jira/browse/SPARK-20366
Project: Spark
Issue Type: Sub-task
Components: SQL
Affects Versions: 2.2.0
Reporter: Zhenhua Wang

If a plan has multi-level successive joins, e.g.:
```
      Join
     /    \
  Union    t5
   /  \
 Join   t4
  /  \
Join   t3
 /  \
t1    t2
```
Currently we fail to reorder the inside joins, i.e. t1, t2, t3.
In join reordering, we use `OrderedJoin` to indicate that a join has already been ordered, so that when transforming down the plan these joins do not need to be reordered again.
But there is a problem in the definition of `OrderedJoin`: the real join node is a constructor parameter rather than a child. This breaks the transform procedure, because `mapChildren` applies the transform only to children, and so skips the parameter that should have been a child.
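The `OrderedJoin` problem can be sketched outside Spark with a toy tree. This is not Spark's actual `TreeNode` API — `Plan`, `renameLeaves`, and the two marker classes below are illustrative names — but it shows why a wrapper that holds its plan as a constructor parameter is invisible to `mapChildren`-based transforms:

```scala
// Minimal sketch of a Catalyst-like tree. Not Spark code; all names are illustrative.
sealed trait Plan {
  def mapChildren(f: Plan => Plan): Plan
  // Top-down transform, analogous in spirit to Spark's transformDown:
  // apply the rule to this node, then recurse into the children only.
  def transformDown(rule: PartialFunction[Plan, Plan]): Plan = {
    val applied = rule.applyOrElse(this, identity[Plan])
    applied.mapChildren(_.transformDown(rule))
  }
}

case class Leaf(name: String) extends Plan {
  def mapChildren(f: Plan => Plan): Plan = this
}

case class Join(left: Plan, right: Plan) extends Plan {
  def mapChildren(f: Plan => Plan): Plan = Join(f(left), f(right))
}

// Broken marker: the ordered join is a plain parameter, not a child,
// so mapChildren never descends into it.
case class OrderedJoinParam(ordered: Plan) extends Plan {
  def mapChildren(f: Plan => Plan): Plan = this
}

// Fixed marker: the ordered join is a real child, so transforms recurse.
case class OrderedJoinChild(ordered: Plan) extends Plan {
  def mapChildren(f: Plan => Plan): Plan = OrderedJoinChild(f(ordered))
}

// A stand-in for "reorder the inside joins": any rule that should reach the leaves.
def renameLeaves(p: Plan): Plan =
  p.transformDown { case Leaf(n) => Leaf(n.toUpperCase) }
```

Running `renameLeaves` over the parameter-style marker leaves everything beneath it untouched, which mirrors the reported failure to reorder the inside joins; making the wrapped plan a real child restores the recursion.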
[jira] [Updated] (SPARK-20365) Not so accurate classpath format for AM and Containers
[ https://issues.apache.org/jira/browse/SPARK-20365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Saisai Shao updated SPARK-20365:
--------------------------------

Summary: Not so accurate classpath format for AM and Containers (was: Inaccurate classpath format for AM and Containers)

> Not so accurate classpath format for AM and Containers
> ------------------------------------------------------
>
> Key: SPARK-20365
> URL: https://issues.apache.org/jira/browse/SPARK-20365
> Project: Spark
> Issue Type: Bug
> Components: YARN
> Affects Versions: 2.2.0
> Reporter: Saisai Shao
> Priority: Minor
>
> In Spark on YARN, when "spark.yarn.jars" is configured with local jars (jars using the "local" scheme), we get an inaccurate classpath for the AM and containers, because we do not remove the "local" scheme when concatenating the classpath. It still runs, since the classpath is separated with ":" and Java simply treats the stray "local" token as a separate (nonexistent) entry, but we could improve this by removing the scheme.
> {code}
> java.class.path =
[jira] [Commented] (SPARK-20286) dynamicAllocation.executorIdleTimeout is ignored after unpersist
[ https://issues.apache.org/jira/browse/SPARK-20286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972231#comment-15972231 ] Umesh Chaudhary commented on SPARK-20286:
-----------------------------------------

Yep, +1 to the UI changes. However, I tested the behaviour by following the steps you mentioned, and this seems to be fixed in 2.1.0.

{code}
spark2-shell --conf spark.dynamicAllocation.executorIdleTimeout=7s --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=20s --conf spark.dynamicAllocation.minExecutors=2

scala> sc.setLogLevel("INFO")
scala> val rdd = sc.textFile("/tmp/config.txt")
..
17/04/17 23:46:58 INFO spark.SparkContext: Created broadcast 0 from textFile at <console>:24
rdd: org.apache.spark.rdd.RDD[String] = /tmp/config.txt MapPartitionsRDD[1] at textFile at <console>:24
..
..
scala> rdd.collect
17/04/17 23:47:10 INFO mapred.FileInputFormat: Total input paths to process : 1
17/04/17 23:47:10 INFO spark.SparkContext: Starting job: collect at <console>:27
..
..
17/04/17 23:47:13 INFO scheduler.DAGScheduler: Job 0 finished: collect at <console>:27, took 3.457215 s
..
scala> 17/04/17 23:47:20 INFO spark.ExecutorAllocationManager: Request to remove executorIds: 1
{code}

As expected, the request to remove executors was received after 7 secs. Then I tested the behaviour for persist:

{code}
scala> rdd.persist
res2: rdd.type = /tmp/config.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> rdd.count
17/04/17 23:47:45 INFO spark.SparkContext: Starting job: count at <console>:27
..
17/04/17 23:47:45 INFO scheduler.DAGScheduler: Job 1 finished: count at <console>:27, took 0.293053 s
..
scala> 17/04/17 23:48:05 INFO spark.ExecutorAllocationManager: Request to remove executorIds: 1
{code}

In this case the request to remove the executor was received after 20 secs.
Afterwards I tested unpersist:

{code}
scala> rdd.unpersist(true)
17/04/17 23:50:22 INFO rdd.MapPartitionsRDD: Removing RDD 1 from persistence list
17/04/17 23:50:22 INFO storage.BlockManager: Removing RDD 1
res7: rdd.type = /tmp/config.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> rdd.count
17/04/17 23:50:31 INFO spark.SparkContext: Starting job: count at <console>:27
..
17/04/17 23:50:31 INFO scheduler.DAGScheduler: Job 3 finished: count at <console>:27, took 0.219764 s
res8: Long = 100
..
scala> 17/04/17 23:50:38 INFO spark.ExecutorAllocationManager: Request to remove executorIds: 1
{code}

This time the remove request was received after 7 secs.

> dynamicAllocation.executorIdleTimeout is ignored after unpersist
> ----------------------------------------------------------------
>
> Key: SPARK-20286
> URL: https://issues.apache.org/jira/browse/SPARK-20286
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.0.1
> Reporter: Miguel Pérez
>
> With dynamic allocation enabled, it seems that executors whose cached data has been unpersisted are still being killed using the {{dynamicAllocation.cachedExecutorIdleTimeout}} configuration, instead of {{dynamicAllocation.executorIdleTimeout}}. Assuming the default configuration ({{dynamicAllocation.cachedExecutorIdleTimeout = Infinity}}), an executor with unpersisted data won't be released until the job ends.
>
> *How to reproduce*
> - Set different values for {{dynamicAllocation.executorIdleTimeout}} and {{dynamicAllocation.cachedExecutorIdleTimeout}}
> - Load a file into a RDD and persist it
> - Execute an action on the RDD (like a count) so some executors are activated
> - When the action has finished, unpersist the RDD
> - The application UI correctly removes the persisted data from the *Storage* tab, but if you look in the *Executors* tab, you will find that the executors remain *active* until {{dynamicAllocation.cachedExecutorIdleTimeout}} is reached.
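The behaviour exercised above can be modelled with a small, simplified function (this is not Spark's actual `ExecutorAllocationManager` code — `DynConf` and `idleTimeoutSec` are invented for illustration): the timeout an idle executor is removed on should depend only on whether it currently holds cached blocks, so an executor whose RDD was unpersisted falls back to the shorter `executorIdleTimeout`, which is what the tests above observe in 2.1.0.

```scala
// Simplified model of the timeout selection under test. Illustrative only.
case class DynConf(executorIdleTimeoutSec: Long, cachedExecutorIdleTimeoutSec: Long)

// An executor still holding cached blocks idles out on the longer cached timeout;
// once its data is unpersisted it should use the plain idle timeout again.
def idleTimeoutSec(hasCachedBlocks: Boolean, conf: DynConf): Long =
  if (hasCachedBlocks) conf.cachedExecutorIdleTimeoutSec
  else conf.executorIdleTimeoutSec
```

With the configuration used above (`executorIdleTimeout=7s`, `cachedExecutorIdleTimeout=20s`), this model predicts the observed 7 s / 20 s / 7 s sequence for the uncached, persisted, and unpersisted cases.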
[jira] [Created] (SPARK-20365) Inaccurate classpath format for AM and Containers
Saisai Shao created SPARK-20365:
--------------------------------

Summary: Inaccurate classpath format for AM and Containers
Key: SPARK-20365
URL: https://issues.apache.org/jira/browse/SPARK-20365
Project: Spark
Issue Type: Bug
Components: YARN
Affects Versions: 2.2.0
Reporter: Saisai Shao
Priority: Minor

In Spark on YARN, when "spark.yarn.jars" is configured with local jars (jars using the "local" scheme), we get an inaccurate classpath for the AM and containers, because we do not remove the "local" scheme when concatenating the classpath. It still runs, since the classpath is separated with ":" and Java simply treats the stray "local" token as a separate (nonexistent) entry, but we could improve this by removing the scheme.

{code}
java.class.path =
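The proposed cleanup can be sketched as follows. The helper names (`stripLocalScheme`, `buildClassPath`) are invented for illustration and do not correspond to Spark's actual YARN code; the point is only that a leading `local:` scheme is dropped before entries are joined with `:`.

```scala
import java.net.URI

// Illustrative sketch, not Spark's implementation: drop the "local" scheme
// from a classpath entry, leaving plain paths untouched.
def stripLocalScheme(entry: String): String = {
  val uri = new URI(entry)
  if (uri.getScheme == "local") uri.getPath else entry
}

// Concatenate entries with ":" (the separator mentioned in the issue).
def buildClassPath(entries: Seq[String]): String =
  entries.map(stripLocalScheme).mkString(":")
```

With this, `local:/opt/jars/a.jar` contributes `/opt/jars/a.jar` to `java.class.path` instead of the two tokens `local` and `/opt/jars/a.jar`.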
[jira] [Comment Edited] (SPARK-20174) Analyzer gives mysterious AnalysisException when posexplode used in withColumn
[ https://issues.apache.org/jira/browse/SPARK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972211#comment-15972211 ] Takeshi Yamamuro edited comment on SPARK-20174 at 4/18/17 6:54 AM:
-------------------------------------------------------------------

To fix this, IMHO it's okay to accept Seq[String] in the first argument of withColumn for multi aliases;

{code}
scala> val df = Seq((Seq(1, 2, 3))).toDF("a")

scala> df.select(posexplode($"a").as("p" :: "c" :: Nil)).show
+---+---+
|  p|  c|
+---+---+
|  0|  1|
|  1|  2|
|  2|  3|
+---+---+

scala> df.withColumn("p" :: "c" :: Nil, posexplode($"a")).show
<console>:26: error: type mismatch;
 found   : List[String]
 required: String
       df.withColumn("p" :: "c" :: Nil, posexplode($"a")).show
{code}

was (Author: maropu):
To fix this, it seems to be okay to accept Seq[String] in the first argument of withColumn for multi aliases;

{code}
scala> val df = Seq((Seq(1, 2, 3))).toDF("a")

scala> df.select(posexplode($"a").as("p" :: "c" :: Nil)).show
+---+---+
|  p|  c|
+---+---+
|  0|  1|
|  1|  2|
|  2|  3|
+---+---+

scala> df.withColumn("p" :: "c" :: Nil, posexplode($"a")).show
<console>:26: error: type mismatch;
 found   : List[String]
 required: String
       df.withColumn("p" :: "c" :: Nil, posexplode($"a")).show
{code}

> Analyzer gives mysterious AnalysisException when posexplode used in withColumn
> ------------------------------------------------------------------------------
>
> Key: SPARK-20174
> URL: https://issues.apache.org/jira/browse/SPARK-20174
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Jacek Laskowski
> Priority: Minor
>
> Wish I knew how to even describe the issue. It appears that {{posexplode}} cannot be used in {{withColumn}}, but the error message does not seem to say it.
> [The scaladoc of posexplode|http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$@posexplode(e:org.apache.spark.sql.Column):org.apache.spark.sql.Column] is silent about this "limitation", too.
> {code}
> scala> codes.printSchema
> root
>  |-- id: integer (nullable = false)
>  |-- rate_plan_code: array (nullable = true)
>  |    |-- element: string (containsNull = true)
>
> scala> codes.withColumn("code", posexplode($"rate_plan_code")).show
> org.apache.spark.sql.AnalysisException: The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 2 aliases but got code ;
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:90)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$.makeGeneratorOutput(Analyzer.scala:1744)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20$$anonfun$56.apply(Analyzer.scala:1691)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20$$anonfun$56.apply(Analyzer.scala:1679)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20.applyOrElse(Analyzer.scala:1679)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20.applyOrElse(Analyzer.scala:1664)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$.apply(Analyzer.scala:1664)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$.apply(Analyzer.scala:1629)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>
[jira] [Commented] (SPARK-20174) Analyzer gives mysterious AnalysisException when posexplode used in withColumn
[ https://issues.apache.org/jira/browse/SPARK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972211#comment-15972211 ] Takeshi Yamamuro commented on SPARK-20174:
------------------------------------------

To fix this, it seems to be okay to accept Seq[String] in the first argument of withColumn for multi aliases;

{code}
scala> val df = Seq((Seq(1, 2, 3))).toDF("a")

scala> df.select(posexplode($"a").as("p" :: "c" :: Nil)).show
+---+---+
|  p|  c|
+---+---+
|  0|  1|
|  1|  2|
|  2|  3|
+---+---+

scala> df.withColumn("p" :: "c" :: Nil, posexplode($"a")).show
<console>:26: error: type mismatch;
 found   : List[String]
 required: String
       df.withColumn("p" :: "c" :: Nil, posexplode($"a")).show
{code}

> Analyzer gives mysterious AnalysisException when posexplode used in withColumn
> ------------------------------------------------------------------------------
>
> Key: SPARK-20174
> URL: https://issues.apache.org/jira/browse/SPARK-20174
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Jacek Laskowski
> Priority: Minor
>
> Wish I knew how to even describe the issue. It appears that {{posexplode}} cannot be used in {{withColumn}}, but the error message does not seem to say it.
> [The scaladoc of posexplode|http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$@posexplode(e:org.apache.spark.sql.Column):org.apache.spark.sql.Column] is silent about this "limitation", too.
> {code}
> scala> codes.printSchema
> root
>  |-- id: integer (nullable = false)
>  |-- rate_plan_code: array (nullable = true)
>  |    |-- element: string (containsNull = true)
>
> scala> codes.withColumn("code", posexplode($"rate_plan_code")).show
> org.apache.spark.sql.AnalysisException: The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 2 aliases but got code ;
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:90)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate$.makeGeneratorOutput(Analyzer.scala:1744)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20$$anonfun$56.apply(Analyzer.scala:1691)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20$$anonfun$56.apply(Analyzer.scala:1679)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20.applyOrElse(Analyzer.scala:1679)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$$anonfun$apply$20.applyOrElse(Analyzer.scala:1664)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$.apply(Analyzer.scala:1664)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator$.apply(Analyzer.scala:1629)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:70)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:68)
>   at
[jira] [Commented] (SPARK-20364) Parquet predicate pushdown on columns with dots return empty results
[ https://issues.apache.org/jira/browse/SPARK-20364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972171#comment-15972171 ] Hyukjin Kwon commented on SPARK-20364:
--------------------------------------

cc [~aash] and [~robert3005], who found this issue in https://github.com/apache/spark/pull/17667, and [~lian cheng], who might have a better idea; I think they can confirm whether the investigation here is correct and decide how to resolve it.

> Parquet predicate pushdown on columns with dots return empty results
> --------------------------------------------------------------------
>
> Key: SPARK-20364
> URL: https://issues.apache.org/jira/browse/SPARK-20364
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Hyukjin Kwon
>
> Currently, if there are dots in a column name, predicate pushdown in Parquet seems to fail.
>
> **With dots**
> {code}
> val path = "/tmp/abcde"
> Seq(Some(1), None).toDF("col.dots").write.parquet(path)
> spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
> {code}
> {code}
> +--------+
> |col.dots|
> +--------+
> +--------+
> {code}
>
> **Without dots**
> {code}
> val path = "/tmp/abcde2"
> Seq(Some(1), None).toDF("coldots").write.parquet(path)
> spark.read.parquet(path).where("`coldots` IS NOT NULL").show()
> {code}
> {code}
> +-------+
> |coldots|
> +-------+
> |      1|
> +-------+
> {code}
>
> It seems that for a column name with a dot, {{FilterApi}} splits the field name on the dot (producing a {{ColumnPath}} with multiple path segments), whereas the actual column name is {{col.dots}}. (See [FilterApi.java#L71|https://github.com/apache/parquet-mr/blob/apache-parquet-1.8.2/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java#L71], which calls [ColumnPath.java#L44|https://github.com/apache/parquet-mr/blob/apache-parquet-1.8.2/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/ColumnPath.java#L44].)
> I just tried to come up with ways to resolve it and came up with two:
> One is simply not to push down filters when there are dots in column names, so that Spark reads everything and filters on the Spark side.
> The other is to create Spark's own versions of the {{FilterApi}} column constructors for those columns (the Parquet ones seem final), so that a single column path is always used on the Spark side (this seems hacky); this is workable because we are not pushing down nested columns currently. That way we can get a field name via {{ColumnPath.get}} instead of {{ColumnPath.fromDotString}}.
> I just made a rough version of the latter.
> {code}
> private[parquet] object ParquetColumns {
>
>   def intColumn(columnPath: String): Column[Integer] with SupportsLtGt = {
>     new Column[Integer](ColumnPath.get(columnPath), classOf[Integer]) with SupportsLtGt
>   }
>
>   def longColumn(columnPath: String): Column[java.lang.Long] with SupportsLtGt = {
>     new Column[java.lang.Long](ColumnPath.get(columnPath), classOf[java.lang.Long]) with SupportsLtGt
>   }
>
>   def floatColumn(columnPath: String): Column[java.lang.Float] with SupportsLtGt = {
>     new Column[java.lang.Float](ColumnPath.get(columnPath), classOf[java.lang.Float]) with SupportsLtGt
>   }
>
>   def doubleColumn(columnPath: String): Column[java.lang.Double] with SupportsLtGt = {
>     new Column[java.lang.Double](ColumnPath.get(columnPath), classOf[java.lang.Double]) with SupportsLtGt
>   }
>
>   def booleanColumn(columnPath: String): Column[java.lang.Boolean] with SupportsEqNotEq = {
>     new Column[java.lang.Boolean](ColumnPath.get(columnPath), classOf[java.lang.Boolean]) with SupportsEqNotEq
>   }
>
>   def binaryColumn(columnPath: String): Column[Binary] with SupportsLtGt = {
>     new Column[Binary](ColumnPath.get(columnPath), classOf[Binary]) with SupportsLtGt
>   }
> }
> {code}
> Neither sounds ideal. Please let me know if anyone has a better idea.
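The root cause can be reproduced with plain Scala, independent of Parquet. The two functions below merely mimic the two {{ColumnPath}} constructors named in the issue (they are stand-ins, not the Parquet API): a `fromDotString`-style constructor splits on dots and yields two path segments, while a `get`-style constructor keeps the single real column name.

```scala
// Stand-ins for Parquet's ColumnPath.fromDotString vs. ColumnPath.get,
// modelled as plain segment lists. Illustrative only.
def fromDotStringLike(name: String): Seq[String] = name.split('.').toSeq
def getLike(name: String): Seq[String] = Seq(name)
```

For the column `col.dots`, the dot-splitting variant looks up a nested path `col` → `dots` that does not exist in the file, which is consistent with the empty results shown above; the single-segment variant matches the real column.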
[jira] [Created] (SPARK-20364) Parquet predicate pushdown on columns with dots return empty results
Hyukjin Kwon created SPARK-20364:
---------------------------------

Summary: Parquet predicate pushdown on columns with dots return empty results
Key: SPARK-20364
URL: https://issues.apache.org/jira/browse/SPARK-20364
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.2.0
Reporter: Hyukjin Kwon

Currently, if there are dots in a column name, predicate pushdown in Parquet seems to fail.

**With dots**
{code}
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
{code}
{code}
+--------+
|col.dots|
+--------+
+--------+
{code}

**Without dots**
{code}
val path = "/tmp/abcde2"
Seq(Some(1), None).toDF("coldots").write.parquet(path)
spark.read.parquet(path).where("`coldots` IS NOT NULL").show()
{code}
{code}
+-------+
|coldots|
+-------+
|      1|
+-------+
{code}

It seems that for a column name with a dot, {{FilterApi}} splits the field name on the dot (producing a {{ColumnPath}} with multiple path segments), whereas the actual column name is {{col.dots}}. (See [FilterApi.java#L71|https://github.com/apache/parquet-mr/blob/apache-parquet-1.8.2/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java#L71], which calls [ColumnPath.java#L44|https://github.com/apache/parquet-mr/blob/apache-parquet-1.8.2/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/ColumnPath.java#L44].)

I just tried to come up with ways to resolve it and came up with two:
One is simply not to push down filters when there are dots in column names, so that Spark reads everything and filters on the Spark side.
The other is to create Spark's own versions of the {{FilterApi}} column constructors for those columns (the Parquet ones seem final), so that a single column path is always used on the Spark side (this seems hacky); this is workable because we are not pushing down nested columns currently. That way we can get a field name via {{ColumnPath.get}} instead of {{ColumnPath.fromDotString}}.

I just made a rough version of the latter.
{code}
private[parquet] object ParquetColumns {

  def intColumn(columnPath: String): Column[Integer] with SupportsLtGt = {
    new Column[Integer](ColumnPath.get(columnPath), classOf[Integer]) with SupportsLtGt
  }

  def longColumn(columnPath: String): Column[java.lang.Long] with SupportsLtGt = {
    new Column[java.lang.Long](ColumnPath.get(columnPath), classOf[java.lang.Long]) with SupportsLtGt
  }

  def floatColumn(columnPath: String): Column[java.lang.Float] with SupportsLtGt = {
    new Column[java.lang.Float](ColumnPath.get(columnPath), classOf[java.lang.Float]) with SupportsLtGt
  }

  def doubleColumn(columnPath: String): Column[java.lang.Double] with SupportsLtGt = {
    new Column[java.lang.Double](ColumnPath.get(columnPath), classOf[java.lang.Double]) with SupportsLtGt
  }

  def booleanColumn(columnPath: String): Column[java.lang.Boolean] with SupportsEqNotEq = {
    new Column[java.lang.Boolean](ColumnPath.get(columnPath), classOf[java.lang.Boolean]) with SupportsEqNotEq
  }

  def binaryColumn(columnPath: String): Column[Binary] with SupportsLtGt = {
    new Column[Binary](ColumnPath.get(columnPath), classOf[Binary]) with SupportsLtGt
  }
}
{code}

Neither sounds ideal. Please let me know if anyone has a better idea.