[jira] [Commented] (SPARK-37980) Extend METADATA column to support row indices for file based data sources
[ https://issues.apache.org/jira/browse/SPARK-37980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17485215#comment-17485215 ]

Cheng Lian commented on SPARK-37980:

[~prakharjain09], as you've mentioned, it's not straightforward to customize the Parquet code paths in Spark to achieve this goal. Meanwhile, the functionality is quite useful in general: I can imagine it enabling other systems in the Parquet ecosystem to build more sophisticated indexing solutions. Instead of doing heavy customizations in Spark, would it be better to make the changes in upstream {{parquet-mr}}, so that other systems can benefit from them more easily?

> Extend METADATA column to support row indices for file based data sources
> --------------------------------------------------------------------------
>
> Key: SPARK-37980
> URL: https://issues.apache.org/jira/browse/SPARK-37980
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.3
> Reporter: Prakhar Jain
> Priority: Major
>
> Spark recently added hidden metadata column support for file-based data sources as part of SPARK-37273. We should extend it to also support ROW_INDEX/ROW_POSITION.
>
> Meaning of ROW_POSITION: ROW_INDEX/ROW_POSITION is the index of a row within a file. E.g. the 5th row in a file will have ROW_INDEX 5.
>
> Use cases: row indexes can be used in a variety of ways. A (fileName, rowIndex) tuple uniquely identifies a row in a table. This information can be used to mark rows, e.g. by an indexer.

--
This message was sent by Atlassian Jira (v8.20.1#820001)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
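Conceptually, the (fileName, rowIndex) identification described in the ticket can be sketched in a few lines of plain Python (illustrative names only, nothing Spark- or Parquet-specific):

```python
def with_row_indices(files):
    # files maps file name -> ordered rows in that file.  Yields
    # (file_name, row_index, row) tuples; the (file_name, row_index)
    # pair uniquely identifies a row in the table, which is exactly
    # what an external indexer needs in order to mark rows.
    for file_name, rows in files.items():
        for row_index, row in enumerate(rows):
            yield (file_name, row_index, row)

indexed = list(with_row_indices({
    "part-00000.parquet": ["alice", "bob"],
    "part-00001.parquet": ["carol"],
}))
```

The hard part in Spark is producing these indices efficiently inside the file readers, which is why the ticket discusses customizing the Parquet code paths.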
[jira] [Updated] (SPARK-31935) Hadoop file system config should be effective in data source options
[ https://issues.apache.org/jira/browse/SPARK-31935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Lian updated SPARK-31935:
Affects Version/s: 2.4.6, 3.0.0 (was: 3.0.1, 3.1.0)

> Hadoop file system config should be effective in data source options
> ---------------------------------------------------------------------
>
> Key: SPARK-31935
> URL: https://issues.apache.org/jira/browse/SPARK-31935
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.6, 3.0.0
> Reporter: Gengliang Wang
> Assignee: Gengliang Wang
> Priority: Major
> Fix For: 3.0.1, 3.1.0
>
> Data source options should be propagated into the Hadoop configuration used by the method {{checkAndGlobPathIfNecessary}}.
> From org.apache.hadoop.fs.FileSystem.java:
> {code:java}
> public static FileSystem get(URI uri, Configuration conf) throws IOException {
>   String scheme = uri.getScheme();
>   String authority = uri.getAuthority();
>
>   if (scheme == null && authority == null) {     // use default FS
>     return get(conf);
>   }
>
>   if (scheme != null && authority == null) {     // no authority
>     URI defaultUri = getDefaultUri(conf);
>     if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
>         && defaultUri.getAuthority() != null) {  // & default has authority
>       return get(defaultUri, conf);              // return default
>     }
>   }
>
>   String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
>   if (conf.getBoolean(disableCacheName, false)) {
>     return createFileSystem(uri, conf);
>   }
>
>   return CACHE.get(uri, conf);
> }
> {code}
> With this, we can specify URI scheme and authority related configurations for scanning file systems.
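The caching decision in the quoted Java snippet can be sketched like this (a toy model in plain Python; the conf/cache dictionaries and function names are illustrative, not Hadoop's API):

```python
def disable_cache_key(scheme):
    # Mirrors String.format("fs.%s.impl.disable.cache", scheme) above.
    return f"fs.{scheme}.impl.disable.cache"

def get_file_system(scheme, conf, cache, create):
    # Toy model of FileSystem.get's caching decision: a per-scheme
    # option in the configuration forces a fresh, uncached instance;
    # otherwise instances are shared via the cache.
    if conf.get(disable_cache_key(scheme), False):
        return create(scheme)
    if scheme not in cache:
        cache[scheme] = create(scheme)
    return cache[scheme]
```

This is why propagating data source options into that Hadoop configuration matters: per-scan settings such as the cache-disable flag only take effect if they reach the conf that `FileSystem.get` consults.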
[jira] [Updated] (SPARK-26352) Join reordering should not change the order of output attributes
[ https://issues.apache.org/jira/browse/SPARK-26352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Lian updated SPARK-26352:
Summary: Join reordering should not change the order of output attributes (was: join reordering should not change the order of output attributes)

> Join reordering should not change the order of output attributes
> -----------------------------------------------------------------
>
> Key: SPARK-26352
> URL: https://issues.apache.org/jira/browse/SPARK-26352
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0, 2.3.0, 2.4.0
> Reporter: Kris Mok
> Assignee: Kris Mok
> Priority: Major
> Labels: correctness
> Fix For: 2.3.3, 2.4.1, 3.0.0
>
> The optimizer rule {{org.apache.spark.sql.catalyst.optimizer.ReorderJoin}} performs join reordering on inner joins. This was introduced by SPARK-12032 in December 2015.
> After it has reordered the joins, though, it doesn't check whether the column order (in terms of the {{output}} attribute list) is still the same as before. Thus, it's possible to have a mismatch between the reordered column order and the schema that a DataFrame thinks it has.
> This can be demonstrated with the following example:
> {code:none}
> spark.sql("create table table_a (x int, y int) using parquet")
> spark.sql("create table table_b (i int, j int) using parquet")
> spark.sql("create table table_c (a int, b int) using parquet")
> val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i")
> {code}
> Here's what the DataFrame thinks:
> {code:none}
> scala> df.printSchema
> root
>  |-- x: integer (nullable = true)
>  |-- y: integer (nullable = true)
>  |-- i: integer (nullable = true)
>  |-- j: integer (nullable = true)
>  |-- a: integer (nullable = true)
>  |-- b: integer (nullable = true)
> {code}
> Here's what the optimized plan thinks, after join reordering:
> {code:none}
> scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
> |-- x: integer
> |-- y: integer
> |-- a: integer
> |-- b: integer
> |-- i: integer
> |-- j: integer
> {code}
> If we exclude the {{ReorderJoin}} rule (using Spark 2.4's optimizer rule exclusion feature), it's back to normal:
> {code:none}
> scala> spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ReorderJoin")
> scala> val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i")
> df: org.apache.spark.sql.DataFrame = [x: int, y: int ... 4 more fields]
> scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
> |-- x: integer
> |-- y: integer
> |-- i: integer
> |-- j: integer
> |-- a: integer
> |-- b: integer
> {code}
> Note that this column ordering problem leads to data corruption, and can manifest itself in various symptoms:
> * Silently corrupting data, if the reordered columns happen to either have matching types or have sufficiently compatible types (e.g. all fixed-length primitive types are considered "sufficiently compatible" in an UnsafeRow); in that case the resulting data is simply wrong, and it might not trigger any alarms immediately. Or
> * Weird Java-level exceptions like {{java.lang.NegativeArraySizeException}}, or even SIGSEGVs.
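The invariant that {{ReorderJoin}} violates can be stated in a few lines of plain Python (a sketch of the idea only, not Spark's actual fix): whatever internal order the optimizer picks, the visible output must be projected back to the original attribute order whenever it changed.

```python
def restore_output_order(original, reordered):
    # `original` is the attribute order the DataFrame schema promises;
    # `reordered` is what the optimizer produced.  Returns the output
    # order plus the projection (indices into `reordered`) needed to
    # restore it, or None when no projection is needed.
    if reordered == original:
        return original, None
    projection = [reordered.index(col) for col in original]
    return original, projection

# The example from this ticket: the optimizer produced x,y,a,b,i,j
# while the DataFrame schema says x,y,i,j,a,b.
cols, proj = restore_output_order(["x", "y", "i", "j", "a", "b"],
                                  ["x", "y", "a", "b", "i", "j"])
```

Without such a fix-up projection, downstream operators read field slots by position, which is exactly how the mismatched UnsafeRow layouts corrupt data silently.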
[jira] [Updated] (SPARK-29667) implicitly convert mismatched datatypes on right side of "IN" operator
[ https://issues.apache.org/jira/browse/SPARK-29667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Lian updated SPARK-29667:
Environment: (was: spark-2.4.3-bin-dbr-5.5-snapshot-9833d0f)

> implicitly convert mismatched datatypes on right side of "IN" operator
> -----------------------------------------------------------------------
>
> Key: SPARK-29667
> URL: https://issues.apache.org/jira/browse/SPARK-29667
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.3
> Reporter: Jessie Lin
> Priority: Minor
>
> Ran into an error on this SQL:
> Mismatched columns: [(a.`id`:decimal(28,0), db1.table1.`id`:decimal(18,0))]
> The SQL's AND clause:
> AND a.id in (select id from db1.table1 where col1 = 1 group by id)
> Once I cast decimal(18,0) to decimal(28,0) explicitly above, the SQL ran just fine. Can the SQL engine cast implicitly in this case?
[jira] [Commented] (SPARK-29667) implicitly convert mismatched datatypes on right side of "IN" operator
[ https://issues.apache.org/jira/browse/SPARK-29667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16963305#comment-16963305 ]

Cheng Lian commented on SPARK-29667:

Reproduced this with the following snippet:
{code}
spark.range(10).select($"id" cast DecimalType(18, 0)).createOrReplaceTempView("t1")
spark.range(10).select($"id" cast DecimalType(28, 0)).createOrReplaceTempView("t2")
sql("SELECT * FROM t1 WHERE t1.id IN (SELECT id FROM t2)").explain(true)
{code}
Exception:
{noformat}
The data type of one or more elements in the left hand side of an IN subquery is not compatible with the data type of the output of the subquery
Mismatched columns: [(t1.`id`:decimal(18,0), t2.`id`:decimal(28,0))]
Left side: [decimal(18,0)]. Right side: [decimal(28,0)].; line 1 pos 29;
'Project [*]
+- 'Filter id#16 IN (list#22 [])
   :  +- Project [id#20]
   :     +- SubqueryAlias `t2`
   :        +- Project [cast(id#18L as decimal(28,0)) AS id#20]
   :           +- Range (0, 10, step=1, splits=Some(8))
   +- SubqueryAlias `t1`
      +- Project [cast(id#14L as decimal(18,0)) AS id#16]
         +- Range (0, 10, step=1, splits=Some(8))

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:123)
  ...
{noformat}
It seems that Postgres does support this kind of implicit casting:
{noformat}
postgres=# SELECT CAST(1 AS BIGINT) IN (CAST(1 AS INT));
 ?column?
----------
 t
(1 row)
{noformat}
I believe the problem in Spark is that {{o.a.s.s.c.expressions.In#checkInputDataTypes()}} is too strict.
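The kind of implicit widening being requested can be sketched as follows (illustrative only; this mirrors the usual decimal-widening arithmetic, not Spark's actual implementation): the wider of decimal(p1,s1) and decimal(p2,s2) keeps the larger integral-digit count and the larger scale.

```python
def wider_decimal(t1, t2):
    # t1 and t2 are (precision, scale) pairs.  Keep enough integral
    # digits and enough fractional digits to represent values of both
    # operand types exactly.
    (p1, s1), (p2, s2) = t1, t2
    scale = max(s1, s2)
    integral = max(p1 - s1, p2 - s2)
    return (integral + scale, scale)

# The case from this ticket: decimal(18,0) IN (... decimal(28,0) ...)
common = wider_decimal((18, 0), (28, 0))  # both sides fit in decimal(28,0)
```

Under a rule like this the analyzer could cast both sides of the IN comparison to the common type instead of failing, which is the behavior Postgres exhibits above for BIGINT vs INT.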
[jira] [Updated] (SPARK-26806) EventTimeStats.merge doesn't handle "zero.merge(zero)" correctly
[ https://issues.apache.org/jira/browse/SPARK-26806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Lian updated SPARK-26806:
Reporter: Cheng Lian (was: liancheng)

> EventTimeStats.merge doesn't handle "zero.merge(zero)" correctly
> -----------------------------------------------------------------
>
> Key: SPARK-26806
> URL: https://issues.apache.org/jira/browse/SPARK-26806
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.2.1, 2.2.2, 2.2.3, 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.4.0
> Reporter: Cheng Lian
> Assignee: Shixiong Zhu
> Priority: Major
> Fix For: 2.2.4, 2.3.3, 2.4.1, 3.0.0
>
> Right now, EventTimeStats.merge doesn't handle "zero.merge(zero)". This makes "avg" become "NaN", and whatever gets merged with the result of "zero.merge(zero)", "avg" will still be "NaN". Then finally, "NaN".toLong returns "0" and the user sees the following incorrect report:
> {code}
> "eventTime" : {
>   "avg" : "1970-01-01T00:00:00.000Z",
>   "max" : "2019-01-31T12:57:00.000Z",
>   "min" : "2019-01-30T18:44:04.000Z",
>   "watermark" : "1970-01-01T00:00:00.000Z"
> }
> {code}
> This issue was reported by [~liancheng]
[jira] [Updated] (SPARK-26806) EventTimeStats.merge doesn't handle "zero.merge(zero)" correctly
[ https://issues.apache.org/jira/browse/SPARK-26806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Lian updated SPARK-26806:
Description: Right now, EventTimeStats.merge doesn't handle "zero.merge(zero)". This makes "avg" become "NaN", and whatever gets merged with the result of "zero.merge(zero)", "avg" will still be "NaN". Then finally, "NaN".toLong returns "0" and the user sees the following incorrect report:
{code:java}
"eventTime" : {
  "avg" : "1970-01-01T00:00:00.000Z",
  "max" : "2019-01-31T12:57:00.000Z",
  "min" : "2019-01-30T18:44:04.000Z",
  "watermark" : "1970-01-01T00:00:00.000Z"
}
{code}

was: (the same text, with a plain {code} block and a trailing "This issue was reported by [~liancheng]")
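The guard that the fix needs can be sketched in plain Python (a minimal model, not Spark's actual EventTimeStats code): carry a (sum, count) pair and make the average of an empty aggregate 0 instead of computing 0/0.

```python
import math

def merge_stats(a, b):
    # a and b are (sum, count) pairs; merging is pairwise addition,
    # which is trivially associative and safe for empty aggregates.
    return (a[0] + b[0], a[1] + b[1])

def avg_of(stats):
    total, count = stats
    # The guard: an empty aggregate averages to 0.0 rather than 0/0
    # (NaN), so zero.merge(zero) cannot poison every later merge the
    # way the report above describes.
    return total / count if count else 0.0

zero = (0.0, 0)
merged = merge_stats(zero, zero)  # still the empty aggregate
```

With the buggy formulation, the NaN produced by the first empty merge propagates through all subsequent merges; keeping the aggregate as (sum, count) and deferring the division sidesteps that entirely.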
[jira] [Assigned] (SPARK-27369) Standalone worker can load resource conf and discover resources
[ https://issues.apache.org/jira/browse/SPARK-27369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Lian reassigned SPARK-27369:
Assignee: wuyi

> Standalone worker can load resource conf and discover resources
> ----------------------------------------------------------------
>
> Key: SPARK-27369
> URL: https://issues.apache.org/jira/browse/SPARK-27369
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Affects Versions: 3.0.0
> Reporter: Xiangrui Meng
> Assignee: wuyi
> Priority: Major
[jira] [Assigned] (SPARK-27611) Redundant javax.activation dependencies in the Maven build
[ https://issues.apache.org/jira/browse/SPARK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Lian reassigned SPARK-27611:
Assignee: Cheng Lian

> Redundant javax.activation dependencies in the Maven build
> -----------------------------------------------------------
>
> Key: SPARK-27611
> URL: https://issues.apache.org/jira/browse/SPARK-27611
> Project: Spark
> Issue Type: Dependency upgrade
> Components: Build
> Affects Versions: 3.0.0
> Reporter: Cheng Lian
> Assignee: Cheng Lian
> Priority: Minor
>
> [PR #23890|https://github.com/apache/spark/pull/23890] introduced {{org.glassfish.jaxb:jaxb-runtime:2.3.2}} as a runtime dependency. As an unexpected side effect, {{jakarta.activation:jakarta.activation-api:1.2.1}} was also pulled in as a transitive dependency. As a result, for the Maven build, both of the following two jars can be found under {{assembly/target/scala-2.12/jars}}:
> {noformat}
> activation-1.1.1.jar
> jakarta.activation-api-1.2.1.jar
> {noformat}
> Discussed this with [~srowen] offline and we agreed that we should probably exclude the Jakarta one.
[jira] [Created] (SPARK-27611) Redundant javax.activation dependencies in the Maven build
Cheng Lian created SPARK-27611:

Summary: Redundant javax.activation dependencies in the Maven build
Key: SPARK-27611
URL: https://issues.apache.org/jira/browse/SPARK-27611
Project: Spark
Issue Type: Dependency upgrade
Components: Build
Affects Versions: 3.0.0
Reporter: Cheng Lian
[jira] [Commented] (SPARK-25966) "EOF Reached the end of stream with bytes left to read" while reading/writing to Parquets
[ https://issues.apache.org/jira/browse/SPARK-25966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16678595#comment-16678595 ]

Cheng Lian commented on SPARK-25966:

[~andrioni], I just realized that I might have misunderstood this part of your statement:
{quote}
This job used to work fine with Spark 2.2.1 [...]
{quote}
I thought you could read the same problematic files using Spark 2.2.1. Now I guess you probably only meant that the same job worked fine with Spark 2.2.1 previously (with different sets of historical files).

> "EOF Reached the end of stream with bytes left to read" while reading/writing to Parquets
> ------------------------------------------------------------------------------------------
>
> Key: SPARK-25966
> URL: https://issues.apache.org/jira/browse/SPARK-25966
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Environment: Spark 2.4.0 (built from RC5 tag) running Hadoop 3.1.1 on top of a Mesos cluster. Both input and output Parquet files are on S3.
> Reporter: Alessandro Andrioni
> Priority: Major
>
> I was persistently getting the following exception while trying to run one Spark job we have using Spark 2.4.0. It went away after I regenerated from scratch all the input Parquet files (generated by another Spark job also using Spark 2.4.0).
> Is there a chance that Spark is writing (quite rarely) corrupted Parquet files?
> {code:java}
> org.apache.spark.SparkException: Job aborted.
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
>     at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>     at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>     at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>     at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>     at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>     at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
>     at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
>     at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>     at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
>     at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
>     at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:557)
>     (...)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 312 in stage 682.0 failed 4 times, most recent failure: Lost task 312.3 in stage 682.0 (TID 235229, 10.130.29.78, executor 77): java.io.EOFException: Reached the end of stream with 996 bytes left to read
>     at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:104)
>     at org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:127)
>     at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
>     at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1174)
>     at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
>     at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
> {code}
[jira] [Comment Edited] (SPARK-25966) "EOF Reached the end of stream with bytes left to read" while reading/writing to Parquets
[ https://issues.apache.org/jira/browse/SPARK-25966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16678542#comment-16678542 ]

Cheng Lian edited comment on SPARK-25966 at 11/7/18 5:34 PM:

Hey, [~andrioni], if you still have the original (potentially) corrupted Parquet files at hand, could you please try reading them again with Spark 2.4, but with {{spark.sql.parquet.enableVectorizedReader}} set to {{false}}? In this way, we fall back to the vanilla {{parquet-mr}} 1.10 Parquet reader. If that works fine, it might be an issue in the vectorized reader.

Also, is there any chance that you can share a sample problematic file? Since the same workload worked fine with Spark 2.2.1, I doubt whether this is really a file corruption issue. Unless somehow Spark 2.4 is reading more column(s)/row group(s) than Spark 2.2.1 for the same job, and those extra column(s)/row group(s) happened to contain some corrupted data, which would also indicate an optimizer-side issue (predicate push-down and column pruning).

was (Author: lian cheng): (the same comment, without the clause "and those extra column(s)/row group(s) happened to contain some corrupted data")
[jira] [Comment Edited] (SPARK-25966) "EOF Reached the end of stream with bytes left to read" while reading/writing to Parquets
[ https://issues.apache.org/jira/browse/SPARK-25966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16678542#comment-16678542 ]

Cheng Lian edited comment on SPARK-25966 at 11/7/18 5:34 PM:

Hey, [~andrioni], if you still have the original (potentially) corrupted Parquet files at hand, could you please try reading them again with Spark 2.4, but with {{spark.sql.parquet.enableVectorizedReader}} set to {{false}}? In this way, we fall back to the vanilla {{parquet-mr}} 1.10 Parquet reader. If that works fine, it might be an issue in the vectorized reader.

Also, is there any chance that you can share a sample problematic file? Since the same workload worked fine with Spark 2.2.1, I doubt whether this is really a file corruption issue. Unless somehow Spark 2.4 is reading more columns/row groups than Spark 2.2.1 for the same job, and those extra columns/row groups happened to contain some corrupted data, which would also indicate an optimizer-side issue (predicate push-down and column pruning).

was (Author: lian cheng): (the same comment, with "column(s)/row group(s)" written instead of "columns/row groups")
[jira] [Commented] (SPARK-25966) "EOF Reached the end of stream with bytes left to read" while reading/writing to Parquets
[ https://issues.apache.org/jira/browse/SPARK-25966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16678542#comment-16678542 ] Cheng Lian commented on SPARK-25966: Hey, [~andrioni], if you still have the original (potentially) corrupted Parquet files at hand, could you please try reading them again with Spark 2.4 but with {{spark.sql.parquet.enableVectorizedReader}} set to {{false}}? That way, we fall back to the vanilla {{parquet-mr}} 1.10 Parquet reader. If that works fine, it might be an issue in the vectorized reader. Also, any chance you can share a sample problematic file? Since the same workload worked fine with Spark 2.2.1, I doubt that this is really a file corruption issue, unless Spark 2.4 is somehow reading more columns/row groups than Spark 2.2.1 for the same job, which would instead indicate an optimizer-side issue (predicate push-down and column pruning). > "EOF Reached the end of stream with bytes left to read" while reading/writing > to Parquets > - > > Key: SPARK-25966 > URL: https://issues.apache.org/jira/browse/SPARK-25966 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0 > Environment: Spark 2.4.0 (built from RC5 tag) running Hadoop 3.1.1 on > top of a Mesos cluster. Both input and output Parquet files are on S3. >Reporter: Alessandro Andrioni >Priority: Major > > I was persistently getting the following exception while trying to run one > Spark job we have using Spark 2.4.0. It went away after I regenerated from > scratch all the input Parquet files (generated by another Spark job also > using Spark 2.4.0). > Is there a chance that Spark is writing (quite rarely) corrupted Parquet > files? > {code:java} > org.apache.spark.SparkException: Job aborted. 
> at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) > at > org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) > at > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) > at > org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668) > at > org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668) > at > org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) > at > org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668) > at > org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276) > at 
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270) > at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228) > at > org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:557) > (...) > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 312 in stage 682.0 failed 4 times, most recent failure: Lost task 312.3 > in stage 682.0 (TID 235229, 10.130.29.78, executor 77): java.io.EOFException: > Reached the end of stream with 996 bytes left to read > at > org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:104) > at > org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:127) > at > org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91) >
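The fallback suggested in the comment above can be sketched as a small PySpark config fragment. This is a sketch only, not runnable outside a Spark 2.4 environment, and `/path/to/input.parquet` is a placeholder for the suspect files:

```python
from pyspark.sql import SparkSession

# Build a session with the vectorized Parquet reader disabled, so reads
# fall back to the vanilla parquet-mr record reader.
spark = (
    SparkSession.builder
    .appName("parquet-fallback-check")
    .config("spark.sql.parquet.enableVectorizedReader", "false")
    .getOrCreate()
)

# Re-read the suspect files; if this succeeds while the default
# (vectorized) path throws EOFException, the vectorized reader is the
# likely culprit rather than file corruption.
df = spark.read.parquet("/path/to/input.parquet")  # placeholder path
df.count()
```

If the fallback read also fails, that points back toward genuinely corrupted files rather than a reader bug.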
[jira] [Assigned] (SPARK-24927) The hadoop-provided profile doesn't play well with Snappy-compressed Parquet files
[ https://issues.apache.org/jira/browse/SPARK-24927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian reassigned SPARK-24927: -- Assignee: Cheng Lian > The hadoop-provided profile doesn't play well with Snappy-compressed Parquet > files > -- > > Key: SPARK-24927 > URL: https://issues.apache.org/jira/browse/SPARK-24927 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 2.3.1, 2.3.2 >Reporter: Cheng Lian >Assignee: Cheng Lian >Priority: Major > > Reproduction: > {noformat} > wget > https://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-without-hadoop.tgz > wget > https://archive.apache.org/dist/hadoop/core/hadoop-2.7.3/hadoop-2.7.3.tar.gz > tar xzf spark-2.3.1-bin-without-hadoop.tgz > tar xzf hadoop-2.7.3.tar.gz > export SPARK_DIST_CLASSPATH=$(hadoop-2.7.3/bin/hadoop classpath) > ./spark-2.3.1-bin-without-hadoop/bin/spark-shell --master local > ... > scala> > spark.range(1).repartition(1).write.mode("overwrite").parquet("file:///tmp/test.parquet") > {noformat} > Exception: > {noformat} > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) > at scala.Option.foreach(Option.scala:257) > at > 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194) > ... 69 more > Caused by: org.apache.spark.SparkException: Task failed while writing rows. > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.UnsatisfiedLinkError: > org.xerial.snappy.SnappyNative.maxCompressedLength(I)I > at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method) > at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316) > at > org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67) > at > 
org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81) > at > org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92) > at > org.apache.parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:112) > at > org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:93) > at > org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:150) > at > org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:238) > at > org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:121) > at > org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:167) > at
[jira] [Updated] (SPARK-24927) The hadoop-provided profile doesn't play well with Snappy-compressed Parquet files
[ https://issues.apache.org/jira/browse/SPARK-24927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-24927: --- Description: Reproduction: {noformat} wget https://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-without-hadoop.tgz wget https://archive.apache.org/dist/hadoop/core/hadoop-2.7.3/hadoop-2.7.3.tar.gz tar xzf spark-2.3.1-bin-without-hadoop.tgz tar xzf hadoop-2.7.3.tar.gz export SPARK_DIST_CLASSPATH=$(hadoop-2.7.3/bin/hadoop classpath) ./spark-2.3.1-bin-without-hadoop/bin/spark-shell --master local ... scala> spark.range(1).repartition(1).write.mode("overwrite").parquet("file:///tmp/test.parquet") {noformat} Exception: {noformat} Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194) ... 69 more Caused by: org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method) at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316) at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67) at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81) at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92) at org.apache.parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:112) at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:93) at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:150) at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:238) at 
org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:121) at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:167) at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:109) at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:163) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42) at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405) at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:396) at
[jira] [Commented] (SPARK-24927) The hadoop-provided profile doesn't play well with Snappy-compressed Parquet files
[ https://issues.apache.org/jira/browse/SPARK-24927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16557603#comment-16557603 ] Cheng Lian commented on SPARK-24927: Downgraded from blocker to major, since it's not a regression; just realized that this issue has existed since at least Spark 1.6.
[jira] [Updated] (SPARK-24927) The hadoop-provided profile doesn't play well with Snappy-compressed Parquet files
[ https://issues.apache.org/jira/browse/SPARK-24927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-24927: --- Priority: Major (was: Blocker)
[jira] [Created] (SPARK-24927) The hadoop-provided profile doesn't play well with Snappy-compressed Parquet files
Cheng Lian created SPARK-24927: -- Summary: The hadoop-provided profile doesn't play well with Snappy-compressed Parquet files Key: SPARK-24927 URL: https://issues.apache.org/jira/browse/SPARK-24927 Project: Spark Issue Type: Bug Components: Build Affects Versions: 2.3.1, 2.3.2 Reporter: Cheng Lian Reproduction: {noformat} wget https://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-without-hadoop.tgz wget https://archive.apache.org/dist/hadoop/core/hadoop-2.7.3/hadoop-2.7.3.tar.gz tar xzf spark-2.3.1-bin-without-hadoop.tgz tar xzf hadoop-2.7.3.tar.gz export SPARK_DIST_CLASSPATH=$(hadoop-2.7.3/bin/hadoop classpath) ./spark-2.3.1-bin-without-hadoop/bin/spark-shell --master local ... scala> spark.range(1).repartition(1).write.mode("overwrite").parquet("file:///tmp/test.parquet") {noformat} Exception: {noformat} Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772) at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194) ... 69 more Caused by: org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method) at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316) at org.apache.parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:67) at org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81) at org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92) at org.apache.parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:112) at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:93) at 
org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:150) at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:238) at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:121) at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:167) at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:109) at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:163) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42) at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405) at
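The {{UnsatisfiedLinkError}} above happens when the {{hadoop-provided}} build puts a Hadoop classpath in front of Spark that lacks {{snappy-java}} (which bundles the native Snappy bindings {{parquet-mr}}'s {{SnappyCompressor}} needs). A minimal Python sketch of the kind of classpath check one might run; the example classpath entries are made up, and in a real setup the string would come from `hadoop classpath`:

```python
def find_snappy_jars(classpath: str) -> list:
    """Return classpath entries that look like snappy-java jars.

    If no entry matches, writes of Snappy-compressed Parquet fail with
    java.lang.UnsatisfiedLinkError when parquet-mr calls into the
    native Snappy bindings.
    """
    return [p for p in classpath.split(":") if "snappy-java" in p.lower()]

# Made-up classpath resembling `hadoop classpath` output from a
# hadoop-provided deployment that is missing snappy-java.
cp = ":".join([
    "/opt/hadoop-2.7.3/share/hadoop/common/lib/guava-11.0.2.jar",
    "/opt/hadoop-2.7.3/share/hadoop/common/hadoop-common-2.7.3.jar",
])
print(find_snappy_jars(cp))  # -> [] (no snappy-java on the classpath)
```

An empty result for the effective classpath is consistent with the failure mode in this ticket.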
[jira] [Assigned] (SPARK-24895) Spark 2.4.0 Snapshot artifacts has broken metadata due to mismatched filenames
[ https://issues.apache.org/jira/browse/SPARK-24895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian reassigned SPARK-24895: -- Assignee: Eric Chang > Spark 2.4.0 Snapshot artifacts has broken metadata due to mismatched filenames > -- > > Key: SPARK-24895 > URL: https://issues.apache.org/jira/browse/SPARK-24895 > Project: Spark > Issue Type: Bug > Components: Build >Affects Versions: 2.4.0 >Reporter: Eric Chang >Assignee: Eric Chang >Priority: Major > > Spark 2.4.0 has Maven build errors because artifacts uploaded to apache maven > repo has mismatched filenames: > {noformat} > [ERROR] Failed to execute goal > org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce > (enforce-banned-dependencies) on project spark_2.4: Execution > enforce-banned-dependencies of goal > org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce failed: > org.apache.maven.shared.dependency.graph.DependencyGraphBuilderException: > Could not resolve following dependencies: > [org.apache.spark:spark-mllib-local_2.11:jar:2.4.0-SNAPSHOT (compile), > org.apache.spark:spark-network-shuffle_2.11:jar:2.4.0-SNAPSHOT (compile), > org.apache.spark:spark-sketch_2.11:jar:2.4.0-SNAPSHOT (compile)]: Could not > resolve dependencies for project com.databricks:spark_2.4:pom:1: The > following artifacts could not be resolved: > org.apache.spark:spark-mllib-local_2.11:jar:2.4.0-SNAPSHOT, > org.apache.spark:spark-network-shuffle_2.11:jar:2.4.0-SNAPSHOT, > org.apache.spark:spark-sketch_2.11:jar:2.4.0-SNAPSHOT: Could not find > artifact > org.apache.spark:spark-mllib-local_2.11:jar:2.4.0-20180723.232411-177 in > apache-snapshots ([https://repository.apache.org/snapshots/]) -> [Help 1] > {noformat} > > If you check the artifact metadata you will see the pom and jar files are > 2.4.0-20180723.232411-177 instead of 2.4.0-20180723.232410-177: > {code:xml} > > org.apache.spark > spark-mllib-local_2.11 > 2.4.0-SNAPSHOT > > > 20180723.232411 > 177 > > 20180723232411 > > > jar > 
2.4.0-20180723.232411-177 > 20180723232411 > > > pom > 2.4.0-20180723.232411-177 > 20180723232411 > > > tests > jar > 2.4.0-20180723.232410-177 > 20180723232411 > > > sources > jar > 2.4.0-20180723.232410-177 > 20180723232411 > > > test-sources > jar > 2.4.0-20180723.232410-177 > 20180723232411 > > > > > {code} > > This behavior is very similar to this issue: > https://issues.apache.org/jira/browse/MDEPLOY-221 > Since 2.3.0 snapshots work with the same maven 3.3.9 version and maven deploy > 2.8.2 plugin, it is highly possible that we introduced a new plugin that > causes this. > The most recent addition is the spot-bugs plugin, which is known to have > incompatibilities with other plugins: > [https://github.com/spotbugs/spotbugs-maven-plugin/issues/21] > We may want to try building without it to sanity check. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
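The mismatch described above can be detected mechanically by parsing {{maven-metadata.xml}} and checking that all {{snapshotVersion}} values carry the same timestamp. The XML fragment below is a hypothetical reconstruction (the quoted Jira text lost its markup), showing the shape of the reported problem: main jar/pom stamped {{...232411}}, classified artifacts stamped {{...232410}}:

```python
import xml.etree.ElementTree as ET

# Hypothetical maven-metadata.xml fragment reconstructed from the quoted
# (tag-stripped) text above; element names follow the standard Maven
# repository metadata layout.
METADATA = """<metadata>
  <versioning>
    <snapshotVersions>
      <snapshotVersion><extension>jar</extension>
        <value>2.4.0-20180723.232411-177</value></snapshotVersion>
      <snapshotVersion><extension>pom</extension>
        <value>2.4.0-20180723.232411-177</value></snapshotVersion>
      <snapshotVersion><classifier>tests</classifier><extension>jar</extension>
        <value>2.4.0-20180723.232410-177</value></snapshotVersion>
      <snapshotVersion><classifier>sources</classifier><extension>jar</extension>
        <value>2.4.0-20180723.232410-177</value></snapshotVersion>
    </snapshotVersions>
  </versioning>
</metadata>"""

def snapshot_values(xml_text: str) -> set:
    """Collect distinct snapshot version values; a consistent upload has
    exactly one."""
    root = ET.fromstring(xml_text)
    return {v.text for v in root.iter("value")}

values = snapshot_values(METADATA)
print(sorted(values))
print("consistent" if len(values) == 1 else "mismatched snapshot timestamps")
```

With two distinct values present, a resolver that trusts the main-artifact value will request filenames that do not exist for the classified artifacts, which matches the "Could not find artifact" error above.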
[jira] [Updated] (SPARK-24895) Spark 2.4.0 Snapshot artifacts has broken metadata due to mismatched filenames
[ https://issues.apache.org/jira/browse/SPARK-24895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-24895: --- Description: Spark 2.4.0 has Maven build errors because artifacts uploaded to apache maven repo has mismatched filenames: {noformat} [ERROR] Failed to execute goal org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce (enforce-banned-dependencies) on project spark_2.4: Execution enforce-banned-dependencies of goal org.apache.maven.plugins:maven-enforcer-plugin:1.4.1:enforce failed: org.apache.maven.shared.dependency.graph.DependencyGraphBuilderException: Could not resolve following dependencies: [org.apache.spark:spark-mllib-local_2.11:jar:2.4.0-SNAPSHOT (compile), org.apache.spark:spark-network-shuffle_2.11:jar:2.4.0-SNAPSHOT (compile), org.apache.spark:spark-sketch_2.11:jar:2.4.0-SNAPSHOT (compile)]: Could not resolve dependencies for project com.databricks:spark_2.4:pom:1: The following artifacts could not be resolved: org.apache.spark:spark-mllib-local_2.11:jar:2.4.0-SNAPSHOT, org.apache.spark:spark-network-shuffle_2.11:jar:2.4.0-SNAPSHOT, org.apache.spark:spark-sketch_2.11:jar:2.4.0-SNAPSHOT: Could not find artifact org.apache.spark:spark-mllib-local_2.11:jar:2.4.0-20180723.232411-177 in apache-snapshots ([https://repository.apache.org/snapshots/]) -> [Help 1] {noformat} If you check the artifact metadata you will see the pom and jar files are 2.4.0-20180723.232411-177 instead of 2.4.0-20180723.232410-177: {code:xml} org.apache.spark spark-mllib-local_2.11 2.4.0-SNAPSHOT 20180723.232411 177 20180723232411 jar 2.4.0-20180723.232411-177 20180723232411 pom 2.4.0-20180723.232411-177 20180723232411 tests jar 2.4.0-20180723.232410-177 20180723232411 sources jar 2.4.0-20180723.232410-177 20180723232411 test-sources jar 2.4.0-20180723.232410-177 20180723232411 {code} This behavior is very similar to this issue: https://issues.apache.org/jira/browse/MDEPLOY-221 Since 2.3.0 snapshots work with the same maven 
3.3.9 version and Maven Deploy plugin 2.8.2, it is highly possible that a newly introduced plugin causes this. The most recent addition is the SpotBugs plugin, which is known to have incompatibilities with other plugins: [https://github.com/spotbugs/spotbugs-maven-plugin/issues/21] We may want to try building without it as a sanity check. > Spark 2.4.0 Snapshot artifacts have broken metadata due to mismatched filenames >
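The inconsistency in the artifact metadata can be detected mechanically. The following is a hypothetical sketch (not part of the Spark build): given the {{<snapshot>}} timestamp and the {{<value>}} entries from a maven-metadata.xml file, it flags snapshotVersion entries whose embedded timestamp disagrees.

```python
# Hypothetical sketch: cross-check snapshotVersion values against the
# advertised snapshot timestamp in maven-metadata.xml. Values are taken
# from the spark-mllib-local_2.11 metadata quoted in this issue.
snapshot_timestamp = "20180723.232411"
snapshot_values = [
    ("jar", "2.4.0-20180723.232411-177"),
    ("pom", "2.4.0-20180723.232411-177"),
    ("tests:jar", "2.4.0-20180723.232410-177"),
    ("sources:jar", "2.4.0-20180723.232410-177"),
]

# Any entry whose <value> does not embed the <snapshot> timestamp points at
# the filename mismatch the enforcer plugin later trips over.
mismatched = [kind for kind, value in snapshot_values
              if snapshot_timestamp not in value]
# mismatched -> ['tests:jar', 'sources:jar']
```

Such a check could be run against the snapshot repository to confirm whether a given nightly publish is affected before a downstream build fails.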
[jira] [Commented] (SPARK-19737) New analysis rule for reporting unregistered functions without relying on relation resolution
[ https://issues.apache.org/jira/browse/SPARK-19737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372289#comment-16372289 ] Cheng Lian commented on SPARK-19737: [~LANDAIS Christophe], I filed SPARK-23486 for this. It should be relatively straightforward to fix, and I'd like a new contributor to try it as a starter task. Thanks for reporting! > New analysis rule for reporting unregistered functions without relying on > relation resolution > - > > Key: SPARK-19737 > URL: https://issues.apache.org/jira/browse/SPARK-19737 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Cheng Lian >Assignee: Cheng Lian >Priority: Major > Fix For: 2.2.0 > > > Let's consider the following simple SQL query that references an undefined > function {{foo}} that is never registered in the function registry: > {code:sql} > SELECT foo(a) FROM t > {code} > Assuming table {{t}} is a partitioned temporary view consisting of a large > number of files stored on S3, it may take the analyzer a long time before > realizing that {{foo}} is not registered yet. > The reason is that the existing analysis rule {{ResolveFunctions}} requires > all child expressions to be resolved first. Therefore, {{ResolveRelations}} > has to be executed first to resolve all columns referenced by the unresolved > function invocation. This further leads to partition discovery for {{t}}, > which may take a long time. > To address this case, we propose a new lightweight analysis rule > {{LookupFunctions}} that > # Matches all unresolved function invocations > # Looks up the function names in the function registry > # Reports an analysis error for any unregistered functions > Since this rule doesn't actually try to resolve the unresolved functions, it > doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition > discovery.
> We may put this analysis rule in a separate {{Once}} rule batch that sits > between the "Substitution" batch and the "Resolution" batch to avoid running > it repeatedly and to make sure it gets executed before {{ResolveRelations}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
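The three steps of the proposed rule can be modeled in a few lines. This is an illustrative sketch only (simplified names, not Spark's actual analyzer classes): it checks function names against a registry without resolving any columns or relations first, which is exactly why it avoids partition discovery.

```python
# Illustrative model of a LookupFunctions-style pre-resolution pass.
# `unresolved_function_names` stands in for the names matched from
# UnresolvedFunction nodes in the logical plan; `function_registry` stands
# in for the set of registered function names.
def lookup_functions(unresolved_function_names, function_registry):
    """Raise early for unregistered functions; needs no relation resolution."""
    # dict.fromkeys dedups while preserving first-seen order.
    unknown = [name for name in dict.fromkeys(unresolved_function_names)
               if name not in function_registry]
    if unknown:
        raise ValueError("Undefined function(s): " + ", ".join(unknown))

# For `SELECT foo(a) FROM t`, the pass fails fast on `foo` even though
# resolving `t` (and discovering its partitions) would be expensive.
```

The key property is that the check depends only on function names, so it can run in an early rule batch before any relation or column resolution happens.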
[jira] [Updated] (SPARK-23486) LookupFunctions should not check the same function name more than once
[ https://issues.apache.org/jira/browse/SPARK-23486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-23486: --- Labels: starter (was: ) > LookupFunctions should not check the same function name more than once > -- > > Key: SPARK-23486 > URL: https://issues.apache.org/jira/browse/SPARK-23486 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1, 2.3.0 >Reporter: Cheng Lian >Priority: Major > Labels: starter > > For a query invoking the same function multiple times, the current > {{LookupFunctions}} rule performs a check for each invocation. For users > using Hive metastore as external catalog, this issues unnecessary metastore > accesses and can slow down the analysis phase quite a bit. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23486) LookupFunctions should not check the same function name more than once
[ https://issues.apache.org/jira/browse/SPARK-23486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372285#comment-16372285 ] Cheng Lian commented on SPARK-23486: Please refer to [this comment|https://issues.apache.org/jira/browse/SPARK-19737?focusedCommentId=16371377=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16371377] for more details. > LookupFunctions should not check the same function name more than once > -- > > Key: SPARK-23486 > URL: https://issues.apache.org/jira/browse/SPARK-23486 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1, 2.3.0 >Reporter: Cheng Lian >Priority: Major > > For a query invoking the same function multiple times, the current > {{LookupFunctions}} rule performs a check for each invocation. For users > using Hive metastore as external catalog, this issues unnecessary metastore > accesses and can slow down the analysis phase quite a bit. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23486) LookupFunctions should not check the same function name more than once
Cheng Lian created SPARK-23486: -- Summary: LookupFunctions should not check the same function name more than once Key: SPARK-23486 URL: https://issues.apache.org/jira/browse/SPARK-23486 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.1, 2.3.0 Reporter: Cheng Lian For a query invoking the same function multiple times, the current {{LookupFunctions}} rule performs a check for each invocation. For users using Hive metastore as external catalog, this issues unnecessary metastore accesses and can slow down the analysis phase quite a bit. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
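One natural shape for the fix described above is to memoize the lookup. The sketch below is a hedged illustration (plain Python, not the actual Spark/Hive code path): each distinct function name triggers at most one expensive catalog access per query, no matter how many times it is invoked.

```python
# Illustrative sketch: cache lookups so repeated invocations of the same
# function name cost one catalog round trip instead of N.
def make_cached_lookup(expensive_lookup):
    """Wrap a lookup function; also expose the list of actual calls made."""
    cache = {}
    calls = []  # records each real (simulated metastore) round trip

    def cached(name):
        if name not in cache:
            calls.append(name)
            cache[name] = expensive_lookup(name)
        return cache[name]

    return cached, calls
```

With this in place, a query invoking {{foo}} a hundred times performs a single metastore access for {{foo}}, which is the behavior this ticket asks for.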
[jira] [Resolved] (SPARK-22951) count() after dropDuplicates() on emptyDataFrame returns incorrect value
[ https://issues.apache.org/jira/browse/SPARK-22951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian resolved SPARK-22951. Resolution: Fixed Fix Version/s: 2.3.0 Issue resolved by pull request 20174 [https://github.com/apache/spark/pull/20174] > count() after dropDuplicates() on emptyDataFrame returns incorrect value > > > Key: SPARK-22951 > URL: https://issues.apache.org/jira/browse/SPARK-22951 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.2, 2.2.0, 2.3.0 >Reporter: Michael Dreibelbis >Assignee: Feng Liu > Labels: correctness > Fix For: 2.3.0 > > > here is a minimal Spark Application to reproduce: > {code} > import org.apache.spark.sql.SQLContext > import org.apache.spark.{SparkConf, SparkContext} > object DropDupesApp extends App { > > override def main(args: Array[String]): Unit = { > val conf = new SparkConf() > .setAppName("test") > .setMaster("local") > val sc = new SparkContext(conf) > val sql = SQLContext.getOrCreate(sc) > assert(sql.emptyDataFrame.count == 0) // expected > assert(sql.emptyDataFrame.dropDuplicates.count == 1) // unexpected > } > > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-22951) count() after dropDuplicates() on emptyDataFrame returns incorrect value
[ https://issues.apache.org/jira/browse/SPARK-22951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian reassigned SPARK-22951: -- Assignee: Feng Liu > count() after dropDuplicates() on emptyDataFrame returns incorrect value > > > Key: SPARK-22951 > URL: https://issues.apache.org/jira/browse/SPARK-22951 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.2, 2.2.0, 2.3.0 >Reporter: Michael Dreibelbis >Assignee: Feng Liu > Labels: correctness > > here is a minimal Spark Application to reproduce: > {code} > import org.apache.spark.sql.SQLContext > import org.apache.spark.{SparkConf, SparkContext} > object DropDupesApp extends App { > > override def main(args: Array[String]): Unit = { > val conf = new SparkConf() > .setAppName("test") > .setMaster("local") > val sc = new SparkContext(conf) > val sql = SQLContext.getOrCreate(sc) > assert(sql.emptyDataFrame.count == 0) // expected > assert(sql.emptyDataFrame.dropDuplicates.count == 1) // unexpected > } > > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-22951) count() after dropDuplicates() on emptyDataFrame returns incorrect value
[ https://issues.apache.org/jira/browse/SPARK-22951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-22951: --- Target Version/s: 2.3.0 > count() after dropDuplicates() on emptyDataFrame returns incorrect value > > > Key: SPARK-22951 > URL: https://issues.apache.org/jira/browse/SPARK-22951 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.2, 2.2.0, 2.3.0 >Reporter: Michael Dreibelbis > Labels: correctness > > here is a minimal Spark Application to reproduce: > {code} > import org.apache.spark.sql.SQLContext > import org.apache.spark.{SparkConf, SparkContext} > object DropDupesApp extends App { > > override def main(args: Array[String]): Unit = { > val conf = new SparkConf() > .setAppName("test") > .setMaster("local") > val sc = new SparkContext(conf) > val sql = SQLContext.getOrCreate(sc) > assert(sql.emptyDataFrame.count == 0) // expected > assert(sql.emptyDataFrame.dropDuplicates.count == 1) // unexpected > } > > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-22951) count() after dropDuplicates() on emptyDataFrame returns incorrect value
[ https://issues.apache.org/jira/browse/SPARK-22951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-22951: --- Labels: correctness (was: ) > count() after dropDuplicates() on emptyDataFrame returns incorrect value > > > Key: SPARK-22951 > URL: https://issues.apache.org/jira/browse/SPARK-22951 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.2, 2.2.0, 2.3.0 >Reporter: Michael Dreibelbis > Labels: correctness > > here is a minimal Spark Application to reproduce: > {code} > import org.apache.spark.sql.SQLContext > import org.apache.spark.{SparkConf, SparkContext} > object DropDupesApp extends App { > > override def main(args: Array[String]): Unit = { > val conf = new SparkConf() > .setAppName("test") > .setMaster("local") > val sc = new SparkContext(conf) > val sql = SQLContext.getOrCreate(sc) > assert(sql.emptyDataFrame.count == 0) // expected > assert(sql.emptyDataFrame.dropDuplicates.count == 1) // unexpected > } > > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
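One plausible model of the reported behavior (an assumption for illustration, not a root-cause statement from this thread): deduplication over zero key columns behaves like a global aggregate, and a global aggregate always emits exactly one row, even over empty input, just as {{SELECT COUNT(*)}} over an empty table returns one row.

```python
# Simplified model of dropDuplicates-then-count. `key_columns` plays the
# role of the deduplication columns; an empty DataFrame has no columns,
# so the zero-key branch is taken and one row is reported.
def deduped_row_count(rows, key_columns):
    if not key_columns:
        return 1  # global-aggregate behavior: one output row regardless of input
    return len({tuple(row[c] for c in key_columns) for row in rows})
```

Under this model, an empty frame with no columns reports 1 instead of the expected 0, matching the failing assertion in the reproduction.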
[jira] [Commented] (HADOOP-15086) NativeAzureFileSystem.rename is not atomic
[ https://issues.apache.org/jira/browse/HADOOP-15086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16275422#comment-16275422 ] Cheng Lian commented on HADOOP-15086: - To be more specific, when multiple threads rename files to the same target path, more than one *but not all* of the threads can succeed. This is because the existence check and the file copy in {{NativeAzureFileSystem#rename()}} are not performed atomically. The problem here is that the expected semantics of {{NativeAzureFileSystem#rename()}} are unclear: - If the semantics are "error if the destination file already exists", then only one thread can succeed. - If the semantics are "overwrite if the destination file already exists", then all threads should succeed. > NativeAzureFileSystem.rename is not atomic > -- > > Key: HADOOP-15086 > URL: https://issues.apache.org/jira/browse/HADOOP-15086 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/azure >Affects Versions: 2.7.3 >Reporter: Shixiong Zhu > Attachments: RenameReproducer.java > > > When multiple threads rename files to the same target path, more than 1 > threads can succeed. It's because check and copy file in `rename` is not > atomic. > I would expect it's atomic just like HDFS. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org
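The "error if the destination exists" semantics can be illustrated with a toy model (plain Python, not Hadoop code): when the existence check and the commit happen under one lock, exactly one of N concurrent renamers can win, which is the HDFS-like behavior the reporter expects.

```python
import threading

# Toy model of an atomic rename-without-overwrite namespace.
class AtomicNamespace:
    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}

    def rename(self, src, dst):
        # Check and commit under a single lock: no interleaving is possible
        # between "does dst exist?" and "create dst".
        with self._lock:
            if dst in self._entries:
                return False  # destination exists: all later callers fail
            self._entries[dst] = src
            return True
```

The reported bug corresponds to performing the check and the copy as two separate operations: between them another thread can pass its own check, so several threads (but, as observed, not all) succeed.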
[jira] [Assigned] (PARQUET-1102) Travis CI builds are failing for parquet-format PRs
[ https://issues.apache.org/jira/browse/PARQUET-1102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian reassigned PARQUET-1102: --- Assignee: Cheng Lian > Travis CI builds are failing for parquet-format PRs > --- > > Key: PARQUET-1102 > URL: https://issues.apache.org/jira/browse/PARQUET-1102 > Project: Parquet > Issue Type: Bug > Components: parquet-format >Reporter: Cheng Lian >Assignee: Cheng Lian >Priority: Blocker > Fix For: format-2.3.2 > > > Travis CI builds are failing for parquet-format PRs, probably due to the > migration from Ubuntu precise to trusty on Sep 1 according to [this Travis > official blog > post|https://blog.travis-ci.com/2017-08-31-trusty-as-default-status]. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (PARQUET-1091) Wrong and broken links in README
[ https://issues.apache.org/jira/browse/PARQUET-1091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian resolved PARQUET-1091. - Resolution: Fixed Fix Version/s: format-2.3.2 Issue resolved by pull request 65 [https://github.com/apache/parquet-format/pull/65] > Wrong and broken links in README > > > Key: PARQUET-1091 > URL: https://issues.apache.org/jira/browse/PARQUET-1091 > Project: Parquet > Issue Type: Bug > Components: parquet-format >Reporter: Cheng Lian >Assignee: Cheng Lian >Priority: Minor > Fix For: format-2.3.2 > > > Multiple links in README.md still point to the old {{Parquet/parquet-format}} > repository, which is now removed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (PARQUET-1102) Travis CI builds are failing for parquet-format PRs
[ https://issues.apache.org/jira/browse/PARQUET-1102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian resolved PARQUET-1102. - Resolution: Fixed Fix Version/s: format-2.3.2 Issue resolved by pull request 66 [https://github.com/apache/parquet-format/pull/66] > Travis CI builds are failing for parquet-format PRs > --- > > Key: PARQUET-1102 > URL: https://issues.apache.org/jira/browse/PARQUET-1102 > Project: Parquet > Issue Type: Bug > Components: parquet-format >Reporter: Cheng Lian >Priority: Blocker > Fix For: format-2.3.2 > > > Travis CI builds are failing for parquet-format PRs, probably due to the > migration from Ubuntu precise to trusty on Sep 1 according to [this Travis > official blog > post|https://blog.travis-ci.com/2017-08-31-trusty-as-default-status]. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (PARQUET-1102) Travis CI builds are failing for parquet-format PRs
[ https://issues.apache.org/jira/browse/PARQUET-1102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated PARQUET-1102: Priority: Blocker (was: Major) > Travis CI builds are failing for parquet-format PRs > --- > > Key: PARQUET-1102 > URL: https://issues.apache.org/jira/browse/PARQUET-1102 > Project: Parquet > Issue Type: Bug > Components: parquet-format >Reporter: Cheng Lian >Priority: Blocker > > Travis CI builds are failing for parquet-format PRs, probably due to the > migration from Ubuntu precise to trusty on Sep 1 according to [this Travis > official blog > post|https://blog.travis-ci.com/2017-08-31-trusty-as-default-status]. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (PARQUET-1102) Travis CI builds are failing for parquet-format PRs
Cheng Lian created PARQUET-1102: --- Summary: Travis CI builds are failing for parquet-format PRs Key: PARQUET-1102 URL: https://issues.apache.org/jira/browse/PARQUET-1102 Project: Parquet Issue Type: Bug Components: parquet-format Reporter: Cheng Lian Travis CI builds are failing for parquet-format PRs, probably due to the migration from Ubuntu precise to trusty on Sep 1 according to [this Travis official blog post|https://blog.travis-ci.com/2017-08-31-trusty-as-default-status]. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
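If the trusty migration is indeed the cause, a workaround commonly used at the time was to pin the build image explicitly in .travis.yml. This is a sketch of that idea, not a change taken from this thread or verified against the parquet-format repository:

```yaml
# Hypothetical workaround: opt back into the previous Ubuntu image rather
# than the new trusty default announced in the Travis blog post.
dist: precise
```

Pinning the image would at least separate "our build is broken" from "the default CI environment changed underneath us" while a proper fix is prepared.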
[jira] [Created] (PARQUET-1091) Wrong and broken links in README
Cheng Lian created PARQUET-1091: --- Summary: Wrong and broken links in README Key: PARQUET-1091 URL: https://issues.apache.org/jira/browse/PARQUET-1091 Project: Parquet Issue Type: Bug Components: parquet-format Reporter: Cheng Lian Assignee: Cheng Lian Priority: Minor Multiple links in README.md still point to the old {{Parquet/parquet-format}} repository, which is now removed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (HADOOP-14700) NativeAzureFileSystem.open() ignores blob container name
[ https://issues.apache.org/jira/browse/HADOOP-14700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated HADOOP-14700: Description: {{NativeAzureFileSystem}} instances are associated with the blob container used to initialize the file system. Assuming that a file system instance {{fs}} is associated with a container {{A}}, when trying to access a blob inside another container {{B}}, {{fs}} still tries to find the blob inside container {{A}}. If there happens to be two blobs with the same name inside both containers, the user may get a wrong result because {{fs}} reads the contents from the blob inside container {{A}} instead of container {{B}}. You may reproduce it by running the following self-contained Scala script using [Ammonite|http://ammonite.io/]: {code} #!/usr/bin/env amm --no-remote-logging import $ivy.`com.jsuereth::scala-arm:2.0` import $ivy.`com.microsoft.azure:azure-storage:5.2.0` import $ivy.`org.apache.hadoop:hadoop-azure:3.0.0-alpha4` import $ivy.`org.apache.hadoop:hadoop-common:3.0.0-alpha4` import $ivy.`org.scalatest::scalatest:3.0.3` import java.io.{BufferedReader, InputStreamReader} import java.net.URI import java.time.{Duration, Instant} import java.util.{Date, EnumSet} import com.microsoft.azure.storage.{CloudStorageAccount, StorageCredentialsAccountAndKey} import com.microsoft.azure.storage.blob.{SharedAccessBlobPermissions, SharedAccessBlobPolicy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.azure.{AzureException, NativeAzureFileSystem} import org.scalatest.Assertions._ import resource._ // Utility implicit conversion for auto resource management. 
implicit def `Closable->Resource`[T <: { def close() }]: Resource[T] = new Resource[T] { override def close(closable: T): Unit = closable.close() } // Credentials information val ACCOUNT = "** REDACTED **" val ACCESS_KEY = "** REDACTED **" // We'll create two different containers, both contain a blob named "test-blob" but with different // contents. val CONTAINER_A = "container-a" val CONTAINER_B = "container-b" val TEST_BLOB = "test-blob" val blobClient = { val credentials = new StorageCredentialsAccountAndKey(ACCOUNT, ACCESS_KEY) val account = new CloudStorageAccount(credentials, /* useHttps */ true) account.createCloudBlobClient() } // Generates a read-only SAS key restricted within "container-a". val sasKeyForContainerA = { val since = Instant.now() minus Duration.ofMinutes(10) val duration = Duration.ofHours(1) val policy = new SharedAccessBlobPolicy() policy.setSharedAccessStartTime(Date.from(since)) policy.setSharedAccessExpiryTime(Date.from(since plus duration)) policy.setPermissions(EnumSet.of( SharedAccessBlobPermissions.READ, SharedAccessBlobPermissions.LIST )) blobClient .getContainerReference(CONTAINER_A) .generateSharedAccessSignature(policy, null) } // Sets up testing containers and blobs using the Azure storage SDK: // // container-a/test-blob => "foo" // container-b/test-blob => "bar" { val containerARef = blobClient.getContainerReference(CONTAINER_A) val containerBRef = blobClient.getContainerReference(CONTAINER_B) containerARef.createIfNotExists() containerARef.getBlockBlobReference(TEST_BLOB).uploadText("foo") containerBRef.createIfNotExists() containerBRef.getBlockBlobReference(TEST_BLOB).uploadText("bar") } val pathA = new Path(s"wasbs://$CONTAINER_A@$ACCOUNT.blob.core.windows.net/$TEST_BLOB") val pathB = new Path(s"wasbs://$CONTAINER_B@$ACCOUNT.blob.core.windows.net/$TEST_BLOB") for { // Creates a file system associated with "container-a". 
fs <- managed { val conf = new Configuration conf.set("fs.wasbs.impl", classOf[NativeAzureFileSystem].getName) conf.set(s"fs.azure.sas.$CONTAINER_A.$ACCOUNT.blob.core.windows.net", sasKeyForContainerA) pathA.getFileSystem(conf) } // Opens a reader pointing to "container-a/test-blob". We expect to get the string "foo" written // to this blob previously. readerA <- managed(new BufferedReader(new InputStreamReader(fs open pathA))) // Opens a reader pointing to "container-b/test-blob". We expect to get an exception since the SAS // key used to create the `FileSystem` instance is restricted to "container-a". readerB <- managed(new BufferedReader(new InputStreamReader(fs open pathB))) } { // Should get "foo" assert(readerA.readLine() == "foo") // Should catch an exception ... intercept[AzureException] { // ... but instead, we get string "foo" here, which indicates that the readerB was reading from // "container-a" instead of "container-b". val contents = readerB.readLine() println(s"Should not reach here but we got $contents") } } {code} was: {{NativeAzureFileSystem}} instances are associated with the blob container used to initialize the file system. Assuming that a file system
[jira] [Updated] (HADOOP-14700) NativeAzureFileSystem.open() ignores blob container name
[ https://issues.apache.org/jira/browse/HADOOP-14700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated HADOOP-14700: Description: {{NativeAzureFileSystem}} instances are associated with the blob container used to initialize the file system. Assuming that a file system instance {{fs}} is associated with a container {{A}}, when trying to access a blob inside another container {{B}}, {{fs}} still tries to find the blob inside container {{A}}. If there happens to be two blobs with the same name inside both containers, the user may get a wrong result because {{fs}} reads the contents from the blob inside container {{A}} instead of container {{B}}. The following self-contained Scala code snippet illustrates this issue. You may reproduce it by running the following Scala script using [Ammonite|http://ammonite.io/]. {code} #!/usr/bin/env amm import $ivy.`com.jsuereth::scala-arm:2.0` import $ivy.`com.microsoft.azure:azure-storage:5.2.0` import $ivy.`org.apache.hadoop:hadoop-azure:3.0.0-alpha4` import $ivy.`org.apache.hadoop:hadoop-common:3.0.0-alpha4` import $ivy.`org.scalatest::scalatest:3.0.3` import java.io.{BufferedReader, InputStreamReader} import java.net.URI import java.time.{Duration, Instant} import java.util.{Date, EnumSet} import com.microsoft.azure.storage.{CloudStorageAccount, StorageCredentialsAccountAndKey} import com.microsoft.azure.storage.blob.{SharedAccessBlobPermissions, SharedAccessBlobPolicy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.azure.{AzureException, NativeAzureFileSystem} import org.scalatest.Assertions._ import resource._ // Utility implicit conversion for auto resource management. 
implicit def `Closable->Resource`[T <: { def close() }]: Resource[T] = new Resource[T] { override def close(closable: T): Unit = closable.close() } // Credentials information val ACCOUNT = "** REDACTED **" val ACCESS_KEY = "** REDACTED **" // We'll create two different containers, both contain a blob named "test-blob" but with different // contents. val CONTAINER_A = "container-a" val CONTAINER_B = "container-b" val TEST_BLOB = "test-blob" val blobClient = { val credentials = new StorageCredentialsAccountAndKey(ACCOUNT, ACCESS_KEY) val account = new CloudStorageAccount(credentials, /* useHttps */ true) account.createCloudBlobClient() } // Generates a read-only SAS key restricted within "container-a". val sasKeyForContainerA = { val since = Instant.now() minus Duration.ofMinutes(10) val duration = Duration.ofHours(1) val policy = new SharedAccessBlobPolicy() policy.setSharedAccessStartTime(Date.from(since)) policy.setSharedAccessExpiryTime(Date.from(since plus duration)) policy.setPermissions(EnumSet.of( SharedAccessBlobPermissions.READ, SharedAccessBlobPermissions.LIST )) blobClient .getContainerReference(CONTAINER_A) .generateSharedAccessSignature(policy, null) } // Sets up testing containers and blobs using the Azure storage SDK: // // container-a/test-blob => "foo" // container-b/test-blob => "bar" { val containerARef = blobClient.getContainerReference(CONTAINER_A) val containerBRef = blobClient.getContainerReference(CONTAINER_B) containerARef.createIfNotExists() containerARef.getBlockBlobReference(TEST_BLOB).uploadText("foo") containerBRef.createIfNotExists() containerBRef.getBlockBlobReference(TEST_BLOB).uploadText("bar") } val pathA = new Path(s"wasbs://$CONTAINER_A@$ACCOUNT.blob.core.windows.net/$TEST_BLOB") val pathB = new Path(s"wasbs://$CONTAINER_B@$ACCOUNT.blob.core.windows.net/$TEST_BLOB") for { // Creates a file system associated with "container-a". 
fs <- managed { val conf = new Configuration conf.set("fs.wasbs.impl", classOf[NativeAzureFileSystem].getName) conf.set(s"fs.azure.sas.$CONTAINER_A.$ACCOUNT.blob.core.windows.net", sasKeyForContainerA) pathA.getFileSystem(conf) } // Opens a reader pointing to "container-a/test-blob". We expect to get the string "foo" written // to this blob previously. readerA <- managed(new BufferedReader(new InputStreamReader(fs open pathA))) // Opens a reader pointing to "container-b/test-blob". We expect to get an exception since the SAS // key used to create the `FileSystem` instance is restricted to "container-a". readerB <- managed(new BufferedReader(new InputStreamReader(fs open pathB))) } { // Should get "foo" assert(readerA.readLine() == "foo") // Should catch an exception ... intercept[AzureException] { // ... but instead, we get string "foo" here, which indicates that the readerB was reading from // "container-a" instead of "container-b". val contents = readerB.readLine() println(s"Should not reach here but we got $contents") } } {code} was: {{NativeAzureFileSystem}} instances are associated with the blob container used to initialize the
[jira] [Commented] (HADOOP-14700) NativeAzureFileSystem.open() ignores blob container name
[ https://issues.apache.org/jira/browse/HADOOP-14700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111645#comment-16111645 ] Cheng Lian commented on HADOOP-14700: - Oops... Thanks for pointing out the typo, [~ste...@apache.org]! This issue still remains after fixing the path, though. > NativeAzureFileSystem.open() ignores blob container name > > > Key: HADOOP-14700 > URL: https://issues.apache.org/jira/browse/HADOOP-14700 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs >Affects Versions: 3.0.0-beta1, 3.0.0-alpha4 >Reporter: Cheng Lian > > {{NativeAzureFileSystem}} instances are associated with the blob container > used to initialize the file system. Assuming that a file system instance > {{fs}} is associated with a container {{A}}, when trying to access a blob > inside another container {{B}}, {{fs}} still tries to find the blob inside > container {{A}}. If there happens to be two blobs with the same name inside > both containers, the user may get a wrong result because {{fs}} reads the > contents from the blob inside container {{A}} instead of container {{B}}. > The following self-contained Scala code snippet illustrates this issue. You > may reproduce it by running the script inside the [Ammonite > REPL|http://ammonite.io/]. 
> {code} > #!/usr/bin/env amm > import $ivy.`com.jsuereth::scala-arm:2.0` > import $ivy.`com.microsoft.azure:azure-storage:5.2.0` > import $ivy.`org.apache.hadoop:hadoop-azure:3.0.0-alpha4` > import $ivy.`org.apache.hadoop:hadoop-common:3.0.0-alpha4` > import $ivy.`org.scalatest::scalatest:3.0.3` > import java.io.{BufferedReader, InputStreamReader} > import java.net.URI > import java.time.{Duration, Instant} > import java.util.{Date, EnumSet} > import com.microsoft.azure.storage.{CloudStorageAccount, > StorageCredentialsAccountAndKey} > import com.microsoft.azure.storage.blob.{SharedAccessBlobPermissions, > SharedAccessBlobPolicy} > import org.apache.hadoop.conf.Configuration > import org.apache.hadoop.fs.{FileSystem, Path} > import org.apache.hadoop.fs.azure.{AzureException, NativeAzureFileSystem} > import org.scalatest.Assertions._ > import resource._ > // Utility implicit conversion for auto resource management. > implicit def `Closable->Resource`[T <: { def close() }]: Resource[T] = new > Resource[T] { > override def close(closable: T): Unit = closable.close() > } > // Credentials information > val ACCOUNT = "** REDACTED **" > val ACCESS_KEY = "** REDACTED **" > // We'll create two different containers, both contain a blob named > "test-blob" but with different > // contents. > val CONTAINER_A = "container-a" > val CONTAINER_B = "container-b" > val TEST_BLOB = "test-blob" > val blobClient = { > val credentials = new StorageCredentialsAccountAndKey(ACCOUNT, ACCESS_KEY) > val account = new CloudStorageAccount(credentials, /* useHttps */ true) > account.createCloudBlobClient() > } > // Generates a read-only SAS key restricted within "container-a". 
> val sasKeyForContainerA = { > val since = Instant.now() minus Duration.ofMinutes(10) > val duration = Duration.ofHours(1) > val policy = new SharedAccessBlobPolicy() > policy.setSharedAccessStartTime(Date.from(since)) > policy.setSharedAccessExpiryTime(Date.from(since plus duration)) > policy.setPermissions(EnumSet.of( > SharedAccessBlobPermissions.READ, > SharedAccessBlobPermissions.LIST > )) > blobClient > .getContainerReference(CONTAINER_A) > .generateSharedAccessSignature(policy, null) > } > // Sets up testing containers and blobs using the Azure storage SDK: > // > // container-a/test-blob => "foo" > // container-b/test-blob => "bar" > { > val containerARef = blobClient.getContainerReference(CONTAINER_A) > val containerBRef = blobClient.getContainerReference(CONTAINER_B) > containerARef.createIfNotExists() > containerARef.getBlockBlobReference(TEST_BLOB).uploadText("foo") > containerBRef.createIfNotExists() > containerBRef.getBlockBlobReference(TEST_BLOB).uploadText("bar") > } > val pathA = new > Path(s"wasbs://$CONTAINER_A@$ACCOUNT.blob.core.windows.net/$TEST_BLOB") > val pathB = new > Path(s"wasbs://$CONTAINER_B@$ACCOUNT.blob.core.windows.net/$TEST_BLOB") > for { > // Creates a file system associated with "container-a". > fs <- managed { > val conf = new Configuration > conf.set("fs.wasbs.impl", classOf[NativeAzureFileSystem].getName) > conf.set(s"fs.azure.sas.$CONTAINER_A.$ACCOUNT.blob.core.windows.net", > sasKeyForContainerA) > pathA.getFileSystem(conf) > } > // Opens a reader pointing to "container-a/test-blob". We expect to get the > string "foo" written > // to this blob previously. > readerA <- managed(new BufferedReader(new InputStreamReader(fs open pathA))) > // Opens a reader pointing to
[jira] [Updated] (HADOOP-14700) NativeAzureFileSystem.open() ignores blob container name
[ https://issues.apache.org/jira/browse/HADOOP-14700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated HADOOP-14700: Description: {{NativeAzureFileSystem}} instances are associated with the blob container used to initialize the file system. Assuming that a file system instance {{fs}} is associated with a container {{A}}, when trying to access a blob inside another container {{B}}, {{fs}} still tries to find the blob inside container {{A}}. If there happens to be two blobs with the same name inside both containers, the user may get a wrong result because {{fs}} reads the contents from the blob inside container {{A}} instead of container {{B}}. The following self-contained Scala code snippet illustrates this issue. You may reproduce it by running the script inside the [Ammonite REPL|http://ammonite.io/]. {code} #!/usr/bin/env amm import $ivy.`com.jsuereth::scala-arm:2.0` import $ivy.`com.microsoft.azure:azure-storage:5.2.0` import $ivy.`org.apache.hadoop:hadoop-azure:3.0.0-alpha4` import $ivy.`org.apache.hadoop:hadoop-common:3.0.0-alpha4` import $ivy.`org.scalatest::scalatest:3.0.3` import java.io.{BufferedReader, InputStreamReader} import java.net.URI import java.time.{Duration, Instant} import java.util.{Date, EnumSet} import com.microsoft.azure.storage.{CloudStorageAccount, StorageCredentialsAccountAndKey} import com.microsoft.azure.storage.blob.{SharedAccessBlobPermissions, SharedAccessBlobPolicy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.azure.{AzureException, NativeAzureFileSystem} import org.scalatest.Assertions._ import resource._ // Utility implicit conversion for auto resource management. 
implicit def `Closable->Resource`[T <: { def close() }]: Resource[T] = new Resource[T] { override def close(closable: T): Unit = closable.close() } // Credentials information val ACCOUNT = "** REDACTED **" val ACCESS_KEY = "** REDACTED **" // We'll create two different containers, both contain a blob named "test-blob" but with different // contents. val CONTAINER_A = "container-a" val CONTAINER_B = "container-b" val TEST_BLOB = "test-blob" val blobClient = { val credentials = new StorageCredentialsAccountAndKey(ACCOUNT, ACCESS_KEY) val account = new CloudStorageAccount(credentials, /* useHttps */ true) account.createCloudBlobClient() } // Generates a read-only SAS key restricted within "container-a". val sasKeyForContainerA = { val since = Instant.now() minus Duration.ofMinutes(10) val duration = Duration.ofHours(1) val policy = new SharedAccessBlobPolicy() policy.setSharedAccessStartTime(Date.from(since)) policy.setSharedAccessExpiryTime(Date.from(since plus duration)) policy.setPermissions(EnumSet.of( SharedAccessBlobPermissions.READ, SharedAccessBlobPermissions.LIST )) blobClient .getContainerReference(CONTAINER_A) .generateSharedAccessSignature(policy, null) } // Sets up testing containers and blobs using the Azure storage SDK: // // container-a/test-blob => "foo" // container-b/test-blob => "bar" { val containerARef = blobClient.getContainerReference(CONTAINER_A) val containerBRef = blobClient.getContainerReference(CONTAINER_B) containerARef.createIfNotExists() containerARef.getBlockBlobReference(TEST_BLOB).uploadText("foo") containerBRef.createIfNotExists() containerBRef.getBlockBlobReference(TEST_BLOB).uploadText("bar") } val pathA = new Path(s"wasbs://$CONTAINER_A@$ACCOUNT.blob.core.windows.net/$TEST_BLOB") val pathB = new Path(s"wasbs://$CONTAINER_B@$ACCOUNT.blob.core.windows.net/$TEST_BLOB") for { // Creates a file system associated with "container-a". 
fs <- managed { val conf = new Configuration conf.set("fs.wasbs.impl", classOf[NativeAzureFileSystem].getName) conf.set(s"fs.azure.sas.$CONTAINER_A.$ACCOUNT.blob.core.windows.net", sasKeyForContainerA) pathA.getFileSystem(conf) } // Opens a reader pointing to "container-a/test-blob". We expect to get the string "foo" written // to this blob previously. readerA <- managed(new BufferedReader(new InputStreamReader(fs open pathA))) // Opens a reader pointing to "container-b/test-blob". We expect to get an exception since the SAS // key used to create the `FileSystem` instance is restricted to "container-a". readerB <- managed(new BufferedReader(new InputStreamReader(fs open pathB))) } { // Should get "foo" assert(readerA.readLine() == "foo") // Should catch an exception ... intercept[AzureException] { // ... but instead, we get string "foo" here, which indicates that the readerB was reading from // "container-a" instead of "container-b". val contents = readerB.readLine() println(s"Should not reach here but we got $contents") } } {code} was: {{NativeAzureFileSystem}} instances are associated with the blob container used to initialize the file
[jira] [Updated] (HADOOP-14700) NativeAzureFileSystem.open() ignores blob container name
[ https://issues.apache.org/jira/browse/HADOOP-14700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated HADOOP-14700: Description: {{NativeAzureFileSystem}} instances are associated with the blob container used to initialize the file system. Assuming that a file system instance {{fs}} is associated with a container {{A}}, when trying to access a blob inside another container {{B}}, {{fs}} still tries to find the blob inside container {{A}}. If there happens to be two blobs with the same name inside both containers, the user may get a wrong result because {{fs}} reads the contents from the blob inside container {{A}} instead of container {{B}}. The following self-contained Scala code snippet illustrates this issue. You may reproduce it by running the script inside the [Ammonite REPL|http://ammonite.io/]. {code} import $ivy.`com.jsuereth::scala-arm:2.0` import $ivy.`com.microsoft.azure:azure-storage:5.2.0` import $ivy.`org.apache.hadoop:hadoop-azure:3.0.0-alpha4` import $ivy.`org.apache.hadoop:hadoop-common:3.0.0-alpha4` import $ivy.`org.scalatest::scalatest:3.0.3` import java.io.{BufferedReader, InputStreamReader} import java.net.URI import java.time.{Duration, Instant} import java.util.{Date, EnumSet} import com.microsoft.azure.storage.{CloudStorageAccount, StorageCredentialsAccountAndKey} import com.microsoft.azure.storage.blob.{SharedAccessBlobPermissions, SharedAccessBlobPolicy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.azure.{AzureException, NativeAzureFileSystem} import org.scalatest.Assertions._ import resource._ // Utility implicit conversion for auto resource management. 
implicit def `Closable->Resource`[T <: { def close() }]: Resource[T] = new Resource[T] { override def close(closable: T): Unit = closable.close() } // Credentials information val ACCOUNT = "** REDACTED **" val ACCESS_KEY = "** REDACTED **" // We'll create two different containers, both contain a blob named "test-blob" but with different // contents. val CONTAINER_A = "container-a" val CONTAINER_B = "container-b" val TEST_BLOB = "test-blob" val blobClient = { val credentials = new StorageCredentialsAccountAndKey(ACCOUNT, ACCESS_KEY) val account = new CloudStorageAccount(credentials, /* useHttps */ true) account.createCloudBlobClient() } // Generates a read-only SAS key restricted within "container-a". val sasKeyForContainerA = { val since = Instant.now() minus Duration.ofMinutes(10) val duration = Duration.ofHours(1) val policy = new SharedAccessBlobPolicy() policy.setSharedAccessStartTime(Date.from(since)) policy.setSharedAccessExpiryTime(Date.from(since plus duration)) policy.setPermissions(EnumSet.of( SharedAccessBlobPermissions.READ, SharedAccessBlobPermissions.LIST )) blobClient .getContainerReference(CONTAINER_A) .generateSharedAccessSignature(policy, null) } // Sets up testing containers and blobs using the Azure storage SDK: // // container-a/test-blob => "foo" // container-b/test-blob => "bar" { val containerARef = blobClient.getContainerReference(CONTAINER_A) val containerBRef = blobClient.getContainerReference(CONTAINER_B) containerARef.createIfNotExists() containerARef.getBlockBlobReference(TEST_BLOB).uploadText("foo") containerBRef.createIfNotExists() containerBRef.getBlockBlobReference(TEST_BLOB).uploadText("bar") } val pathA = new Path(s"wasbs://$CONTAINER_A@$ACCOUNT.blob.core.windows.net/$TEST_BLOB") val pathB = new Path(s"wasbs://$CONTAINER_A@$ACCOUNT.blob.core.windows.net/$TEST_BLOB") for { // Creates a file system associated with "container-a". 
fs <- managed { val conf = new Configuration conf.set("fs.wasbs.impl", classOf[NativeAzureFileSystem].getName) conf.set(s"fs.azure.sas.$CONTAINER_A.$ACCOUNT.blob.core.windows.net", sasKeyForContainerA) pathA.getFileSystem(conf) } // Opens a reader pointing to "container-a/test-blob". We expect to get the string "foo" written // to this blob previously. readerA <- managed(new BufferedReader(new InputStreamReader(fs open pathA))) // Opens a reader pointing to "container-b/test-blob". We expect to get an exception since the SAS // key used to create the `FileSystem` instance is restricted to "container-a". readerB <- managed(new BufferedReader(new InputStreamReader(fs open pathB))) } { // Should get "foo" assert(readerA.readLine() == "foo") // Should catch an exception ... intercept[AzureException] { // ... but instead, we get string "foo" here, which indicates that the readerB was reading from // "container-a" instead of "container-b". val contents = readerB.readLine() println(s"Should not reach here but we got $contents") } } {code} was: {{NativeAzureFileSystem}} instances are associated with the blob container used to initialize the file system. Assuming that a
[jira] [Created] (HADOOP-14700) NativeAzureFileSystem.open() ignores blob container name
Cheng Lian created HADOOP-14700: --- Summary: NativeAzureFileSystem.open() ignores blob container name Key: HADOOP-14700 URL: https://issues.apache.org/jira/browse/HADOOP-14700 Project: Hadoop Common Issue Type: Sub-task Components: fs Affects Versions: 3.0.0-alpha4, 3.0.0-beta1 Reporter: Cheng Lian {{NativeAzureFileSystem}} instances are associated with the blob container used to initialize the file system. Assuming that a file system instance {{fs}} is associated with a container {{A}}, when trying to access a blob inside another container {{B}}, {{fs}} still tries to find the blob inside container {{A}}. If there happens to be two blobs with the same name inside both containers, the user may get a wrong result because {{fs}} reads the contents from the blob inside container {{A}} instead of container {{B}}. The following self-contained Scala code snippet illustrates this issue. You may reproduce it by running the script inside the [Ammonite REPL|http://ammonite.io/]. {code} import $ivy.`com.jsuereth::scala-arm:2.0` import $ivy.`com.microsoft.azure:azure-storage:5.2.0` import $ivy.`org.apache.hadoop:hadoop-azure:3.0.0-alpha4` import $ivy.`org.apache.hadoop:hadoop-common:3.0.0-alpha4` import $ivy.`org.scalatest::scalatest:3.0.3` import java.io.{BufferedReader, InputStreamReader} import java.net.URI import java.time.{Duration, Instant} import java.util.{Date, EnumSet} import com.microsoft.azure.storage.{CloudStorageAccount, StorageCredentialsAccountAndKey} import com.microsoft.azure.storage.blob.{SharedAccessBlobPermissions, SharedAccessBlobPolicy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.azure.{AzureException, NativeAzureFileSystem} import org.scalatest.Assertions._ import resource._ // Utility implicit conversion for auto resource management. 
implicit def `Closable->Resource`[T <: { def close() }]: Resource[T] = new Resource[T] { override def close(closable: T): Unit = closable.close() } // Credentials information val ACCOUNT = "** REDACTED **" val ACCESS_KEY = "** REDACTED **" // We'll create two different containers, both contain a blob named "test-blob" but with different // contents. val CONTAINER_A = "container-a" val CONTAINER_B = "container-b" val TEST_BLOB = "test-blob" val blobClient = { val credentials = new StorageCredentialsAccountAndKey(ACCOUNT, ACCESS_KEY) val account = new CloudStorageAccount(credentials, /* useHttps */ true) account.createCloudBlobClient() } // Generates a read-only SAS key restricted within "container-a". val sasKeyForContainerA = { val since = Instant.now() minus Duration.ofMinutes(10) val duration = Duration.ofHours(1) val policy = new SharedAccessBlobPolicy() policy.setSharedAccessStartTime(Date.from(since)) policy.setSharedAccessExpiryTime(Date.from(since plus duration)) policy.setPermissions(EnumSet.of( SharedAccessBlobPermissions.READ, SharedAccessBlobPermissions.LIST )) blobClient .getContainerReference(CONTAINER_A) .generateSharedAccessSignature(policy, null) } // Sets up testing containers and blobs using the Azure storage SDK: // // container-a/test-blob => "foo" // container-b/test-blob => "bar" { val containerARef = blobClient.getContainerReference(CONTAINER_A) val containerBRef = blobClient.getContainerReference(CONTAINER_B) containerARef.createIfNotExists() containerARef.getBlockBlobReference(TEST_BLOB).uploadText("foo") containerBRef.createIfNotExists() containerBRef.getBlockBlobReference(TEST_BLOB).uploadText("bar") } val pathA = new Path(s"wasbs://$CONTAINER_A@$ACCOUNT.blob.core.windows.net/$TEST_BLOB") val pathB = new Path(s"wasbs://$CONTAINER_A@$ACCOUNT.blob.core.windows.net/$TEST_BLOB") for { // Creates a file system associated with "container-a". 
fs <- managed { val conf = new Configuration conf.set("fs.wasbs.impl", classOf[NativeAzureFileSystem].getName) conf.set(s"fs.azure.sas.$CONTAINER_A.$ACCOUNT.blob.core.windows.net", sasKeyForContainerA) pathA.getFileSystem(conf) } // Opens a reader pointing to "container-a/test-blob". We expect to get the string "foo" written // to this blob previously. readerA <- managed(new BufferedReader(new InputStreamReader(fs open pathA))) // Opens a reader pointing to "container-b/test-blob". We expect to get an exception since the SAS // key used to create the `FileSystem` instance is restricted to "container-a". readerB <- managed(new BufferedReader(new InputStreamReader(fs open pathB))) } { // Should get "foo" assert(readerA.readLine() == "foo") // Should catch an exception ... intercept[AzureException] { // ... but instead, we get string "foo" here, which indicates that the readerB was reading from // "container-a" instead of "container-b". val contents =
[jira] [Assigned] (SPARK-9686) Spark Thrift server doesn't return correct JDBC metadata
[ https://issues.apache.org/jira/browse/SPARK-9686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian reassigned SPARK-9686: - Assignee: (was: Cheng Lian) > Spark Thrift server doesn't return correct JDBC metadata > - > > Key: SPARK-9686 > URL: https://issues.apache.org/jira/browse/SPARK-9686 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 1.4.0, 1.4.1, 1.5.0, 1.5.1, 1.5.2 > Reporter: pin_zhang > Priority: Critical > Attachments: SPARK-9686.1.patch.txt > > > 1. Start start-thriftserver.sh > 2. Connect with beeline > 3. Create a table > 4. Run {{show tables}}; the newly created table is returned > 5. > Class.forName("org.apache.hive.jdbc.HiveDriver"); > String URL = "jdbc:hive2://localhost:1/default"; > Properties info = new Properties(); > Connection conn = DriverManager.getConnection(URL, info); > ResultSet tables = conn.getMetaData().getTables(conn.getCatalog(), > null, null, null); > Problem: > No tables are returned by this API, which worked in Spark 1.3 -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20958) Roll back parquet-mr 1.8.2 to parquet-1.8.1
[ https://issues.apache.org/jira/browse/SPARK-20958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16043478#comment-16043478 ] Cheng Lian commented on SPARK-20958: [~marmbrus], here is the draft release note entry: {quote} SPARK-20958: For users who use parquet-avro together with Spark 2.2, please use parquet-avro 1.8.1 instead of parquet-avro 1.8.2. This is because parquet-avro 1.8.2 upgrades avro from 1.7.6 to 1.8.1, which is backward incompatible with 1.7.6. {quote} > Roll back parquet-mr 1.8.2 to parquet-1.8.1 > --- > > Key: SPARK-20958 > URL: https://issues.apache.org/jira/browse/SPARK-20958 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Cheng Lian >Assignee: Cheng Lian > Labels: release-notes, release_notes, releasenotes > > We recently realized that parquet-mr 1.8.2 used by Spark 2.2.0-rc2 depends on > avro 1.8.1, which is incompatible with avro 1.7.6 used by parquet-mr 1.8.1 > and avro 1.7.7 used by spark-core 2.2.0-rc2. > Basically, Spark 2.2.0-rc2 introduced two incompatible versions of avro > (1.7.7 and 1.8.1). Upgrading avro 1.7.7 to 1.8.1 is not preferable due to the > reasons mentioned in [PR > #17163|https://github.com/apache/spark/pull/17163#issuecomment-286563131]. > Therefore, we don't really have many choices here and have to roll back > parquet-mr 1.8.2 to 1.8.1 to resolve this dependency conflict. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
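For a downstream project, the release-note advice amounts to pinning parquet-avro at 1.8.1 and keeping avro on the 1.7.x line. A hypothetical sbt fragment illustrating this (the module coordinates are the standard Maven ones; the Spark and avro versions match those discussed in this issue, everything else is illustrative, not part of the release note):

```scala
// Illustrative sbt fragment for a project built against Spark 2.2.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.2.0",
  // Use parquet-avro 1.8.1 instead of 1.8.2, per the release note.
  "org.apache.parquet" % "parquet-avro" % "1.8.1"
)

// Pin avro to 1.7.7 so that avro 1.8.1 cannot sneak in transitively
// through parquet-avro 1.8.2.
dependencyOverrides += "org.apache.avro" % "avro" % "1.7.7"
```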
[jira] [Updated] (SPARK-20958) Roll back parquet-mr 1.8.2 to parquet-1.8.1
[ https://issues.apache.org/jira/browse/SPARK-20958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-20958: --- Labels: release-notes release_notes releasenotes (was: release-notes) > Roll back parquet-mr 1.8.2 to parquet-1.8.1 > --- > > Key: SPARK-20958 > URL: https://issues.apache.org/jira/browse/SPARK-20958 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Cheng Lian >Assignee: Cheng Lian > Labels: release-notes, release_notes, releasenotes > > We recently realized that parquet-mr 1.8.2 used by Spark 2.2.0-rc2 depends on > avro 1.8.1, which is incompatible with avro 1.7.6 used by parquet-mr 1.8.1 > and avro 1.7.7 used by spark-core 2.2.0-rc2. > Basically, Spark 2.2.0-rc2 introduced two incompatible versions of avro > (1.7.7 and 1.8.1). Upgrading avro 1.7.7 to 1.8.1 is not preferable due to the > reasons mentioned in [PR > #17163|https://github.com/apache/spark/pull/17163#issuecomment-286563131]. > Therefore, we don't really have many choices here and have to roll back > parquet-mr 1.8.2 to 1.8.1 to resolve this dependency conflict. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20958) Roll back parquet-mr 1.8.2 to parquet-1.8.1
[ https://issues.apache.org/jira/browse/SPARK-20958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035149#comment-16035149 ] Cheng Lian commented on SPARK-20958: Thanks [~rdblue]! I'm also reluctant to roll it back considering those fixes we wanted so badly... We decided to give this a try because, from the perspective of release management, we'd like to avoid cutting a release with known conflicting dependencies, even transitive ones. For a Spark 2.2 user, it's quite natural to choose parquet-avro 1.8.2, which is part of parquet-mr 1.8.2, which in turn, is a direct dependency of Spark 2.2.0. However, due to PARQUET-389, rolling back is already not an option. Two options I can see here are: # Release Spark 2.2.0 as is with a statement in the release notes saying that users should use parquet-avro 1.8.1 instead of 1.8.2 to avoid the Avro compatibility issue. # Wait for parquet-mr 1.8.3, which hopefully resolves this dependency issue (e.g., by reverting PARQUET-358). > Roll back parquet-mr 1.8.2 to parquet-1.8.1 > --- > > Key: SPARK-20958 > URL: https://issues.apache.org/jira/browse/SPARK-20958 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Cheng Lian >Assignee: Cheng Lian > > We recently realized that parquet-mr 1.8.2 used by Spark 2.2.0-rc2 depends on > avro 1.8.1, which is incompatible with avro 1.7.6 used by parquet-mr 1.8.1 > and avro 1.7.7 used by spark-core 2.2.0-rc2. > Basically, Spark 2.2.0-rc2 introduced two incompatible versions of avro > (1.7.7 and 1.8.1). Upgrading avro 1.7.7 to 1.8.1 is not preferable due to the > reasons mentioned in [PR > #17163|https://github.com/apache/spark/pull/17163#issuecomment-286563131]. > Therefore, we don't really have many choices here and have to roll back > parquet-mr 1.8.2 to 1.8.1 to resolve this dependency conflict. 
[jira] [Commented] (SPARK-20958) Roll back parquet-mr 1.8.2 to parquet-1.8.1
[ https://issues.apache.org/jira/browse/SPARK-20958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16034310#comment-16034310 ] Cheng Lian commented on SPARK-20958: [~rdblue] I think the root cause here is we cherry-picked parquet-mr [PR #318|https://github.com/apache/parquet-mr/pull/318] to parquet-mr 1.8.2, and introduced this avro upgrade. Tried to roll back parquet-mr back to 1.8.1 but it doesn't work well because this brings back [PARQUET-389|https://issues.apache.org/jira/browse/PARQUET-389] and breaks some test cases involving schema evolution. It would be nice if we can have a parquet-mr 1.8.3 or 1.8.2.1 release that has [PR #318|https://github.com/apache/parquet-mr/pull/318] reverted from 1.8.2? I think cherry-picking that PR is also problematic for parquet-mr because it introduces a backward-incompatible dependency change in a maintenance release. > Roll back parquet-mr 1.8.2 to parquet-1.8.1 > --- > > Key: SPARK-20958 > URL: https://issues.apache.org/jira/browse/SPARK-20958 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Cheng Lian >Assignee: Cheng Lian > > We recently realized that parquet-mr 1.8.2 used by Spark 2.2.0-rc2 depends on > avro 1.8.1, which is incompatible with avro 1.7.6 used by parquet-mr 1.8.1 > and avro 1.7.7 used by spark-core 2.2.0-rc2. > Basically, Spark 2.2.0-rc2 introduced two incompatible versions of avro > (1.7.7 and 1.8.1). Upgrading avro 1.7.7 to 1.8.1 is not preferable due to the > reasons mentioned in [PR > #17163|https://github.com/apache/spark/pull/17163#issuecomment-286563131]. > Therefore, we don't really have many choices here and have to roll back > parquet-mr 1.8.2 to 1.8.1 to resolve this dependency conflict. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20958) Roll back parquet-mr 1.8.2 to parquet-1.8.1
[ https://issues.apache.org/jira/browse/SPARK-20958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-20958: --- Description: We recently realized that parquet-mr 1.8.2 used by Spark 2.2.0-rc2 depends on avro 1.8.1, which is incompatible with avro 1.7.6 used by parquet-mr 1.8.1 and avro 1.7.7 used by spark-core 2.2.0-rc2. Basically, Spark 2.2.0-rc2 introduced two incompatible versions of avro (1.7.7 and 1.8.1). Upgrading avro 1.7.7 to 1.8.1 is not preferable due to the reasons mentioned in [PR #17163|https://github.com/apache/spark/pull/17163#issuecomment-286563131]. Therefore, we don't really have many choices here and have to roll back parquet-mr 1.8.2 to 1.8.1 to resolve this dependency conflict. was: We recently realized that parquet-mr 1.8.2 used by Spark 2.2.0-rc2 depends on avro 1.8.1, which is incompatible with avro 1.7.6 used by parquet-mr 1.8.1 and avro 1.7.7 used by spark-core 2.2.0-rc2. , Spark 2.2.0-rc2 introduced two incompatible versions of avro (1.7.7 and 1.8.1). Upgrading avro 1.7.7 to 1.8.1 is not preferable due to the reasons mentioned in [PR #17163|https://github.com/apache/spark/pull/17163#issuecomment-286563131]. Therefore, we don't really have many choices here and have to roll back parquet-mr 1.8.2 to 1.8.1 to resolve this dependency conflict. > Roll back parquet-mr 1.8.2 to parquet-1.8.1 > --- > > Key: SPARK-20958 > URL: https://issues.apache.org/jira/browse/SPARK-20958 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Cheng Lian >Assignee: Cheng Lian > > We recently realized that parquet-mr 1.8.2 used by Spark 2.2.0-rc2 depends on > avro 1.8.1, which is incompatible with avro 1.7.6 used by parquet-mr 1.8.1 > and avro 1.7.7 used by spark-core 2.2.0-rc2. > Basically, Spark 2.2.0-rc2 introduced two incompatible versions of avro > (1.7.7 and 1.8.1). 
Upgrading avro 1.7.7 to 1.8.1 is not preferable due to the > reasons mentioned in [PR > #17163|https://github.com/apache/spark/pull/17163#issuecomment-286563131]. > Therefore, we don't really have many choices here and have to roll back > parquet-mr 1.8.2 to 1.8.1 to resolve this dependency conflict.
[jira] [Created] (SPARK-20958) Roll back parquet-mr 1.8.2 to parquet-1.8.1
Cheng Lian created SPARK-20958: -- Summary: Roll back parquet-mr 1.8.2 to parquet-1.8.1 Key: SPARK-20958 URL: https://issues.apache.org/jira/browse/SPARK-20958 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0 Reporter: Cheng Lian Assignee: Cheng Lian We recently realized that parquet-mr 1.8.2 used by Spark 2.2.0-rc2 depends on avro 1.8.1, which is incompatible with avro 1.7.6 used by parquet-mr 1.8.1 and avro 1.7.7 used by spark-core 2.2.0-rc2. Basically, Spark 2.2.0-rc2 introduced two incompatible versions of avro (1.7.7 and 1.8.1). Upgrading avro 1.7.7 to 1.8.1 is not preferable due to the reasons mentioned in [PR #17163|https://github.com/apache/spark/pull/17163#issuecomment-286563131]. Therefore, we don't really have many choices here and have to roll back parquet-mr 1.8.2 to 1.8.1 to resolve this dependency conflict. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (PARQUET-980) Cannot read row group larger than 2GB
[ https://issues.apache.org/jira/browse/PARQUET-980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16007326#comment-16007326 ] Cheng Lian edited comment on PARQUET-980 at 5/11/17 10:46 PM: -- The current write path ensures that it never writes a page that is larger than 2GB, but the read path may read 1 or more column chunks consisting of multiple pages into a single byte array (or {{ByteBuffer}}) no larger than 2GB. We hit this issue in production because the data distribution happened to be similar to the situation mentioned in the JIRA description and produced a skewed row group containing a column chunk larger than 2GB. I think there are two separate issues to fix: # On the write path, the strategy that dynamically adjusts memory check intervals needs some tweaking. The assumption that sizes of adjacent records are similar can be easily broken. # On the read path, the {{ConsecutiveChunkList.readAll()}} method should support reading data larger than 2GB, probably by using multiple buffers. Another option is to ensure that no row groups larger than 2GB can ever be written. Thoughts? BTW, the [parquet-python|https://github.com/jcrobak/parquet-python/] library can read this kind of malformed Parquet file successfully with [this patch|https://github.com/jcrobak/parquet-python/pull/56]. We used it to recover our data from the malformed Parquet file. was (Author: lian cheng): The current write path ensures that it never writes a page that is larger than 2GB, but the read path may read 1 or more column chunks consisting of multiple pages into a single byte array (or {{ByteBuffer}}) no larger than 2GB. We hit this issue in production because the data distribution happened to be similar to the situation mentioned in the JIRA description and produced a skewed row group containing a column chunk larger than 2GB.
I think there are two separate issues to fix: # On the write path, the strategy that dynamically adjusts memory check intervals needs some tweaking. The assumption that sizes of adjacent records are similar can be easily broken. # On the read path, the {{ConsecutiveChunkList.readAll()}} method should support reading data larger than 2GB, probably by using multiple buffers. Another option is to ensure that no row groups larger than 2GB can ever be written. Thoughts? BTW, the [parquet-python|https://github.com/jcrobak/parquet-python/] library can read this kind of malformed Parquet file successfully with [this patch|https://github.com/jcrobak/parquet-python/pull/56]. We used it to recover our data from the malformed Parquet file. > Cannot read row group larger than 2GB > - > > Key: PARQUET-980 > URL: https://issues.apache.org/jira/browse/PARQUET-980 > Project: Parquet > Issue Type: Bug > Components: parquet-mr >Affects Versions: 1.8.0, 1.8.1, 1.8.2 >Reporter: Herman van Hovell > > Parquet MR 1.8.2 does not support reading row groups which are larger than 2 > GB. > See: https://github.com/apache/parquet-mr/blob/parquet-1.8.x/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1064 > We are seeing this when writing skewed records. This throws off the > estimation of the memory check interval in the InternalParquetRecordWriter. > The following spark code illustrates this: > {noformat} > /** > * Create a data frame that will make parquet write a file with a row group > larger than 2 GB. Parquet > * only checks the size of the row group after writing a number of records. > This number is based on > * average row size of the already written records. This is problematic in > the following scenario: > * - The initial (100) records in the record group are relatively small. 
> * - The InternalParquetRecordWriter checks if it needs to write to disk (it > should not), it assumes > * that the remaining records have a similar size, and (greatly) increases > the check interval (usually > to 10000). > * - The remaining records are much larger than expected, making the row > group larger than 2 GB (which > makes reading the row group impossible). > * > * The data frame below illustrates such a scenario. This creates a row group > of approximately 4GB. > */ > val badDf = spark.range(0, 2200, 1, 1).mapPartitions { iterator => > var i = 0 > val random = new scala.util.Random(42) > val buffer = new Array[Char](750000) > iterator.map { id => > // the first 200 records have a length of 1K and the remaining 2000 have > a length of 750K. > val numChars = if (i < 200) 1000 else 750000 > i += 1 > // create a random array > var j = 0 > while (j < numChars) { > // Generate a char (borrowed from scala.util.Random) > buffer(j) = (random.nextInt(0xD800 - 1) + 1).toChar > j += 1 > } >
[jira] [Commented] (PARQUET-980) Cannot read row group larger than 2GB
[ https://issues.apache.org/jira/browse/PARQUET-980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16007326#comment-16007326 ] Cheng Lian commented on PARQUET-980: The current write path ensures that it never writes a page that is larger than 2GB, but the read path may read 1 or more column chunks consisting of multiple pages into a single byte array (or {{ByteBuffer}}) no larger than 2GB. We hit this issue in production because the data distribution happened to be similar to the situation mentioned in the JIRA description and produced a skewed row group containing a column chunk larger than 2GB. I think there are two separate issues to fix: # On the write path, the strategy that dynamically adjusts memory check intervals needs some tweaking. The assumption that sizes of adjacent records are similar can be easily broken. # On the read path, the {{ConsecutiveChunkList.readAll()}} method should support reading data larger than 2GB, probably by using multiple buffers. Another option is to ensure that no row groups larger than 2GB can ever be written. Thoughts? BTW, the [parquet-python|https://github.com/jcrobak/parquet-python/] library can read this kind of malformed Parquet file successfully with [this patch|https://github.com/jcrobak/parquet-python/pull/56]. We used it to recover our data from the malformed Parquet file. > Cannot read row group larger than 2GB > - > > Key: PARQUET-980 > URL: https://issues.apache.org/jira/browse/PARQUET-980 > Project: Parquet > Issue Type: Bug > Components: parquet-mr >Affects Versions: 1.8.0, 1.8.1, 1.8.2 >Reporter: Herman van Hovell > > Parquet MR 1.8.2 does not support reading row groups which are larger than 2 > GB. > See: https://github.com/apache/parquet-mr/blob/parquet-1.8.x/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1064 > We are seeing this when writing skewed records. This throws off the > estimation of the memory check interval in the InternalParquetRecordWriter. 
> The following spark code illustrates this: > {noformat} > /** > * Create a data frame that will make parquet write a file with a row group > larger than 2 GB. Parquet > * only checks the size of the row group after writing a number of records. > This number is based on > * average row size of the already written records. This is problematic in > the following scenario: > * - The initial (100) records in the record group are relatively small. > * - The InternalParquetRecordWriter checks if it needs to write to disk (it > should not), it assumes > * that the remaining records have a similar size, and (greatly) increases > the check interval (usually > to 10000). > * - The remaining records are much larger than expected, making the row > group larger than 2 GB (which > makes reading the row group impossible). > * > * The data frame below illustrates such a scenario. This creates a row group > of approximately 4GB. > */ > val badDf = spark.range(0, 2200, 1, 1).mapPartitions { iterator => > var i = 0 > val random = new scala.util.Random(42) > val buffer = new Array[Char](750000) > iterator.map { id => > // the first 200 records have a length of 1K and the remaining 2000 have > a length of 750K. > val numChars = if (i < 200) 1000 else 750000 > i += 1 > // create a random array > var j = 0 > while (j < numChars) { > // Generate a char (borrowed from scala.util.Random) > buffer(j) = (random.nextInt(0xD800 - 1) + 1).toChar > j += 1 > } > // create a string: the string constructor will copy the buffer. 
> new String(buffer, 0, numChars) > } > } > badDf.write.parquet("somefile") > val corruptedDf = spark.read.parquet("somefile") > corruptedDf.select(count(lit(1)), max(length($"value"))).show() > {noformat} > The latter fails with the following exception: > {noformat} > java.lang.NegativeArraySizeException > at > org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1064) > at > org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:698) > ... > {noformat} > This seems to be fixed by commit > https://github.com/apache/parquet-mr/commit/6b605a4ea05b66e1a6bf843353abcb4834a4ced8 > in parquet 1.9.x. Is there any chance that we can fix this in 1.8.x? > This can happen when -- This message was sent by Atlassian JIRA (v6.3.15#6346)
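[Editor's note] The check-interval heuristic blamed in the comments above can be sketched in a few lines. The following is a simplified, hypothetical model in plain Python, not parquet-mr's actual code (names and constants are illustrative): it extrapolates the next size check from the average record size seen so far, which is exactly the assumption that skewed records break.

```python
# Simplified model of the size-check-interval heuristic discussed above.
# NOT parquet-mr's implementation; names and constants are illustrative.

TARGET_GROUP_SIZE = 128 * 1024 * 1024  # nominal row-group size target

def write_group(record_sizes, min_interval=100):
    """Accumulate records, checking the group size only every
    `check_interval` records, extrapolating from the average size
    of records already written (the InternalParquetRecordWriter idea)."""
    total = 0
    count = 0
    next_check = min_interval
    for size in record_sizes:
        total += size
        count += 1
        if count >= next_check:
            if total >= TARGET_GROUP_SIZE:
                break  # flush the row group
            # Assume remaining records look like the ones seen so far.
            avg = total / count
            remaining = (TARGET_GROUP_SIZE - total) / avg
            # Schedule the next check roughly halfway to the estimate.
            next_check = count + max(min_interval, int(remaining / 2))
    return total

# 200 small records followed by much larger ones: the early checks see
# ~1 KB records and push the next check far into the future, so the
# group blows well past the target before it is ever re-checked.
skewed = [1_000] * 200 + [750_000] * 2000
print(write_group(skewed) / float(1 << 30))  # final group size in GiB
```

With uniform record sizes the same loop stops within a few percent of the target; only the skewed stream overshoots, which is the scenario the {{badDf}} snippet above constructs.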
[jira] [Updated] (PARQUET-980) Cannot read row group larger than 2GB
[ https://issues.apache.org/jira/browse/PARQUET-980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated PARQUET-980: --- Affects Version/s: 1.8.1 1.8.2 > Cannot read row group larger than 2GB > - > > Key: PARQUET-980 > URL: https://issues.apache.org/jira/browse/PARQUET-980 > Project: Parquet > Issue Type: Bug > Components: parquet-mr >Affects Versions: 1.8.0, 1.8.1, 1.8.2 >Reporter: Herman van Hovell > > Parquet MR 1.8.2 does not support reading row groups which are larger than 2 > GB. > See: https://github.com/apache/parquet-mr/blob/parquet-1.8.x/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1064 > We are seeing this when writing skewed records. This throws off the > estimation of the memory check interval in the InternalParquetRecordWriter. > The following spark code illustrates this: > {noformat} > /** > * Create a data frame that will make parquet write a file with a row group > larger than 2 GB. Parquet > * only checks the size of the row group after writing a number of records. > This number is based on > * average row size of the already written records. This is problematic in > the following scenario: > * - The initial (100) records in the record group are relatively small. > * - The InternalParquetRecordWriter checks if it needs to write to disk (it > should not), it assumes > * that the remaining records have a similar size, and (greatly) increases > the check interval (usually > to 10000). > * - The remaining records are much larger than expected, making the row > group larger than 2 GB (which > makes reading the row group impossible). > * > * The data frame below illustrates such a scenario. This creates a row group > of approximately 4GB. 
> */ > val badDf = spark.range(0, 2200, 1, 1).mapPartitions { iterator => > var i = 0 > val random = new scala.util.Random(42) > val buffer = new Array[Char](750000) > iterator.map { id => > // the first 200 records have a length of 1K and the remaining 2000 have > a length of 750K. > val numChars = if (i < 200) 1000 else 750000 > i += 1 > // create a random array > var j = 0 > while (j < numChars) { > // Generate a char (borrowed from scala.util.Random) > buffer(j) = (random.nextInt(0xD800 - 1) + 1).toChar > j += 1 > } > // create a string: the string constructor will copy the buffer. > new String(buffer, 0, numChars) > } > } > badDf.write.parquet("somefile") > val corruptedDf = spark.read.parquet("somefile") > corruptedDf.select(count(lit(1)), max(length($"value"))).show() > {noformat} > The latter fails with the following exception: > {noformat} > java.lang.NegativeArraySizeException > at > org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1064) > at > org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:698) > ... > {noformat} > This seems to be fixed by commit > https://github.com/apache/parquet-mr/commit/6b605a4ea05b66e1a6bf843353abcb4834a4ced8 > in parquet 1.9.x. Is there any chance that we can fix this in 1.8.x? > This can happen when -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (SPARK-20132) Add documentation for column string functions
[ https://issues.apache.org/jira/browse/SPARK-20132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-20132: --- Fix Version/s: 2.2.0 > Add documentation for column string functions > - > > Key: SPARK-20132 > URL: https://issues.apache.org/jira/browse/SPARK-20132 > Project: Spark > Issue Type: Documentation > Components: PySpark, SQL >Affects Versions: 2.1.0 >Reporter: Michael Patterson >Assignee: Michael Patterson >Priority: Minor > Labels: documentation, newbie > Fix For: 2.2.0, 2.3.0 > > > Four Column string functions do not have documentation for PySpark: > rlike > like > startswith > endswith > These functions are called through the _bin_op interface, which allows the > passing of a docstring. I have added docstrings with examples to each of the > four functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20246) Should check determinism when pushing predicates down through aggregation
[ https://issues.apache.org/jira/browse/SPARK-20246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-20246: --- Labels: correctness (was: ) > Should check determinism when pushing predicates down through aggregation > - > > Key: SPARK-20246 > URL: https://issues.apache.org/jira/browse/SPARK-20246 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Weiluo Ren > Labels: correctness > > {code}import org.apache.spark.sql.functions._ > spark.range(1,1000).distinct.withColumn("random", > rand()).filter(col("random") > 0.3).orderBy("random").show{code} > gives wrong result. > In the optimized logical plan, it shows that the filter with the > non-deterministic predicate is pushed beneath the aggregate operator, which > should not happen. > cc [~lian cheng] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20246) Should check determinism when pushing predicates down through aggregation
[ https://issues.apache.org/jira/browse/SPARK-20246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959946#comment-15959946 ] Cheng Lian commented on SPARK-20246: [This line|https://github.com/apache/spark/blob/a4491626ed8169f0162a0dfb78736c9b9e7fb434/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L795] should be the root cause. We didn't check determinism of the predicates before pushing them down. The same thing also applies when pushing predicates through union and window operators. cc [~cloud_fan] > Should check determinism when pushing predicates down through aggregation > - > > Key: SPARK-20246 > URL: https://issues.apache.org/jira/browse/SPARK-20246 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Weiluo Ren > > {code}import org.apache.spark.sql.functions._ > spark.range(1,1000).distinct.withColumn("random", > rand()).filter(col("random") > 0.3).orderBy("random").show{code} > gives wrong result. > In the optimized logical plan, it shows that the filter with the > non-deterministic predicate is pushed beneath the aggregate operator, which > should not happen. > cc [~lian cheng] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
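[Editor's note] To see why determinism matters for this pushdown, here is a toy model in plain Python, not Spark code. A stateful alternating predicate stands in for {{rand() > 0.3}} so the run is reproducible: like {{rand()}}, each evaluation can give a different answer, so evaluating it below the distinct (once per input row) produces a different result than evaluating it above (once per distinct row).

```python
# Toy model: why a non-deterministic predicate must not be pushed
# below a distinct/aggregate. The alternating predicate stands in
# for rand() > 0.3 -- each call may answer differently.

def make_alternating_pred():
    calls = {"n": 0}
    def pred(_row):
        calls["n"] += 1
        return calls["n"] % 2 == 1  # True, False, True, ...
    return pred

data = [1, 1, 2, 2, 3, 3]  # duplicates, as seen before the distinct

def filter_above_distinct(rows):
    # Correct plan: distinct first, then one predicate evaluation
    # per distinct row.
    pred = make_alternating_pred()
    return [r for r in sorted(set(rows)) if pred(r)]

def filter_pushed_below_distinct(rows):
    # Incorrect plan: one evaluation per input row, so the sequence
    # of answers -- and therefore the result -- changes.
    pred = make_alternating_pred()
    return sorted(set(r for r in rows if pred(r)))

print(filter_above_distinct(data))         # [1, 3]
print(filter_pushed_below_distinct(data))  # [1, 2, 3]
```

A deterministic predicate gives the same answer for the same row wherever it is evaluated, which is why the determinism check at the Optimizer.scala line referenced above is sufficient.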
[jira] [Updated] (SPARK-19716) Dataset should allow by-name resolution for struct type elements in array
[ https://issues.apache.org/jira/browse/SPARK-19716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19716: --- Fix Version/s: (was: 2.3.0) 2.2.0 > Dataset should allow by-name resolution for struct type elements in array > - > > Key: SPARK-19716 > URL: https://issues.apache.org/jira/browse/SPARK-19716 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.2.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan > Fix For: 2.2.0 > > > if we have a DataFrame with schema {{a: int, b: int, c: int}}, and convert it > to Dataset with {{case class Data(a: Int, c: Int)}}, it works and we will > extract the `a` and `c` columns to build the Data. > However, if the struct is inside array, e.g. schema is {{arr: array<struct<a: int, b: int, c: int>>}}, and we want to convert it to Dataset with {{case class > ComplexData(arr: Seq[Data])}}, we will fail. The reason is, to allow > compatible types, e.g. convert {{a: int}} to {{case class A(a: Long)}}, we > will add cast for each field, except struct type field, because struct type > is flexible, the number of columns can mismatch. We should probably also skip > cast for array and map type. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-19716) Dataset should allow by-name resolution for struct type elements in array
[ https://issues.apache.org/jira/browse/SPARK-19716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian resolved SPARK-19716. Resolution: Fixed Fix Version/s: 2.3.0 Issue resolved by pull request 17398 [https://github.com/apache/spark/pull/17398] > Dataset should allow by-name resolution for struct type elements in array > - > > Key: SPARK-19716 > URL: https://issues.apache.org/jira/browse/SPARK-19716 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.2.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan > Fix For: 2.3.0 > > > if we have a DataFrame with schema {{a: int, b: int, c: int}}, and convert it > to Dataset with {{case class Data(a: Int, c: Int)}}, it works and we will > extract the `a` and `c` columns to build the Data. > However, if the struct is inside array, e.g. schema is {{arr: array<struct<a: int, b: int, c: int>>}}, and we want to convert it to Dataset with {{case class > ComplexData(arr: Seq[Data])}}, we will fail. The reason is, to allow > compatible types, e.g. convert {{a: int}} to {{case class A(a: Long)}}, we > will add cast for each field, except struct type field, because struct type > is flexible, the number of columns can mismatch. We should probably also skip > cast for array and map type. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-19716) Dataset should allow by-name resolution for struct type elements in array
[ https://issues.apache.org/jira/browse/SPARK-19716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian reassigned SPARK-19716: -- Assignee: Wenchen Fan > Dataset should allow by-name resolution for struct type elements in array > - > > Key: SPARK-19716 > URL: https://issues.apache.org/jira/browse/SPARK-19716 > Project: Spark > Issue Type: New Feature > Components: SQL >Affects Versions: 2.2.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan > > if we have a DataFrame with schema {{a: int, b: int, c: int}}, and convert it > to Dataset with {{case class Data(a: Int, c: Int)}}, it works and we will > extract the `a` and `c` columns to build the Data. > However, if the struct is inside array, e.g. schema is {{arr: array<struct<a: int, b: int, c: int>>}}, and we want to convert it to Dataset with {{case class > ComplexData(arr: Seq[Data])}}, we will fail. The reason is, to allow > compatible types, e.g. convert {{a: int}} to {{case class A(a: Long)}}, we > will add cast for each field, except struct type field, because struct type > is flexible, the number of columns can mismatch. We should probably also skip > cast for array and map type. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
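[Editor's note] The by-name resolution requested in this ticket can be illustrated outside Spark. The following is a hypothetical sketch in plain Python (not Spark's encoder machinery): each struct element of the array is decoded by field name, tolerating the extra column {{b}}, rather than by position.

```python
# Sketch of by-name struct resolution for array elements.
# Illustrative only -- Spark's actual encoders work very differently.
from dataclasses import dataclass, fields
from typing import List

@dataclass
class Data:
    a: int
    c: int

@dataclass
class ComplexData:
    arr: List[Data]

def decode_by_name(row: dict) -> ComplexData:
    # Pick struct fields by name, ignoring extras such as `b`,
    # instead of relying on field position.
    return ComplexData(arr=[
        Data(**{f.name: elem[f.name] for f in fields(Data)})
        for elem in row["arr"]
    ])

row = {"arr": [{"a": 1, "b": 2, "c": 3}, {"a": 4, "b": 5, "c": 6}]}
print(decode_by_name(row))
# → ComplexData(arr=[Data(a=1, c=3), Data(a=4, c=6)])
```

Positional decoding would instead pair {{Data.c}} with the struct's second field ({{b}}), which is the mismatch the ticket describes.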
[jira] [Updated] (SPARK-19912) String literals are not escaped while performing Hive metastore level partition pruning
[ https://issues.apache.org/jira/browse/SPARK-19912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19912: --- Summary: String literals are not escaped while performing Hive metastore level partition pruning (was: String literals are not escaped while performing partition pruning at Hive metastore level) > String literals are not escaped while performing Hive metastore level > partition pruning > --- > > Key: SPARK-19912 > URL: https://issues.apache.org/jira/browse/SPARK-19912 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1, 2.2.0 >Reporter: Cheng Lian > Labels: correctness > > {{Shim_v0_13.convertFilters()}} doesn't escape string literals while > generating Hive style partition predicates. > The following SQL-injection-like test case illustrates this issue: > {code} > test("SPARK-19912") { > withTable("spark_19912") { > Seq( > (1, "p1", "q1"), > (2, "p1\" and q=\"q1", "q2") > ).toDF("a", "p", "q").write.partitionBy("p", > "q").saveAsTable("spark_19912") > checkAnswer( > spark.table("foo").filter($"p" === "p1\" and q = \"q1").select($"a"), > Row(2) > ) > } > } > {code} > The above test case fails like this: > {noformat} > [info] - spark_19912 *** FAILED *** (13 seconds, 74 milliseconds) > [info] Results do not match for query: > [info] Timezone: > sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-2880,dstSavings=360,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-2880,dstSavings=360,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=720,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=720,endTimeMode=0]] > [info] Timezone Env: > [info] > [info] == Parsed Logical Plan == > [info] 'Project [unresolvedalias('a, None)] > [info] +- Filter (p#27 = p1" and q = "q1) > [info] +- SubqueryAlias spark_19912 > [info] +- Relation[a#26,p#27,q#28] parquet > [info] > [info] == Analyzed 
Logical Plan == > [info] a: int > [info] Project [a#26] > [info] +- Filter (p#27 = p1" and q = "q1) > [info] +- SubqueryAlias spark_19912 > [info] +- Relation[a#26,p#27,q#28] parquet > [info] > [info] == Optimized Logical Plan == > [info] Project [a#26] > [info] +- Filter (isnotnull(p#27) && (p#27 = p1" and q = "q1)) > [info] +- Relation[a#26,p#27,q#28] parquet > [info] > [info] == Physical Plan == > [info] *Project [a#26] > [info] +- *FileScan parquet default.spark_19912[a#26,p#27,q#28] Batched: > true, Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: > 0, PartitionFilters: [isnotnull(p#27), (p#27 = p1" and q = "q1)], > PushedFilters: [], ReadSchema: struct > [info] == Results == > [info] > [info] == Results == > [info] !== Correct Answer - 1 == == Spark Answer - 0 == > [info]struct<> struct<> > [info] ![2] > {noformat} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
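[Editor's note] A minimal sketch of the missing escaping, in plain Python rather than the Scala of {{Shim_v0_13.convertFilters()}} (the helper names here are hypothetical, not Spark's API): escape backslashes and double quotes before embedding a literal into a Hive-style partition predicate, so a value like {{p1" and q="q1}} cannot terminate the quoted literal and inject extra predicates.

```python
# Hypothetical sketch of string-literal escaping for Hive-style
# partition filters; not the actual Spark/Hive implementation.

def quote_string_literal(value: str) -> str:
    # Escape backslashes first, then double quotes, so the embedded
    # value can never close the surrounding quoted literal.
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'

def partition_filter(column: str, value: str) -> str:
    return f"{column} = {quote_string_literal(value)}"

# The SQL-injection-like value from the test case above:
unsafe = 'p1" and q="q1'
print(partition_filter("p", unsafe))
# The inner quotes stay escaped instead of ending the literal and
# turning the rest of the value into additional predicates.
```

Without the escaping step, the generated predicate becomes {{p = "p1" and q = "q1"}}, which is exactly the behavior the failing test exposes.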
[jira] [Updated] (SPARK-19912) String literals are not escaped while performing partition pruning at Hive metastore level
[ https://issues.apache.org/jira/browse/SPARK-19912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19912: --- Description: {{Shim_v0_13.convertFilters()}} doesn't escape string literals while generating Hive style partition predicates. The following SQL-injection-like test case illustrates this issue: {code} test("SPARK-19912") { withTable("spark_19912") { Seq( (1, "p1", "q1"), (2, "p1\" and q=\"q1", "q2") ).toDF("a", "p", "q").write.partitionBy("p", "q").saveAsTable("spark_19912") checkAnswer( spark.table("foo").filter($"p" === "p1\" and q = \"q1").select($"a"), Row(2) ) } } {code} The above test case fails like this: {noformat} [info] - spark_19912 *** FAILED *** (13 seconds, 74 milliseconds) [info] Results do not match for query: [info] Timezone: sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-2880,dstSavings=360,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-2880,dstSavings=360,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=720,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=720,endTimeMode=0]] [info] Timezone Env: [info] [info] == Parsed Logical Plan == [info] 'Project [unresolvedalias('a, None)] [info] +- Filter (p#27 = p1" and q = "q1) [info] +- SubqueryAlias spark_19912 [info] +- Relation[a#26,p#27,q#28] parquet [info] [info] == Analyzed Logical Plan == [info] a: int [info] Project [a#26] [info] +- Filter (p#27 = p1" and q = "q1) [info] +- SubqueryAlias spark_19912 [info] +- Relation[a#26,p#27,q#28] parquet [info] [info] == Optimized Logical Plan == [info] Project [a#26] [info] +- Filter (isnotnull(p#27) && (p#27 = p1" and q = "q1)) [info] +- Relation[a#26,p#27,q#28] parquet [info] [info] == Physical Plan == [info] *Project [a#26] [info] +- *FileScan parquet default.spark_19912[a#26,p#27,q#28] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, 
PartitionFilters: [isnotnull(p#27), (p#27 = p1" and q = "q1)], PushedFilters: [], ReadSchema: struct [info] == Results == [info] [info] == Results == [info] !== Correct Answer - 1 == == Spark Answer - 0 == [info]struct<> struct<> [info] ![2] {noformat} was: {{Shim_v0_13.convertFilters()}} doesn't escape string literals while generating Hive style partition predicates. The following SQL-injection-like test case illustrates this issue: {code} test("foo") { withTable("foo") { Seq( (1, "p1", "q1"), (2, "p1\" and q=\"q1", "q2") ).toDF("a", "p", "q").write.partitionBy("p", "q").saveAsTable("foo") checkAnswer( spark.table("foo").filter($"p" === "p1\" and q = \"q1").select($"a"), Row(2) ) } } {code} > String literals are not escaped while performing partition pruning at Hive > metastore level > -- > > Key: SPARK-19912 > URL: https://issues.apache.org/jira/browse/SPARK-19912 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1, 2.2.0 >Reporter: Cheng Lian > Labels: correctness > > {{Shim_v0_13.convertFilters()}} doesn't escape string literals while > generating Hive style partition predicates. 
> The following SQL-injection-like test case illustrates this issue: > {code} > test("SPARK-19912") { > withTable("spark_19912") { > Seq( > (1, "p1", "q1"), > (2, "p1\" and q=\"q1", "q2") > ).toDF("a", "p", "q").write.partitionBy("p", > "q").saveAsTable("spark_19912") > checkAnswer( > spark.table("foo").filter($"p" === "p1\" and q = \"q1").select($"a"), > Row(2) > ) > } > } > {code} > The above test case fails like this: > {noformat} > [info] - spark_19912 *** FAILED *** (13 seconds, 74 milliseconds) > [info] Results do not match for query: > [info] Timezone: > sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-2880,dstSavings=360,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-2880,dstSavings=360,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=720,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=720,endTimeMode=0]] > [info] Timezone Env: > [info] > [info] == Parsed Logical Plan == > [info] 'Project [unresolvedalias('a, None)] > [info] +- Filter (p#27 = p1" and q = "q1) > [info] +- SubqueryAlias spark_19912 > [info] +- Relation[a#26,p#27,q#28] parquet > [info] > [info] == Analyzed Logical Plan == > [info] a: int >
[jira] [Updated] (SPARK-19887) __HIVE_DEFAULT_PARTITION__ is not interpreted as NULL partition value in partitioned persisted tables
[ https://issues.apache.org/jira/browse/SPARK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19887: --- Labels: correctness (was: ) > __HIVE_DEFAULT_PARTITION__ is not interpreted as NULL partition value in > partitioned persisted tables > - > > Key: SPARK-19887 > URL: https://issues.apache.org/jira/browse/SPARK-19887 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0, 2.2.0 >Reporter: Cheng Lian > Labels: correctness > > The following Spark shell snippet under Spark 2.1 reproduces this issue: > {code} > val data = Seq( > ("p1", 1, 1), > ("p2", 2, 2), > (null, 3, 3) > ) > // Correct case: Saving partitioned data to file system. > val path = "/tmp/partitioned" > data. > toDF("a", "b", "c"). > write. > mode("overwrite"). > partitionBy("a", "b"). > parquet(path) > spark.read.parquet(path).filter($"a".isNotNull).show(truncate = false) > // +---+---+---+ > // |c |a |b | > // +---+---+---+ > // |2 |p2 |2 | > // |1 |p1 |1 | > // +---+---+---+ > // Incorrect case: Saving partitioned data as persisted table. > data. > toDF("a", "b", "c"). > write. > mode("overwrite"). > partitionBy("a", "b"). > saveAsTable("test_null") > spark.table("test_null").filter($"a".isNotNull).show(truncate = false) > // +---+--+---+ > // |c |a |b | > // +---+--+---+ > // |3 |__HIVE_DEFAULT_PARTITION__|3 | <-- This line should not be here > // |1 |p1|1 | > // |2 |p2|2 | > // +---+--+---+ > {code} > Hive-style partitioned tables use the magic string > {{\_\_HIVE_DEFAULT_PARTITION\_\_}} to indicate {{NULL}} partition values in > partition directory names. However, in the case of a persisted partitioned table, > this magic string is not interpreted as {{NULL}} but as a regular string. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
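[Editor's note] The interpretation the persisted-table path misses can be sketched as follows; a hypothetical helper in plain Python (not Spark's partition-discovery code) showing how the magic string should map back to NULL when partition directory names are parsed into values:

```python
# Sketch of Hive-style partition path decoding; illustrative only.

HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"

def parse_partition_path(path: str) -> dict:
    # Interpret each `name=value` directory segment, mapping the magic
    # default-partition string back to None (i.e. SQL NULL).
    spec = {}
    for segment in path.strip("/").split("/"):
        name, _, value = segment.partition("=")
        spec[name] = None if value == HIVE_DEFAULT_PARTITION else value
    return spec

print(parse_partition_path("a=__HIVE_DEFAULT_PARTITION__/b=3"))
# → {'a': None, 'b': '3'}
```

The bug report shows the persisted-table path skipping this mapping, so {{$"a".isNotNull}} wrongly keeps the row whose partition value is the literal magic string.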
[jira] [Created] (SPARK-19912) String literals are not escaped while performing partition pruning at Hive metastore level
Cheng Lian created SPARK-19912: -- Summary: String literals are not escaped while performing partition pruning at Hive metastore level Key: SPARK-19912 URL: https://issues.apache.org/jira/browse/SPARK-19912 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.1.1, 2.2.0 Reporter: Cheng Lian {{Shim_v0_13.convertFilters()}} doesn't escape string literals while generating Hive style partition predicates. The following SQL-injection-like test case illustrates this issue: {code} test("foo") { withTable("foo") { Seq( (1, "p1", "q1"), (2, "p1\" and q=\"q1", "q2") ).toDF("a", "p", "q").write.partitionBy("p", "q").saveAsTable("foo") checkAnswer( spark.table("foo").filter($"p" === "p1\" and q = \"q1").select($"a"), Row(2) ) } } {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19887) __HIVE_DEFAULT_PARTITION__ is not interpreted as NULL partition value in partitioned persisted tables
[ https://issues.apache.org/jira/browse/SPARK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19887: --- Affects Version/s: 2.2.0 > __HIVE_DEFAULT_PARTITION__ is not interpreted as NULL partition value in > partitioned persisted tables > - > > Key: SPARK-19887 > URL: https://issues.apache.org/jira/browse/SPARK-19887 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0, 2.2.0 >Reporter: Cheng Lian > > The following Spark shell snippet under Spark 2.1 reproduces this issue: > {code} > val data = Seq( > ("p1", 1, 1), > ("p2", 2, 2), > (null, 3, 3) > ) > // Correct case: Saving partitioned data to file system. > val path = "/tmp/partitioned" > data. > toDF("a", "b", "c"). > write. > mode("overwrite"). > partitionBy("a", "b"). > parquet(path) > spark.read.parquet(path).filter($"a".isNotNull).show(truncate = false) > // +---+---+---+ > // |c |a |b | > // +---+---+---+ > // |2 |p2 |2 | > // |1 |p1 |1 | > // +---+---+---+ > // Incorrect case: Saving partitioned data as persisted table. > data. > toDF("a", "b", "c"). > write. > mode("overwrite"). > partitionBy("a", "b"). > saveAsTable("test_null") > spark.table("test_null").filter($"a".isNotNull).show(truncate = false) > // +---+--+---+ > // |c |a |b | > // +---+--+---+ > // |3 |__HIVE_DEFAULT_PARTITION__|3 | <-- This line should not be here > // |1 |p1|1 | > // |2 |p2|2 | > // +---+--+---+ > {code} > Hive-style partitioned tables use the magic string > {{\_\_HIVE_DEFAULT_PARTITION\_\_}} to indicate {{NULL}} partition values in > partition directory names. However, in the case persisted partitioned table, > this magic string is not interpreted as {{NULL}} but a regular string. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19905) Dataset.inputFiles is broken for Hive SerDe tables
Cheng Lian created SPARK-19905: -- Summary: Dataset.inputFiles is broken for Hive SerDe tables Key: SPARK-19905 URL: https://issues.apache.org/jira/browse/SPARK-19905 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0 Reporter: Cheng Lian Assignee: Cheng Lian The following snippet reproduces this issue: {code} spark.range(10).createOrReplaceTempView("t") spark.sql("CREATE TABLE u STORED AS RCFILE AS SELECT * FROM t") spark.table("u").inputFiles.foreach(println) {code} In Spark 2.2, it prints nothing, while in Spark 2.1, it prints something like {noformat} file:/Users/lian/local/var/lib/hive/warehouse_1.2.1/u {noformat} on my laptop. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19887) __HIVE_DEFAULT_PARTITION__ is not interpreted as NULL partition value in partitioned persisted tables
[ https://issues.apache.org/jira/browse/SPARK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19887: --- Description: The following Spark shell snippet under Spark 2.1 reproduces this issue: {code} val data = Seq( ("p1", 1, 1), ("p2", 2, 2), (null, 3, 3) ) // Correct case: Saving partitioned data to file system. val path = "/tmp/partitioned" data. toDF("a", "b", "c"). write. mode("overwrite"). partitionBy("a", "b"). parquet(path) spark.read.parquet(path).filter($"a".isNotNull).show(truncate = false) // +---+---+---+ // |c |a |b | // +---+---+---+ // |2 |p2 |2 | // |1 |p1 |1 | // +---+---+---+ // Incorrect case: Saving partitioned data as persisted table. data. toDF("a", "b", "c"). write. mode("overwrite"). partitionBy("a", "b"). saveAsTable("test_null") spark.table("test_null").filter($"a".isNotNull).show(truncate = false) // +---+--+---+ // |c |a |b | // +---+--+---+ // |3 |__HIVE_DEFAULT_PARTITION__|3 | <-- This line should not be here // |1 |p1|1 | // |2 |p2|2 | // +---+--+---+ {code} Hive-style partitioned tables use the magic string {{\_\_HIVE_DEFAULT_PARTITION\_\_}} to indicate {{NULL}} partition values in partition directory names. However, in the case persisted partitioned table, this magic string is not interpreted as {{NULL}} but a regular string. was: The following Spark shell snippet under Spark 2.1 reproduces this issue: {code} val data = Seq( ("p1", 1, 1), ("p2", 2, 2), (null, 3, 3) ) // Correct case: Saving partitioned data to file system. val path = "/tmp/partitioned" data. toDF("a", "b", "c"). write. mode("overwrite"). partitionBy("a", "b"). parquet(path) spark.read.parquet(path).filter($"a".isNotNull).show(truncate = false) // +---+---+---+ // |c |a |b | // +---+---+---+ // |2 |p2 |2 | // |1 |p1 |1 | // +---+---+---+ // Incorrect case: Saving partitioned data as persisted table. data. toDF("a", "b", "c"). write. mode("overwrite"). partitionBy("a", "b"). 
saveAsTable("test_null") spark.table("test_null").filter($"a".isNotNull).show(truncate = false) // +---+--+---+ // |c |a |b | // +---+--+---+ // |3 |__HIVE_DEFAULT_PARTITION__|3 | <-- This line should not be here // |1 |p1|1 | // |2 |p2|2 | // +---+--+---+ {code} Hive-style partitioned tables use the magic string {{__HIVE_DEFAULT_PARTITION__}} to indicate {{NULL}} partition values in partition directory names. However, in the case persisted partitioned table, this magic string is not interpreted as {{NULL}} but a regular string. > __HIVE_DEFAULT_PARTITION__ is not interpreted as NULL partition value in > partitioned persisted tables > - > > Key: SPARK-19887 > URL: https://issues.apache.org/jira/browse/SPARK-19887 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Cheng Lian > > The following Spark shell snippet under Spark 2.1 reproduces this issue: > {code} > val data = Seq( > ("p1", 1, 1), > ("p2", 2, 2), > (null, 3, 3) > ) > // Correct case: Saving partitioned data to file system. > val path = "/tmp/partitioned" > data. > toDF("a", "b", "c"). > write. > mode("overwrite"). > partitionBy("a", "b"). > parquet(path) > spark.read.parquet(path).filter($"a".isNotNull).show(truncate = false) > // +---+---+---+ > // |c |a |b | > // +---+---+---+ > // |2 |p2 |2 | > // |1 |p1 |1 | > // +---+---+---+ > // Incorrect case: Saving partitioned data as persisted table. > data. > toDF("a", "b", "c"). > write. > mode("overwrite"). > partitionBy("a", "b"). > saveAsTable("test_null") > spark.table("test_null").filter($"a".isNotNull).show(truncate = false) > // +---+--+---+ > // |c |a |b | > // +---+--+---+ > // |3 |__HIVE_DEFAULT_PARTITION__|3 | <-- This line should not be here > // |1 |p1|1 | > // |2 |p2|2 | > // +---+--+---+ > {code} > Hive-style partitioned tables use the magic string > {{\_\_HIVE_DEFAULT_PARTITION\_\_}} to indicate {{NULL}} partition values in > partition directory names. 
However, in the case of persisted partitioned tables, > this magic string is not interpreted as {{NULL}} but as a regular string. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (SPARK-19887) __HIVE_DEFAULT_PARTITION__ is not interpreted as NULL partition value in partitioned persisted tables
[ https://issues.apache.org/jira/browse/SPARK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19887: --- Description: The following Spark shell snippet under Spark 2.1 reproduces this issue: {code} val data = Seq( ("p1", 1, 1), ("p2", 2, 2), (null, 3, 3) ) // Correct case: Saving partitioned data to file system. val path = "/tmp/partitioned" data. toDF("a", "b", "c"). write. mode("overwrite"). partitionBy("a", "b"). parquet(path) spark.read.parquet(path).filter($"a".isNotNull).show(truncate = false) // +---+---+---+ // |c |a |b | // +---+---+---+ // |2 |p2 |2 | // |1 |p1 |1 | // +---+---+---+ // Incorrect case: Saving partitioned data as persisted table. data. toDF("a", "b", "c"). write. mode("overwrite"). partitionBy("a", "b"). saveAsTable("test_null") spark.table("test_null").filter($"a".isNotNull).show(truncate = false) // +---+--+---+ // |c |a |b | // +---+--+---+ // |3 |__HIVE_DEFAULT_PARTITION__|3 | <-- This line should not be here // |1 |p1|1 | // |2 |p2|2 | // +---+--+---+ {code} Hive-style partitioned tables use the magic string {{__HIVE_DEFAULT_PARTITION__}} to indicate {{NULL}} partition values in partition directory names. However, in the case persisted partitioned table, this magic string is not interpreted as {{NULL}} but a regular string. was: The following Spark shell snippet under Spark 2.1 reproduces this issue: {code} val data = Seq( ("p1", 1, 1), ("p2", 2, 2), (null, 3, 3) ) // Correct case: Saving partitioned data to file system. val path = "/tmp/partitioned" data. toDF("a", "b", "c"). write. mode("overwrite"). partitionBy("a", "b"). parquet(path) spark.read.parquet(path).filter($"a".isNotNull).show(truncate = false) // +---+---+---+ // |c |a |b | // +---+---+---+ // |2 |p2 |2 | // |1 |p1 |1 | // +---+---+---+ // Incorrect case: Saving partitioned data as persisted table. data. toDF("a", "b", "c"). write. mode("overwrite"). partitionBy("a", "b"). 
saveAsTable("test_null") spark.table("test_null").filter($"a".isNotNull).show(truncate = false) // +---+--+---+ // |c |a |b | // +---+--+---+ // |3 |__HIVE_DEFAULT_PARTITION__|3 | <-- This line should not be here // |1 |p1|1 | // |2 |p2|2 | // +---+--+---+ {code} Hive-style partitioned table uses magic string {{"__HIVE_DEFAULT_PARTITION__"}} to indicate {{NULL}} partition values in partition directory names. However, in the case persisted partitioned table, this magic string is not interpreted as {{NULL}} but a regular string. > __HIVE_DEFAULT_PARTITION__ is not interpreted as NULL partition value in > partitioned persisted tables > - > > Key: SPARK-19887 > URL: https://issues.apache.org/jira/browse/SPARK-19887 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Cheng Lian > > The following Spark shell snippet under Spark 2.1 reproduces this issue: > {code} > val data = Seq( > ("p1", 1, 1), > ("p2", 2, 2), > (null, 3, 3) > ) > // Correct case: Saving partitioned data to file system. > val path = "/tmp/partitioned" > data. > toDF("a", "b", "c"). > write. > mode("overwrite"). > partitionBy("a", "b"). > parquet(path) > spark.read.parquet(path).filter($"a".isNotNull).show(truncate = false) > // +---+---+---+ > // |c |a |b | > // +---+---+---+ > // |2 |p2 |2 | > // |1 |p1 |1 | > // +---+---+---+ > // Incorrect case: Saving partitioned data as persisted table. > data. > toDF("a", "b", "c"). > write. > mode("overwrite"). > partitionBy("a", "b"). > saveAsTable("test_null") > spark.table("test_null").filter($"a".isNotNull).show(truncate = false) > // +---+--+---+ > // |c |a |b | > // +---+--+---+ > // |3 |__HIVE_DEFAULT_PARTITION__|3 | <-- This line should not be here > // |1 |p1|1 | > // |2 |p2|2 | > // +---+--+---+ > {code} > Hive-style partitioned tables use the magic string > {{__HIVE_DEFAULT_PARTITION__}} to indicate {{NULL}} partition values in > partition directory names. 
However, in the case of persisted partitioned tables, > this magic string is not interpreted as {{NULL}} but as a regular string. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (SPARK-19887) __HIVE_DEFAULT_PARTITION__ is not interpreted as NULL partition value in partitioned persisted tables
[ https://issues.apache.org/jira/browse/SPARK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19887: --- Summary: __HIVE_DEFAULT_PARTITION__ is not interpreted as NULL partition value in partitioned persisted tables (was: __HIVE_DEFAULT_PARTITION__ not interpreted as NULL partition value in partitioned persisted tables) > __HIVE_DEFAULT_PARTITION__ is not interpreted as NULL partition value in > partitioned persisted tables > - > > Key: SPARK-19887 > URL: https://issues.apache.org/jira/browse/SPARK-19887 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Cheng Lian > > The following Spark shell snippet under Spark 2.1 reproduces this issue: > {code} > val data = Seq( > ("p1", 1, 1), > ("p2", 2, 2), > (null, 3, 3) > ) > // Correct case: Saving partitioned data to file system. > val path = "/tmp/partitioned" > data. > toDF("a", "b", "c"). > write. > mode("overwrite"). > partitionBy("a", "b"). > parquet(path) > spark.read.parquet(path).filter($"a".isNotNull).show(truncate = false) > // +---+---+---+ > // |c |a |b | > // +---+---+---+ > // |2 |p2 |2 | > // |1 |p1 |1 | > // +---+---+---+ > // Incorrect case: Saving partitioned data as persisted table. > data. > toDF("a", "b", "c"). > write. > mode("overwrite"). > partitionBy("a", "b"). > saveAsTable("test_null") > spark.table("test_null").filter($"a".isNotNull).show(truncate = false) > // +---+--+---+ > // |c |a |b | > // +---+--+---+ > // |3 |__HIVE_DEFAULT_PARTITION__|3 | <-- This line should not be here > // |1 |p1|1 | > // |2 |p2|2 | > // +---+--+---+ > {code} > Hive-style partitioned table uses magic string > {{"__HIVE_DEFAULT_PARTITION__"}} to indicate {{NULL}} partition values in > partition directory names. However, in the case persisted partitioned table, > this magic string is not interpreted as {{NULL}} but a regular string. 
[jira] [Created] (SPARK-19887) __HIVE_DEFAULT_PARTITION__ not interpreted as NULL partition value in partitioned persisted tables
Cheng Lian created SPARK-19887: -- Summary: __HIVE_DEFAULT_PARTITION__ not interpreted as NULL partition value in partitioned persisted tables Key: SPARK-19887 URL: https://issues.apache.org/jira/browse/SPARK-19887 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.1.0 Reporter: Cheng Lian The following Spark shell snippet under Spark 2.1 reproduces this issue: {code} val data = Seq( ("p1", 1, 1), ("p2", 2, 2), (null, 3, 3) ) // Correct case: Saving partitioned data to file system. val path = "/tmp/partitioned" data. toDF("a", "b", "c"). write. mode("overwrite"). partitionBy("a", "b"). parquet(path) spark.read.parquet(path).filter($"a".isNotNull).show(truncate = false) // +---+---+---+ // |c |a |b | // +---+---+---+ // |2 |p2 |2 | // |1 |p1 |1 | // +---+---+---+ // Incorrect case: Saving partitioned data as persisted table. data. toDF("a", "b", "c"). write. mode("overwrite"). partitionBy("a", "b"). saveAsTable("test_null") spark.table("test_null").filter($"a".isNotNull).show(truncate = false) // +---+--+---+ // |c |a |b | // +---+--+---+ // |3 |__HIVE_DEFAULT_PARTITION__|3 | <-- This line should not be here // |1 |p1|1 | // |2 |p2|2 | // +---+--+---+ {code} Hive-style partitioned table uses magic string {{"__HIVE_DEFAULT_PARTITION__"}} to indicate {{NULL}} partition values in partition directory names. However, in the case persisted partitioned table, this magic string is not interpreted as {{NULL}} but a regular string. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
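The expected decoding can be modeled without Spark. This is a minimal Python sketch (hypothetical helper names, not Spark's actual implementation) of how a Hive-style partition directory name should be parsed, with the magic string mapped back to a null value — the step the persisted-table code path misses:

```python
from urllib.parse import unquote

# Hive's sentinel for a NULL partition value in directory names.
HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def parse_partition_path(path):
    """Parse a Hive-style partition path like 'a=p1/b=1' into a dict,
    decoding percent-escapes and mapping the magic string back to None."""
    values = {}
    for segment in path.strip("/").split("/"):
        if "=" not in segment:
            continue  # skip non-partition path segments
        column, raw = segment.split("=", 1)
        value = unquote(raw)
        values[column] = None if value == HIVE_DEFAULT_PARTITION else value
    return values
```

Under this model, the directory {{a=__HIVE_DEFAULT_PARTITION__/b=3}} yields a null value for {{a}}, so {{$"a".isNotNull}} would correctly filter that row out.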
[jira] [Commented] (SPARK-19887) __HIVE_DEFAULT_PARTITION__ not interpreted as NULL partition value in partitioned persisted tables
[ https://issues.apache.org/jira/browse/SPARK-19887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903749#comment-15903749 ] Cheng Lian commented on SPARK-19887: cc [~cloud_fan] > __HIVE_DEFAULT_PARTITION__ not interpreted as NULL partition value in > partitioned persisted tables > -- > > Key: SPARK-19887 > URL: https://issues.apache.org/jira/browse/SPARK-19887 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Cheng Lian > > The following Spark shell snippet under Spark 2.1 reproduces this issue: > {code} > val data = Seq( > ("p1", 1, 1), > ("p2", 2, 2), > (null, 3, 3) > ) > // Correct case: Saving partitioned data to file system. > val path = "/tmp/partitioned" > data. > toDF("a", "b", "c"). > write. > mode("overwrite"). > partitionBy("a", "b"). > parquet(path) > spark.read.parquet(path).filter($"a".isNotNull).show(truncate = false) > // +---+---+---+ > // |c |a |b | > // +---+---+---+ > // |2 |p2 |2 | > // |1 |p1 |1 | > // +---+---+---+ > // Incorrect case: Saving partitioned data as persisted table. > data. > toDF("a", "b", "c"). > write. > mode("overwrite"). > partitionBy("a", "b"). > saveAsTable("test_null") > spark.table("test_null").filter($"a".isNotNull).show(truncate = false) > // +---+--+---+ > // |c |a |b | > // +---+--+---+ > // |3 |__HIVE_DEFAULT_PARTITION__|3 | <-- This line should not be here > // |1 |p1|1 | > // |2 |p2|2 | > // +---+--+---+ > {code} > Hive-style partitioned table uses magic string > {{"__HIVE_DEFAULT_PARTITION__"}} to indicate {{NULL}} partition values in > partition directory names. However, in the case persisted partitioned table, > this magic string is not interpreted as {{NULL}} but a regular string. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-19737) New analysis rule for reporting unregistered functions without relying on relation resolution
[ https://issues.apache.org/jira/browse/SPARK-19737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian resolved SPARK-19737. Resolution: Fixed Issue resolved by pull request 17168 [https://github.com/apache/spark/pull/17168] > New analysis rule for reporting unregistered functions without relying on > relation resolution > - > > Key: SPARK-19737 > URL: https://issues.apache.org/jira/browse/SPARK-19737 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Cheng Lian >Assignee: Cheng Lian > Fix For: 2.2.0 > > > Let's consider the following simple SQL query that reference an undefined > function {{foo}} that is never registered in the function registry: > {code:sql} > SELECT foo(a) FROM t > {code} > Assuming table {{t}} is a partitioned temporary view consisting of a large > number of files stored on S3, it may take the analyzer a long time before > realizing that {{foo}} is not registered yet. > The reason is that the existing analysis rule {{ResolveFunctions}} requires > all child expressions to be resolved first. Therefore, {{ResolveRelations}} > has to be executed first to resolve all columns referenced by the unresolved > function invocation. This further leads to partition discovery for {{t}}, > which may take a long time. > To address this case, we propose a new lightweight analysis rule > {{LookupFunctions}} that > # Matches all unresolved function invocations > # Look up the function names from the function registry > # Report analysis error for any unregistered functions > Since this rule doesn't actually try to resolve the unresolved functions, it > doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition > discovery. > We may put this analysis rule in a separate {{Once}} rule batch that sits > between the "Substitution" batch and the "Resolution" batch to avoid running > it repeatedly and make sure it gets executed before {{ResolveRelations}}. 
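The proposed {{LookupFunctions}} pass can be sketched in a few lines of Python (an illustrative model with hypothetical types, not Catalyst code): walk the unresolved expression tree, check each function name against the registry, and collect misses — all without resolving columns or relations, so no partition discovery is triggered.

```python
class FunctionCall:
    """A minimal unresolved expression node: a function name plus child
    expressions (nested FunctionCalls or plain column-name strings)."""

    def __init__(self, name, *children):
        self.name = name
        self.children = children


def lookup_functions(expr, registry):
    """LookupFunctions-style pre-pass: return every function name in the
    tree that is missing from the registry, leaving columns unresolved."""
    missing = []
    stack = [expr]
    while stack:
        node = stack.pop()
        if isinstance(node, FunctionCall):
            if node.name not in registry:
                missing.append(node.name)
            stack.extend(node.children)
    return missing
```

For the query {{SELECT foo(a) FROM t}}, the tree {{FunctionCall("foo", "a")}} is checked against the registry immediately, and the unregistered {{foo}} is reported before {{ResolveRelations}} ever looks at table {{t}}.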
[jira] [Assigned] (SPARK-19737) New analysis rule for reporting unregistered functions without relying on relation resolution
[ https://issues.apache.org/jira/browse/SPARK-19737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian reassigned SPARK-19737: -- Assignee: Cheng Lian > New analysis rule for reporting unregistered functions without relying on > relation resolution > - > > Key: SPARK-19737 > URL: https://issues.apache.org/jira/browse/SPARK-19737 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Cheng Lian >Assignee: Cheng Lian > Fix For: 2.2.0 > > > Let's consider the following simple SQL query that reference an undefined > function {{foo}} that is never registered in the function registry: > {code:sql} > SELECT foo(a) FROM t > {code} > Assuming table {{t}} is a partitioned temporary view consisting of a large > number of files stored on S3, it may take the analyzer a long time before > realizing that {{foo}} is not registered yet. > The reason is that the existing analysis rule {{ResolveFunctions}} requires > all child expressions to be resolved first. Therefore, {{ResolveRelations}} > has to be executed first to resolve all columns referenced by the unresolved > function invocation. This further leads to partition discovery for {{t}}, > which may take a long time. > To address this case, we propose a new lightweight analysis rule > {{LookupFunctions}} that > # Matches all unresolved function invocations > # Look up the function names from the function registry > # Report analysis error for any unregistered functions > Since this rule doesn't actually try to resolve the unresolved functions, it > doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition > discovery. > We may put this analysis rule in a separate {{Once}} rule batch that sits > between the "Substitution" batch and the "Resolution" batch to avoid running > it repeatedly and make sure it gets executed before {{ResolveRelations}}. 
[jira] [Updated] (SPARK-19737) New analysis rule for reporting unregistered functions without relying on relation resolution
[ https://issues.apache.org/jira/browse/SPARK-19737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19737: --- Description: Let's consider the following simple SQL query that reference an undefined function {{foo}} that is never registered in the function registry: {code:sql} SELECT foo(a) FROM t {code} Assuming table {{t}} is a partitioned temporary view consisting of a large number of files stored on S3, then it may take the analyzer a long time before realizing that {{foo}} is not registered yet. The reason is that the existing analysis rule {{ResolveFunctions}} requires all child expressions to be resolved first. Therefore, {{ResolveRelations}} has to be executed first to resolve all columns referenced by the unresolved function invocation. This further leads to partition discovery for {{t}}, which may take a long time. To address this case, we propose a new lightweight analysis rule {{LookupFunctions}} that # Matches all unresolved function invocations # Look up the function names from the function registry # Report analysis error for any unregistered functions Since this rule doesn't actually try to resolve the unresolved functions, it doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition discovery. We may put this analysis rule in a separate {{Once}} rule batch that sits between the "Substitution" batch and the "Resolution" batch to avoid running it repeatedly and make sure it gets executed before {{ResolveRelations}}. was: Let's consider the following simple SQL query that reference an invalid function {{foo}} that is never registered in the function registry: {code:sql} SELECT foo(a) FROM t {code} Assuming table {{t}} is a partitioned temporary view consisting of a large number of files stored on S3, then it may take the analyzer a long time before realizing that {{foo}} is not registered yet. 
The reason is that the existing analysis rule {{ResolveFunctions}} requires all child expressions to be resolved first. Therefore, {{ResolveRelations}} has to be executed first to resolve all columns referenced by the unresolved function invocation. This further leads to partition discovery for {{t}}, which may take a long time. To address this case, we propose a new lightweight analysis rule {{LookupFunctions}} that # Matches all unresolved function invocations # Look up the function names from the function registry # Report analysis error for any unregistered functions Since this rule doesn't actually try to resolve the unresolved functions, it doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition discovery. We may put this analysis rule in a separate {{Once}} rule batch that sits between the "Substitution" batch and the "Resolution" batch to avoid running it repeatedly and make sure it gets executed before {{ResolveRelations}}. > New analysis rule for reporting unregistered functions without relying on > relation resolution > - > > Key: SPARK-19737 > URL: https://issues.apache.org/jira/browse/SPARK-19737 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Cheng Lian > Fix For: 2.2.0 > > > Let's consider the following simple SQL query that reference an undefined > function {{foo}} that is never registered in the function registry: > {code:sql} > SELECT foo(a) FROM t > {code} > Assuming table {{t}} is a partitioned temporary view consisting of a large > number of files stored on S3, then it may take the analyzer a long time > before realizing that {{foo}} is not registered yet. > The reason is that the existing analysis rule {{ResolveFunctions}} requires > all child expressions to be resolved first. Therefore, {{ResolveRelations}} > has to be executed first to resolve all columns referenced by the unresolved > function invocation. 
This further leads to partition discovery for {{t}}, > which may take a long time. > To address this case, we propose a new lightweight analysis rule > {{LookupFunctions}} that > # Matches all unresolved function invocations > # Look up the function names from the function registry > # Report analysis error for any unregistered functions > Since this rule doesn't actually try to resolve the unresolved functions, it > doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition > discovery. > We may put this analysis rule in a separate {{Once}} rule batch that sits > between the "Substitution" batch and the "Resolution" batch to avoid running > it repeatedly and make sure it gets executed before {{ResolveRelations}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe,
[jira] [Updated] (SPARK-19737) New analysis rule for reporting unregistered functions without relying on relation resolution
[ https://issues.apache.org/jira/browse/SPARK-19737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19737: --- Description: Let's consider the following simple SQL query that reference an undefined function {{foo}} that is never registered in the function registry: {code:sql} SELECT foo(a) FROM t {code} Assuming table {{t}} is a partitioned temporary view consisting of a large number of files stored on S3, it may take the analyzer a long time before realizing that {{foo}} is not registered yet. The reason is that the existing analysis rule {{ResolveFunctions}} requires all child expressions to be resolved first. Therefore, {{ResolveRelations}} has to be executed first to resolve all columns referenced by the unresolved function invocation. This further leads to partition discovery for {{t}}, which may take a long time. To address this case, we propose a new lightweight analysis rule {{LookupFunctions}} that # Matches all unresolved function invocations # Look up the function names from the function registry # Report analysis error for any unregistered functions Since this rule doesn't actually try to resolve the unresolved functions, it doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition discovery. We may put this analysis rule in a separate {{Once}} rule batch that sits between the "Substitution" batch and the "Resolution" batch to avoid running it repeatedly and make sure it gets executed before {{ResolveRelations}}. was: Let's consider the following simple SQL query that reference an undefined function {{foo}} that is never registered in the function registry: {code:sql} SELECT foo(a) FROM t {code} Assuming table {{t}} is a partitioned temporary view consisting of a large number of files stored on S3, then it may take the analyzer a long time before realizing that {{foo}} is not registered yet. 
The reason is that the existing analysis rule {{ResolveFunctions}} requires all child expressions to be resolved first. Therefore, {{ResolveRelations}} has to be executed first to resolve all columns referenced by the unresolved function invocation. This further leads to partition discovery for {{t}}, which may take a long time. To address this case, we propose a new lightweight analysis rule {{LookupFunctions}} that # Matches all unresolved function invocations # Look up the function names from the function registry # Report analysis error for any unregistered functions Since this rule doesn't actually try to resolve the unresolved functions, it doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition discovery. We may put this analysis rule in a separate {{Once}} rule batch that sits between the "Substitution" batch and the "Resolution" batch to avoid running it repeatedly and make sure it gets executed before {{ResolveRelations}}. > New analysis rule for reporting unregistered functions without relying on > relation resolution > - > > Key: SPARK-19737 > URL: https://issues.apache.org/jira/browse/SPARK-19737 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Cheng Lian > Fix For: 2.2.0 > > > Let's consider the following simple SQL query that reference an undefined > function {{foo}} that is never registered in the function registry: > {code:sql} > SELECT foo(a) FROM t > {code} > Assuming table {{t}} is a partitioned temporary view consisting of a large > number of files stored on S3, it may take the analyzer a long time before > realizing that {{foo}} is not registered yet. > The reason is that the existing analysis rule {{ResolveFunctions}} requires > all child expressions to be resolved first. Therefore, {{ResolveRelations}} > has to be executed first to resolve all columns referenced by the unresolved > function invocation. 
This further leads to partition discovery for {{t}}, > which may take a long time. > To address this case, we propose a new lightweight analysis rule > {{LookupFunctions}} that > # Matches all unresolved function invocations > # Look up the function names from the function registry > # Report analysis error for any unregistered functions > Since this rule doesn't actually try to resolve the unresolved functions, it > doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition > discovery. > We may put this analysis rule in a separate {{Once}} rule batch that sits > between the "Substitution" batch and the "Resolution" batch to avoid running > it repeatedly and make sure it gets executed before {{ResolveRelations}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail:
[jira] [Updated] (SPARK-19737) New analysis rule for reporting unregistered functions without relying on relation resolution
[ https://issues.apache.org/jira/browse/SPARK-19737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19737: --- Description: Let's consider the following simple SQL query that reference an invalid function {{foo}} that is never registered in the function registry: {code:sql} SELECT foo(a) FROM t {code} Assuming table {{t}} is a partitioned temporary view consisting of a large number of files stored on S3, then it may take the analyzer a long time before realizing that {{foo}} is not registered yet. The reason is that the existing analysis rule {{ResolveFunctions}} requires all child expressions to be resolved first. Therefore, {{ResolveRelations}} has to be executed first to resolve all columns referenced by the unresolved function invocation. This further leads to partition discovery for {{t}}, which may take a long time. To address this case, we propose a new lightweight analysis rule {{LookupFunctions}} that # Matches all unresolved function invocations # Look up the function names from the function registry # Report analysis error for any unregistered functions Since this rule doesn't actually try to resolve the unresolved functions, it doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition discovery. We may put this analysis rule in a separate {{Once}} rule batch that sits between the "Substitution" batch and the "Resolution" batch to avoid running it repeatedly and make sure it gets executed before {{ResolveRelations}}. was: Let's consider the following simple SQL query that reference an invalid function {{foo}} that is never registered in the function registry: {code:sql} SELECT foo(a) FROM t {code} Assuming table {{t}} is a partitioned temporary view consisting of a large number of files stored on S3, then it may take the analyzer a long time before realizing that {{foo}} is not registered yet. 
The reason is that the existing analysis rule {{ResolveFunctions}} requires all child expressions to be resolved first. Therefore, {{ResolveRelations}} has to be executed first to resolve all columns referenced by the unresolved function invocation. This further leads to partition discovery for {{t}}, which may take a long time. To address this case, we propose a new lightweight analysis rule {{LookupFunctions}} that # Matches all unresolved function invocation # Look up the function name from the function registry # Report analysis error for any unregistered functions Since this rule doesn't actually try to resolve the unresolved functions, it doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition discovery. We may put this analysis rule in a separate {{Once}} rule batch that sits between the "Substitution" batch and the "Resolution" batch to avoid running it repeatedly and make sure it gets executed before {{ResolveRelations}}. > New analysis rule for reporting unregistered functions without relying on > relation resolution > - > > Key: SPARK-19737 > URL: https://issues.apache.org/jira/browse/SPARK-19737 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Cheng Lian > Fix For: 2.2.0 > > > Let's consider the following simple SQL query that reference an invalid > function {{foo}} that is never registered in the function registry: > {code:sql} > SELECT foo(a) FROM t > {code} > Assuming table {{t}} is a partitioned temporary view consisting of a large > number of files stored on S3, then it may take the analyzer a long time > before realizing that {{foo}} is not registered yet. > The reason is that the existing analysis rule {{ResolveFunctions}} requires > all child expressions to be resolved first. Therefore, {{ResolveRelations}} > has to be executed first to resolve all columns referenced by the unresolved > function invocation. 
This further leads to partition discovery for {{t}}, > which may take a long time. > To address this case, we propose a new lightweight analysis rule > {{LookupFunctions}} that > # Matches all unresolved function invocations > # Look up the function names from the function registry > # Report analysis error for any unregistered functions > Since this rule doesn't actually try to resolve the unresolved functions, it > doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition > discovery. > We may put this analysis rule in a separate {{Once}} rule batch that sits > between the "Substitution" batch and the "Resolution" batch to avoid running > it repeatedly and make sure it gets executed before {{ResolveRelations}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail:
[jira] [Updated] (SPARK-19737) New analysis rule for reporting unregistered functions without relying on relation resolution
[ https://issues.apache.org/jira/browse/SPARK-19737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19737: --- Description: Let's consider the following simple SQL query that reference an invalid function {{foo}} that is never registered in the function registry: {code:sql} SELECT foo(a) FROM t {code} Assuming table {{t}} is a partitioned temporary view consisting of a large number of files stored on S3, then it may take the analyzer a long time before realizing that {{foo}} is not registered yet. The reason is that the existing analysis rule {{ResolveFunctions}} requires all child expressions to be resolved first. Therefore, {{ResolveRelations}} has to be executed first to resolve all columns referenced by the unresolved function invocation. This further leads to partition discovery for {{t}}, which may take a long time. To address this case, we propose a new lightweight analysis rule {{LookupFunctions}} that # Matches all unresolved function invocation # Look up the function name from the function registry # Report analysis error for any unregistered functions Since this rule doesn't actually try to resolve the unresolved functions, it doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition discovery. We may put this analysis rule in a separate {{Once}} rule batch that sits between the "Substitution" batch and the "Resolution" batch to avoid running it repeatedly and make sure it gets executed before {{ResolveRelations}}. was: Let's consider the following simple SQL query that reference an invalid function {{foo}} that is never registered in the function registry: {code:sql} SELECT foo(a) FROM t {code} Assuming table {{t}} is a partitioned temporary view consisting of a large number of files stored on S3, then it may take the analyzer a long time before realizing that {{foo}} is not registered yet. 
The reason is that the existing analysis rule {{ResolveFunctions}} requires all child expressions to be resolved first. Therefore, {{ResolveRelations}} has to be executed first to resolve all columns referenced by the unresolved function invocation. This further leads to partition discovery for {{t}}, which may take a long time. To address this case, we propose a new lightweight analysis rule {{LookupFunctions}} that # Matches all unresolved function invocation # Look up the function name from the function registry # Report analysis error for any unregistered functions Since this rule doesn't try to actually resolve the unresolved functions, it doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition discovery. We may put this analysis rule in a separate {{Once}} rule batch that sits between the "Substitution" batch and the "Resolution" batch to avoid running it repeatedly. > New analysis rule for reporting unregistered functions without relying on > relation resolution > - > > Key: SPARK-19737 > URL: https://issues.apache.org/jira/browse/SPARK-19737 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Cheng Lian > Fix For: 2.2.0 > > > Let's consider the following simple SQL query that reference an invalid > function {{foo}} that is never registered in the function registry: > {code:sql} > SELECT foo(a) FROM t > {code} > Assuming table {{t}} is a partitioned temporary view consisting of a large > number of files stored on S3, then it may take the analyzer a long time > before realizing that {{foo}} is not registered yet. > The reason is that the existing analysis rule {{ResolveFunctions}} requires > all child expressions to be resolved first. Therefore, {{ResolveRelations}} > has to be executed first to resolve all columns referenced by the unresolved > function invocation. This further leads to partition discovery for {{t}}, > which may take a long time. 
> To address this case, we propose a new lightweight analysis rule > {{LookupFunctions}} that > # Matches all unresolved function invocation > # Look up the function name from the function registry > # Report analysis error for any unregistered functions > Since this rule doesn't actually try to resolve the unresolved functions, it > doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition > discovery. > We may put this analysis rule in a separate {{Once}} rule batch that sits > between the "Substitution" batch and the "Resolution" batch to avoid running > it repeatedly and make sure it gets executed before {{ResolveRelations}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands,
[jira] [Created] (SPARK-19737) New analysis rule for reporting unregistered functions without relying on relation resolution
Cheng Lian created SPARK-19737: -- Summary: New analysis rule for reporting unregistered functions without relying on relation resolution Key: SPARK-19737 URL: https://issues.apache.org/jira/browse/SPARK-19737 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0 Reporter: Cheng Lian Fix For: 2.2.0 Let's consider the following simple SQL query that reference an invalid function {{foo}} that is never registered in the function registry: {code:sql} SELECT foo(a) FROM t {code} Assuming table {{t}} is a partitioned temporary view consisting of a large number of files stored on S3, then it may take the analyzer a long time before realizing that {{foo}} is not registered yet. The reason is that the existing analysis rule {{ResolveFunctions}} requires all child expressions to be resolved first. Therefore, {{ResolveRelations}} has to be executed first to resolve all columns referenced by the unresolved function invocation. This further leads to partition discovery for {{t}}, which may take a long time. To address this case, we propose a new lightweight analysis rule {{LookupFunctions}} that # Matches all unresolved function invocation # Look up the function name from the function registry # Report analysis error for any unregistered functions Since this rule doesn't try to actually resolve the unresolved functions, it doesn't rely on {{ResolveRelations}} and therefore doesn't trigger partition discovery. We may put this analysis rule in a separate {{Once}} rule batch that sits between the "Substitution" batch and the "Resolution" batch to avoid running it repeatedly. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
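The proposed rule is deliberately tiny. A rough Scala sketch of what it could look like — the `catalog`, `failAnalysis`, and exact Catalyst types are assumptions here, not the actual Spark implementation:

{code}
// Hypothetical sketch of LookupFunctions, not the real Spark code.
// It only *checks* function names against the registry; it never tries to
// resolve the invocations, so it cannot trigger relation resolution or
// partition discovery.
object LookupFunctions extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan.transformAllExpressions {
      case f: UnresolvedFunction if !catalog.functionExists(f.name) =>
        failAnalysis(s"Undefined function: '${f.name}'")
    }
    // Pure check: the plan is always returned unchanged.
    plan
  }
}
{code}

Since the rule is cheap and idempotent, running it in a {{Once}} batch between "Substitution" and "Resolution" (as the description proposes) is sufficient.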
[jira] [Updated] (PARQUET-893) GroupColumnIO.getFirst() doesn't check for empty groups
[ https://issues.apache.org/jira/browse/PARQUET-893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated PARQUET-893: --- Description: The following Spark snippet reproduces this issue with Spark 2.1 (with parquet-mr 1.8.1) and Spark 2.2-SNAPSHOT (with parquet-mr 1.8.2): {code} import org.apache.spark.sql.types._ val path = "/tmp/parquet-test" case class Inner(f00: Int) case class Outer(f0: Inner, f1: Int) val df = Seq(Outer(Inner(1), 1)).toDF() df.printSchema() // root // |-- f0: struct (nullable = true) // ||-- f00: integer (nullable = false) // |-- f1: integer (nullable = false) df.write.mode("overwrite").parquet(path) val requestedSchema = new StructType(). add("f0", new StructType(). // This nested field name differs from the original one add("f01", IntegerType)). add("f1", IntegerType) println(requestedSchema.treeString) // root // |-- f0: struct (nullable = true) // ||-- f01: integer (nullable = true) // |-- f1: integer (nullable = true) spark.read.schema(requestedSchema).parquet(path).show() {code} In the above snippet, {{requestedSchema}} is compatible with the schema of the written Parquet file, but the following exception is thrown: {noformat} org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/parquet-test/part-7-d2b0bec1-7be5-4b51-8d53-3642680bc9c2.snappy.parquet at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243) at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227) at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:184) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109) at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:653) at java.util.ArrayList.get(ArrayList.java:429) at org.apache.parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:102) at org.apache.parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:102) at org.apache.parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:102) at org.apache.parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:97) at org.apache.parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:277) at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:135) at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:101) at 
org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154) at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:101) at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:140) at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:214) ... 21 more {noformat} According to this stack trace, it seems that {{GroupColumnIO.getFirst()}} [doesn't check for empty
[jira] [Created] (PARQUET-893) GroupColumnIO.getFirst() doesn't check for empty groups
Cheng Lian created PARQUET-893: -- Summary: GroupColumnIO.getFirst() doesn't check for empty groups Key: PARQUET-893 URL: https://issues.apache.org/jira/browse/PARQUET-893 Project: Parquet Issue Type: Bug Components: parquet-mr Affects Versions: 1.8.1 Reporter: Cheng Lian The following Spark 2.1 snippet reproduces this issue: {code} import org.apache.spark.sql.types._ val path = "/tmp/parquet-test" case class Inner(f00: Int) case class Outer(f0: Inner, f1: Int) val df = Seq(Outer(Inner(1), 1)).toDF() df.printSchema() // root // |-- f0: struct (nullable = true) // ||-- f00: integer (nullable = false) // |-- f1: integer (nullable = false) df.write.mode("overwrite").parquet(path) val requestedSchema = new StructType(). add("f0", new StructType(). // This nested field name differs from the original one add("f01", IntegerType)). add("f1", IntegerType) println(requestedSchema.treeString) // root // |-- f0: struct (nullable = true) // ||-- f01: integer (nullable = true) // |-- f1: integer (nullable = true) spark.read.schema(requestedSchema).parquet(path).show() {code} In the above snippet, {{requestedSchema}} is compatible with the schema of the written Parquet file, but the following exception is thrown: {noformat} org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/parquet-test/part-7-d2b0bec1-7be5-4b51-8d53-3642680bc9c2.snappy.parquet at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243) at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227) at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:184) at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:653) at java.util.ArrayList.get(ArrayList.java:429) at org.apache.parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:102) at org.apache.parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:102) at org.apache.parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:102) at org.apache.parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:97) at org.apache.parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:277) at 
org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:135) at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:101) at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154) at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:101) at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:140) at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:214) ... 21 more {noformat} According to this stack trace, it seems that {{GroupColumnIO.getFirst()}} [doesn't check for
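Until {{getFirst()}} guards against empty groups in parquet-mr, one user-side workaround is to avoid requesting nested fields the file doesn't contain, so no struct prunes down to an empty group. A hedged Scala sketch — the helper {{pruneToFileSchema}} is hypothetical, not a Spark or Parquet API:

{code}
import org.apache.spark.sql.types._

// Hypothetical helper: intersect the requested schema with the file's
// schema so column pruning never leaves an empty struct behind.
def pruneToFileSchema(requested: StructType, file: StructType): StructType =
  StructType(requested.flatMap { rf =>
    file.find(_.name == rf.name).flatMap { ff =>
      (rf.dataType, ff.dataType) match {
        case (r: StructType, f: StructType) =>
          val pruned = pruneToFileSchema(r, f)
          // Drop structs that would become empty groups after pruning.
          if (pruned.isEmpty) None else Some(rf.copy(dataType = pruned))
        case _ => Some(rf)
      }
    }
  })

val fileSchema = spark.read.parquet(path).schema
val safeSchema = pruneToFileSchema(requestedSchema, fileSchema)
spark.read.schema(safeSchema).parquet(path).show()
{code}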
[jira] [Updated] (SPARK-19529) TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()
[ https://issues.apache.org/jira/browse/SPARK-19529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19529: --- Target Version/s: 1.6.3, 2.0.3, 2.1.1, 2.2.0 (was: 2.0.3, 2.1.1, 2.2.0) > TransportClientFactory.createClient() shouldn't call awaitUninterruptibly() > --- > > Key: SPARK-19529 > URL: https://issues.apache.org/jira/browse/SPARK-19529 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 1.6.0, 2.0.0, 2.1.0 >Reporter: Josh Rosen >Assignee: Josh Rosen > > In Spark's Netty RPC layer, TransportClientFactory.createClient() calls > awaitUninterruptibly() on a Netty future while waiting for a connection to be > established. This creates a problem when a Spark task is interrupted while > blocking in this call (which can happen in the event of a slow connection > which will eventually time out). This has a bad impact on task cancellation > when interruptOnCancel = true. > As an example of the impact of this problem, I experienced significant > numbers of uncancellable "zombie tasks" on a production cluster where several > tasks were blocked trying to connect to a dead shuffle server and then > continued running as zombies after I cancelled the associated Spark stage. 
> The zombie tasks ran for several minutes with the following stack: > {code} > java.lang.Object.wait(Native Method) > java.lang.Object.wait(Object.java:460) > io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607) > io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301) > > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224) > > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) > => holding Monitor(java.lang.Object@1849476028}) > org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105) > > org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) > > org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) > > org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114) > > org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169) > > org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala: > 350) > org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286) > > org.apache.spark.storage.ShuffleBlockFetcherIterator.(ShuffleBlockFetcherIterator.scala:120) > > org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45) > > org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169) > > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > [...] > {code} > I believe that we can easily fix this by using the > InterruptedException-throwing await() instead. 
-- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
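The difference between the two waits can be illustrated outside Netty entirely. A toy Scala sketch (not Spark or Netty code) of what an awaitUninterruptibly-style loop does with {{Thread.interrupt()}}:

{code}
import java.util.concurrent.CountDownLatch

// An uninterruptible wait swallows the interrupt and keeps waiting, only
// restoring the interrupt flag once the latch finally opens. If the peer
// is dead, "finally" never comes -- hence the zombie tasks.
def awaitUninterruptibly(latch: CountDownLatch): Unit = {
  var interrupted = false
  while (latch.getCount > 0) {
    try latch.await()
    catch { case _: InterruptedException => interrupted = true }
  }
  if (interrupted) Thread.currentThread().interrupt()
}
{code}

The proposed fix is the interruptible flavor: {{latch.await()}} here, or Netty's {{Future.await()}}, simply propagates {{InterruptedException}} to the caller, so a cancelled task unwinds instead of blocking forever.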
[jira] [Updated] (SPARK-19529) TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()
[ https://issues.apache.org/jira/browse/SPARK-19529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-19529: --- Target Version/s: 2.0.3, 2.1.1, 2.2.0 (was: 2.0.3, 2.1.1) > TransportClientFactory.createClient() shouldn't call awaitUninterruptibly() > --- > > Key: SPARK-19529 > URL: https://issues.apache.org/jira/browse/SPARK-19529 > Project: Spark > Issue Type: Bug > Components: Shuffle, Spark Core >Affects Versions: 1.6.0, 2.0.0, 2.1.0 >Reporter: Josh Rosen >Assignee: Josh Rosen > > In Spark's Netty RPC layer, TransportClientFactory.createClient() calls > awaitUninterruptibly() on a Netty future while waiting for a connection to be > established. This creates a problem when a Spark task is interrupted while > blocking in this call (which can happen in the event of a slow connection > which will eventually time out). This has a bad impact on task cancellation > when interruptOnCancel = true. > As an example of the impact of this problem, I experienced significant > numbers of uncancellable "zombie tasks" on a production cluster where several > tasks were blocked trying to connect to a dead shuffle server and then > continued running as zombies after I cancelled the associated Spark stage. 
> The zombie tasks ran for several minutes with the following stack: > {code} > java.lang.Object.wait(Native Method) > java.lang.Object.wait(Object.java:460) > io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607) > io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301) > > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224) > > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) > => holding Monitor(java.lang.Object@1849476028}) > org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105) > > org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) > > org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) > > org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114) > > org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169) > > org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala: > 350) > org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286) > > org.apache.spark.storage.ShuffleBlockFetcherIterator.(ShuffleBlockFetcherIterator.scala:120) > > org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45) > > org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169) > > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > [...] > {code} > I believe that we can easily fix this by using the > InterruptedException-throwing await() instead. 
-- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18717) Datasets - crash (compile exception) when mapping to immutable scala map
[ https://issues.apache.org/jira/browse/SPARK-18717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-18717: --- Fix Version/s: 2.1.1 > Datasets - crash (compile exception) when mapping to immutable scala map > > > Key: SPARK-18717 > URL: https://issues.apache.org/jira/browse/SPARK-18717 > Project: Spark > Issue Type: Bug >Affects Versions: 2.0.2, 2.1.0 >Reporter: Damian Momot >Assignee: Andrew Ray > Fix For: 2.1.1, 2.2.0 > > > {code} > val spark: SparkSession = ??? > case class Test(id: String, map_test: Map[Long, String]) > spark.sql("CREATE TABLE xyz.map_test (id string, map_test map) > STORED AS PARQUET") > spark.sql("SELECT * FROM xyz.map_test").as[Test].map(t => t).collect() > {code} > {code} > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 307, Column 108: No applicable constructor/method found for actual parameters > "java.lang.String, scala.collection.Map"; candidates are: > "$line14.$read$$iw$$iw$Test(java.lang.String, scala.collection.immutable.Map)" > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
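Until the fix is picked up, a hedged workaround — an inference from the error message, not an official recommendation — is to widen the field type so it matches what the generated code actually passes in:

{code}
// The generated constructor call hands back a scala.collection.Map, so a
// case class declared with that broader type avoids the
// "No applicable constructor/method found" mismatch.
import scala.collection.Map

case class Test(id: String, map_test: Map[Long, String])
{code}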
[jira] [Updated] (SPARK-18717) Datasets - crash (compile exception) when mapping to immutable scala map
[ https://issues.apache.org/jira/browse/SPARK-18717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-18717: --- Affects Version/s: 2.1.0 > Datasets - crash (compile exception) when mapping to immutable scala map > > > Key: SPARK-18717 > URL: https://issues.apache.org/jira/browse/SPARK-18717 > Project: Spark > Issue Type: Bug >Affects Versions: 2.0.2, 2.1.0 >Reporter: Damian Momot >Assignee: Andrew Ray > Fix For: 2.1.1, 2.2.0 > > > {code} > val spark: SparkSession = ??? > case class Test(id: String, map_test: Map[Long, String]) > spark.sql("CREATE TABLE xyz.map_test (id string, map_test map) > STORED AS PARQUET") > spark.sql("SELECT * FROM xyz.map_test").as[Test].map(t => t).collect() > {code} > {code} > org.codehaus.commons.compiler.CompileException: File 'generated.java', Line > 307, Column 108: No applicable constructor/method found for actual parameters > "java.lang.String, scala.collection.Map"; candidates are: > "$line14.$read$$iw$$iw$Test(java.lang.String, scala.collection.immutable.Map)" > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17714) ClassCircularityError is thrown when using org.apache.spark.util.Utils.classForName
[ https://issues.apache.org/jira/browse/SPARK-17714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15856638#comment-15856638 ] Cheng Lian commented on SPARK-17714: Although I've no idea why this error occurs, it seems that setting the system property {{io.netty.noJavassist}} to {{true}} can work around this issue by disabling Javassist usage in Netty. > ClassCircularityError is thrown when using > org.apache.spark.util.Utils.classForName > > > Key: SPARK-17714 > URL: https://issues.apache.org/jira/browse/SPARK-17714 > Project: Spark > Issue Type: Bug >Reporter: Weiqing Yang > > This jira is a follow-up to [SPARK-15857| > https://issues.apache.org/jira/browse/SPARK-15857] . > Task invokes CallerContext.setCurrentContext() to set its callerContext to > HDFS. In setCurrentContext(), it tries looking for class > {{org.apache.hadoop.ipc.CallerContext}} by using > {{org.apache.spark.util.Utils.classForName}}. This causes > ClassCircularityError to be thrown when running ReplSuite in master Maven > builds (The same tests pass in the SBT build). A hotfix > [SPARK-17710|https://issues.apache.org/jira/browse/SPARK-17710] has been made > by using Class.forName instead, but it needs further investigation. 
> Error: > https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.3/2000/testReport/junit/org.apache.spark.repl/ReplSuite/simple_foreach_with_accumulator/ > {code} > scala> accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, > name: None, value: 0) > scala> org.apache.spark.SparkException: Job aborted due to stage failure: > Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in > stage 0.0 (TID 0, localhost): java.lang.ClassCircularityError: > io/netty/util/internal/_matchers_/org/apache/spark/network/protocol/MessageMatcher > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at > io.netty.util.internal.JavassistTypeParameterMatcherGenerator.generate(JavassistTypeParameterMatcherGenerator.java:62) > at > io.netty.util.internal.JavassistTypeParameterMatcherGenerator.generate(JavassistTypeParameterMatcherGenerator.java:54) > at > io.netty.util.internal.TypeParameterMatcher.get(TypeParameterMatcher.java:42) > at > io.netty.util.internal.TypeParameterMatcher.find(TypeParameterMatcher.java:78) > at > io.netty.handler.codec.MessageToMessageEncoder.(MessageToMessageEncoder.java:59) > at > org.apache.spark.network.protocol.MessageEncoder.(MessageEncoder.java:34) > at org.apache.spark.network.TransportContext.(TransportContext.java:78) > at > org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:354) > at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:324) > at > org.apache.spark.repl.ExecutorClassLoader.org$apache$spark$repl$ExecutorClassLoader$$getClassFileInputStreamFromSparkRPC(ExecutorClassLoader.scala:90) > at > org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57) > at > org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57) > at > org.apache.spark.repl.ExecutorClassLoader.findClassLocally(ExecutorClassLoader.scala:162) > at > 
org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:80) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at > io.netty.util.internal.JavassistTypeParameterMatcherGenerator.generate(JavassistTypeParameterMatcherGenerator.java:62) > at > io.netty.util.internal.JavassistTypeParameterMatcherGenerator.generate(JavassistTypeParameterMatcherGenerator.java:54) > at > io.netty.util.internal.TypeParameterMatcher.get(TypeParameterMatcher.java:42) > at > io.netty.util.internal.TypeParameterMatcher.find(TypeParameterMatcher.java:78) > at > io.netty.handler.codec.MessageToMessageEncoder.(MessageToMessageEncoder.java:59) > at > org.apache.spark.network.protocol.MessageEncoder.(MessageEncoder.java:34) > at org.apache.spark.network.TransportContext.(TransportContext.java:78) > at > org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:354) > at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:324) > at > org.apache.spark.repl.ExecutorClassLoader.org$apache$spark$repl$ExecutorClassLoader$$getClassFileInputStreamFromSparkRPC(ExecutorClassLoader.scala:90) > at > org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57) > at > org.apache.spark.repl.ExecutorClassLoader$$anonfun$1.apply(ExecutorClassLoader.scala:57) > at >
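Applying the workaround from the comment above — it must take effect before the first Netty class is loaded, so setting it on the JVM command line is the safest route:

{code}
// Disable Netty's Javassist-based type-parameter matcher generation.
// Equivalent to passing -Dio.netty.noJavassist=true in the driver and
// executor JVM options.
sys.props("io.netty.noJavassist") = "true"
{code}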
[jira] [Resolved] (SPARK-18539) Cannot filter by nonexisting column in parquet file
[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian resolved SPARK-18539. Resolution: Fixed Assignee: Dongjoon Hyun Target Version/s: 2.2.0 > Cannot filter by nonexisting column in parquet file > --- > > Key: SPARK-18539 > URL: https://issues.apache.org/jira/browse/SPARK-18539 > Project: Spark > Issue Type: Bug >Affects Versions: 2.0.1, 2.0.2 >Reporter: Vitaly Gerasimov >Assignee: Dongjoon Hyun >Priority: Critical > > {code} > import org.apache.spark.SparkConf > import org.apache.spark.sql.SparkSession > import org.apache.spark.sql.types.DataTypes._ > import org.apache.spark.sql.types.{StructField, StructType} > val sc = SparkSession.builder().config(new > SparkConf().setMaster("local")).getOrCreate() > val jsonRDD = sc.sparkContext.parallelize(Seq("""{"a":1}""")) > sc.read > .schema(StructType(Seq(StructField("a", IntegerType > .json(jsonRDD) > .write > .parquet("/tmp/test") > sc.read > .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", > IntegerType, nullable = true > .load("/tmp/test") > .createOrReplaceTempView("table") > sc.sql("select b from table where b is not null").show() > {code} > returns: > {code} > 16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) > java.lang.IllegalArgumentException: Column [b] was not found in schema! 
> at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:100) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59) > at > org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64) > at > org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59) > at > org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40) > at > org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126) > at > org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46) > at > org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110) > at > org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116) > at > 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at
[jira] [Commented] (SPARK-18539) Cannot filter by nonexisting column in parquet file
[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15851965#comment-15851965 ] Cheng Lian commented on SPARK-18539: SPARK-19409 upgraded parquet-mr to 1.8.2 and fixed this issue. > Cannot filter by nonexisting column in parquet file > --- > > Key: SPARK-18539 > URL: https://issues.apache.org/jira/browse/SPARK-18539 > Project: Spark > Issue Type: Bug >Affects Versions: 2.0.1, 2.0.2 >Reporter: Vitaly Gerasimov >Priority: Critical > > {code} > import org.apache.spark.SparkConf > import org.apache.spark.sql.SparkSession > import org.apache.spark.sql.types.DataTypes._ > import org.apache.spark.sql.types.{StructField, StructType} > val sc = SparkSession.builder().config(new > SparkConf().setMaster("local")).getOrCreate() > val jsonRDD = sc.sparkContext.parallelize(Seq("""{"a":1}""")) > sc.read > .schema(StructType(Seq(StructField("a", IntegerType > .json(jsonRDD) > .write > .parquet("/tmp/test") > sc.read > .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", > IntegerType, nullable = true > .load("/tmp/test") > .createOrReplaceTempView("table") > sc.sql("select b from table where b is not null").show() > {code} > returns: > {code} > 16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) > java.lang.IllegalArgumentException: Column [b] was not found in schema! 
> at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:100) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59) > at > org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64) > at > org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59) > at > org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40) > at > org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126) > at > org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46) > at > org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110) > at > org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116) > at > 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at
[jira] [Commented] (SPARK-18539) Cannot filter by nonexisting column in parquet file
[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15840186#comment-15840186 ] Cheng Lian commented on SPARK-18539: [~viirya], sorry for the (super) late reply. What I mentioned was a *nullable* column instead of a *null* column. To be more specific, say we have two Parquet files: - File {{A}} has columns {{}} - File {{B}} has columns {{}}, where {{c}} is marked as nullable (or {{optional}} in the term of Parquet) Then it should be fine to treat these two files as a single dataset with a merged schema {{}} and you should be able to push down predicates involving {{c}}. BTW, the Parquet community just made a patch release 1.8.2 that includes a fix for PARQUET-389 and we probably will upgrade to 1.8.2 in 2.2.0. Then we'll have a proper fix for this issue and remove the workaround we did while doing schema merging. > Cannot filter by nonexisting column in parquet file > --- > > Key: SPARK-18539 > URL: https://issues.apache.org/jira/browse/SPARK-18539 > Project: Spark > Issue Type: Bug >Affects Versions: 2.0.1, 2.0.2 >Reporter: Vitaly Gerasimov >Priority: Critical > > {code} > import org.apache.spark.SparkConf > import org.apache.spark.sql.SparkSession > import org.apache.spark.sql.types.DataTypes._ > import org.apache.spark.sql.types.{StructField, StructType} > val sc = SparkSession.builder().config(new > SparkConf().setMaster("local")).getOrCreate() > val jsonRDD = sc.sparkContext.parallelize(Seq("""{"a":1}""")) > sc.read > .schema(StructType(Seq(StructField("a", IntegerType > .json(jsonRDD) > .write > .parquet("/tmp/test") > sc.read > .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", > IntegerType, nullable = true > .load("/tmp/test") > .createOrReplaceTempView("table") > sc.sql("select b from table where b is not null").show() > {code} > returns: > {code} > 16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) > java.lang.IllegalArgumentException: 
Column [b] was not found in schema! > at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:100) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59) > at > org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194) > at > org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64) > at > org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59) > at > org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40) > at > org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126) > at > org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46) > at > org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110) > at > org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116) > at > 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at >
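The merged-schema semantics described in the comment above can be pictured with a small, plain-Python sketch (not Spark or Parquet code; all names here are illustrative): a nullable column missing from one file is read as null for that file's rows, and a `b IS NOT NULL` filter is applied after the merge.

```python
# Toy model of schema merging over two files with different column sets.
# File A lacks column "b"; since "b" is nullable in the merged schema, its
# rows surface b = None, mirroring Parquet's treatment of optional fields
# absent from a file.

file_a_rows = [{"a": 1}, {"a": 2}]        # file A: column a only
file_b_rows = [{"a": 3, "b": 30}]         # file B: columns a and b

merged_schema = ["a", "b"]

def read_with_schema(rows, schema):
    # Columns missing from a row come back as None (SQL NULL).
    return [{col: row.get(col) for col in schema} for row in rows]

dataset = (read_with_schema(file_a_rows, merged_schema)
           + read_with_schema(file_b_rows, merged_schema))

# "select b ... where b is not null": file A contributes nothing.
result = [row["b"] for row in dataset if row["b"] is not None]
print(result)  # [30]
```

The bug in the thread is precisely that the pushed-down Parquet filter rejected the missing column outright instead of treating it as all-null.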
[jira] [Commented] (HIVE-11611) A bad performance regression issue with Parquet happens if Hive does not select any columns
[ https://issues.apache.org/jira/browse/HIVE-11611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15813452#comment-15813452 ] Cheng Lian commented on HIVE-11611: --- While trying to fix a similar issue without upgrading Parquet in Spark, [~rdblue] brought up a workaround: create a {{MessageType}} with a dummy field and then call {{MessageType.getFields.clear()}} to obtain an empty {{MessageType}}. See [here|https://github.com/apache/spark/blob/faabe69cc081145f43f9c68db1a7a8c5c39684fb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala#L549-L563]. > A bad performance regression issue with Parquet happens if Hive does not > select any columns > --- > > Key: HIVE-11611 > URL: https://issues.apache.org/jira/browse/HIVE-11611 > Project: Hive > Issue Type: Sub-task >Affects Versions: 2.0.0 >Reporter: Sergio Peña >Assignee: Ferdinand Xu > Attachments: HIVE-11611.patch > > > A possible performance issue may happen with the below code when using a > query like this {{SELECT count(1) FROM parquetTable}}. > {code} > if (!ColumnProjectionUtils.isReadAllColumns(configuration) && > !indexColumnsWanted.isEmpty()) { > MessageType requestedSchemaByUser = > getSchemaByIndex(tableSchema, columnNamesList, > indexColumnsWanted); > return new ReadContext(requestedSchemaByUser, contextMetadata); > } else { > return new ReadContext(tableSchema, contextMetadata); > } > {code} > If no columns or indexes are selected, then the above code will read > the full schema from Parquet even if Hive does not do anything with such > values. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (SPARK-19016) Document scalable partition handling feature in the programming guide
[ https://issues.apache.org/jira/browse/SPARK-19016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian resolved SPARK-19016. Resolution: Fixed Fix Version/s: 2.2.0 2.1.1 Issue resolved by pull request 16424 [https://github.com/apache/spark/pull/16424] > Document scalable partition handling feature in the programming guide > - > > Key: SPARK-19016 > URL: https://issues.apache.org/jira/browse/SPARK-19016 > Project: Spark > Issue Type: Bug > Components: Documentation >Affects Versions: 2.1.0, 2.2.0 >Reporter: Cheng Lian >Assignee: Cheng Lian >Priority: Minor > Fix For: 2.1.1, 2.2.0 > > > Currently, we only mention this in the migration guide. Should also document > it in the programming guide. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-19016) Document scalable partition handling feature in the programming guide
Cheng Lian created SPARK-19016: -- Summary: Document scalable partition handling feature in the programming guide Key: SPARK-19016 URL: https://issues.apache.org/jira/browse/SPARK-19016 Project: Spark Issue Type: Bug Components: Documentation Affects Versions: 2.1.0, 2.2.0 Reporter: Cheng Lian Assignee: Cheng Lian Priority: Minor Currently, we only mention this in the migration guide. Should also document it in the programming guide. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18956) Python API should reuse existing SparkSession while creating new SQLContext instances
Cheng Lian created SPARK-18956: -- Summary: Python API should reuse existing SparkSession while creating new SQLContext instances Key: SPARK-18956 URL: https://issues.apache.org/jira/browse/SPARK-18956 Project: Spark Issue Type: Bug Reporter: Cheng Lian We did this for the Scala API for Spark 2.0 but didn't update the Python API respectively. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18950) Report conflicting fields when merging two StructTypes.
[ https://issues.apache.org/jira/browse/SPARK-18950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-18950: --- Labels: starter (was: ) > Report conflicting fields when merging two StructTypes. > --- > > Key: SPARK-18950 > URL: https://issues.apache.org/jira/browse/SPARK-18950 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Cheng Lian >Priority: Minor > Labels: starter > > Currently, {{StructType.merge()}} only reports data types of conflicting > fields when merging two incompatible schemas. It would be nice to also report > the field names for easier debugging. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-18950) Report conflicting fields when merging two StructTypes.
Cheng Lian created SPARK-18950: -- Summary: Report conflicting fields when merging two StructTypes. Key: SPARK-18950 URL: https://issues.apache.org/jira/browse/SPARK-18950 Project: Spark Issue Type: Bug Components: SQL Reporter: Cheng Lian Priority: Minor Currently, {{StructType.merge()}} only reports data types of conflicting fields when merging two incompatible schemas. It would be nice to also report the field names for easier debugging. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
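The improvement requested in SPARK-18950 can be sketched in plain Python (this is a hypothetical illustration, not Spark's actual {{StructType.merge()}} implementation): on a type conflict, the error should name the offending field, not just the two types.

```python
# Hypothetical schema merge that reports the conflicting field name.
# Schemas are modeled as {field_name: type_name} dicts for illustration.

def merge_schemas(left, right):
    """Merge two schemas, failing loudly with the field name on conflict."""
    merged = dict(left)
    for name, dtype in right.items():
        if name in merged and merged[name] != dtype:
            # Naming the field makes debugging far easier than only
            # reporting "double vs string" with no context.
            raise ValueError(
                f"Failed to merge field '{name}': {merged[name]} vs {dtype}")
        merged[name] = dtype
    return merged

try:
    merge_schemas({"id": "long", "score": "double"},
                  {"id": "long", "score": "string"})
except ValueError as e:
    print(e)  # Failed to merge field 'score': double vs string
```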
[jira] [Updated] (SPARK-18753) Inconsistent behavior after writing to parquet files
[ https://issues.apache.org/jira/browse/SPARK-18753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-18753: --- Fix Version/s: 2.2.0 > Inconsistent behavior after writing to parquet files > > > Key: SPARK-18753 > URL: https://issues.apache.org/jira/browse/SPARK-18753 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2, 2.1.0 >Reporter: Shixiong Zhu > Fix For: 2.1.0, 2.2.0 > > > Found an inconsistent behavior when using parquet. > {code} > scala> val ds = Seq[java.lang.Boolean](new java.lang.Boolean(true), null: > java.lang.Boolean, new java.lang.Boolean(false)).toDS > ds: org.apache.spark.sql.Dataset[Boolean] = [value: boolean] > scala> ds.filter('value === "true").show > +-+ > |value| > +-+ > +-+ > {code} > In the above example, `ds.filter('value === "true")` returns nothing as > "true" will be converted to null and the filter expression will be always > null, so it drops all rows. > However, if I store `ds` to a parquet file and read it back, `filter('value > === "true")` will return non null values. > {code} > scala> ds.write.parquet("testfile") > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details. > scala> val ds2 = spark.read.parquet("testfile") > ds2: org.apache.spark.sql.DataFrame = [value: boolean] > scala> ds2.filter('value === "true").show > +-+ > |value| > +-+ > | true| > |false| > +-+ > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-18753) Inconsistent behavior after writing to parquet files
[ https://issues.apache.org/jira/browse/SPARK-18753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-18753: --- Assignee: Hyukjin Kwon > Inconsistent behavior after writing to parquet files > > > Key: SPARK-18753 > URL: https://issues.apache.org/jira/browse/SPARK-18753 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2, 2.1.0 >Reporter: Shixiong Zhu >Assignee: Hyukjin Kwon > Fix For: 2.1.0, 2.2.0 > > > Found an inconsistent behavior when using parquet. > {code} > scala> val ds = Seq[java.lang.Boolean](new java.lang.Boolean(true), null: > java.lang.Boolean, new java.lang.Boolean(false)).toDS > ds: org.apache.spark.sql.Dataset[Boolean] = [value: boolean] > scala> ds.filter('value === "true").show > +-+ > |value| > +-+ > +-+ > {code} > In the above example, `ds.filter('value === "true")` returns nothing as > "true" will be converted to null and the filter expression will be always > null, so it drops all rows. > However, if I store `ds` to a parquet file and read it back, `filter('value > === "true")` will return non null values. > {code} > scala> ds.write.parquet("testfile") > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details. > scala> val ds2 = spark.read.parquet("testfile") > ds2: org.apache.spark.sql.DataFrame = [value: boolean] > scala> ds2.filter('value === "true").show > +-+ > |value| > +-+ > | true| > |false| > +-+ > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-18753) Inconsistent behavior after writing to parquet files
[ https://issues.apache.org/jira/browse/SPARK-18753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian resolved SPARK-18753. Resolution: Fixed Fix Version/s: 2.1.0 Issue resolved by pull request 16184 [https://github.com/apache/spark/pull/16184] > Inconsistent behavior after writing to parquet files > > > Key: SPARK-18753 > URL: https://issues.apache.org/jira/browse/SPARK-18753 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.2, 2.1.0 >Reporter: Shixiong Zhu > Fix For: 2.1.0 > > > Found an inconsistent behavior when using parquet. > {code} > scala> val ds = Seq[java.lang.Boolean](new java.lang.Boolean(true), null: > java.lang.Boolean, new java.lang.Boolean(false)).toDS > ds: org.apache.spark.sql.Dataset[Boolean] = [value: boolean] > scala> ds.filter('value === "true").show > +-+ > |value| > +-+ > +-+ > {code} > In the above example, `ds.filter('value === "true")` returns nothing as > "true" will be converted to null and the filter expression will be always > null, so it drops all rows. > However, if I store `ds` to a parquet file and read it back, `filter('value > === "true")` will return non null values. > {code} > scala> ds.write.parquet("testfile") > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details. > scala> val ds2 = spark.read.parquet("testfile") > ds2: org.apache.spark.sql.DataFrame = [value: boolean] > scala> ds2.filter('value === "true").show > +-+ > |value| > +-+ > | true| > |false| > +-+ > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
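The divergence in SPARK-18753 can be pictured with a toy, plain-Python model (this is not Spark's actual code path): per the report, the string {{"true"}} is converted to null before the comparison, so under SQL three-valued logic the in-memory predicate drops every row, while the Parquet read path behaved as if the filter only required a non-null value.

```python
# Toy model: None stands in for SQL NULL.

rows = [True, None, False]

def sql_equals(lhs, rhs):
    # SQL equality is NULL if either operand is NULL.
    if lhs is None or rhs is None:
        return None
    return lhs == rhs

cast_true = None  # "true" cast to boolean yields null, per the report

# In-memory path: the predicate is NULL for every row, so nothing survives.
in_memory = [r for r in rows if sql_equals(r, cast_true)]
print(in_memory)  # []

# Buggy Parquet path: effectively only the non-null rows were filtered out,
# so both the true and the false row came back.
from_parquet = [r for r in rows if r is not None]
print(from_parquet)  # [True, False]
```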
[jira] [Comment Edited] (SPARK-18712) keep the order of sql expression and support short circuit
[ https://issues.apache.org/jira/browse/SPARK-18712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724381#comment-15724381 ] Cheng Lian edited comment on SPARK-18712 at 12/6/16 5:10 AM: - I think the contract here is that for a DataFrame {{df}} and 1 or more consecutive filter predicates applied to {{df}}, each filter predicate must be a full function over the output of {{df}}. Only in this way, we can ensure that the execution order of all the filter predicates can be irrelevant. This contract is important for optimizations like filter push-down. If we have to preserve execution order of all filter predicates, you won't be able to push down {{a}} in {{a AND b}}, and lose lots of optimization opportunities. In the case of the snippet in the JIRA description, the first predicate is a full function while the second is a partial function of the output of the original {{df}}. was (Author: lian cheng): I think the contract here is that for a DataFrame {{df}} and 1 or more consecutive filter predicates applied to {{df}}, each filter predicate must be a full function over the output of {{df}}. Only in this way, we can ensure that the execution order of all the filter predicates can be irrelevant. This contract is important for optimizations like filter push-down. If we have to preserve execution order of all filter predicates, you won't be able to push down {{a}} in {{a AND b}}, and lose lots of optimization opportunities. 
> keep the order of sql expression and support short circuit > -- > > Key: SPARK-18712 > URL: https://issues.apache.org/jira/browse/SPARK-18712 > Project: Spark > Issue Type: Wish > Components: SQL >Affects Versions: 2.0.2 > Environment: Ubuntu 16.04 >Reporter: yahsuan, chang > > The following python code fails with spark 2.0.2, but works with spark 1.5.2 > {code} > # a.py > import pyspark > import pyspark.sql.functions as F > import pyspark.sql.types as T > sc = pyspark.SparkContext() > sqlc = pyspark.SQLContext(sc) > table = {5: True, 6: False} > df = sqlc.range(10) > df = df.where(df['id'].isin(5, 6)) > f = F.udf(lambda x: table[x], T.BooleanType()) > df = df.where(f(df['id'])) > # df.explain(True) > print(df.count()) > {code} > here is the exception > {code} > KeyError: 0 > {code} > I guess the problem is about the order of sql expression. > the following are the explain of two spark version > {code} > # explain of spark 2.0.2 > == Parsed Logical Plan == > Filter (id#0L) > +- Filter cast(id#0L as bigint) IN (cast(5 as bigint),cast(6 as bigint)) >+- Range (0, 10, step=1, splits=Some(4)) > == Analyzed Logical Plan == > id: bigint > Filter (id#0L) > +- Filter cast(id#0L as bigint) IN (cast(5 as bigint),cast(6 as bigint)) >+- Range (0, 10, step=1, splits=Some(4)) > == Optimized Logical Plan == > Filter (id#0L IN (5,6) && (id#0L)) > +- Range (0, 10, step=1, splits=Some(4)) > == Physical Plan == > *Project [id#0L] > +- *Filter (id#0L IN (5,6) && pythonUDF0#5) >+- BatchEvalPython [(id#0L)], [id#0L, pythonUDF0#5] > +- *Range (0, 10, step=1, splits=Some(4)) > {code} > {code} > # explain of spark 1.5.2 > == Parsed Logical Plan == > 'Project [*,PythonUDF#(id#0L) AS sad#1] > Filter id#0L IN (cast(5 as bigint),cast(6 as bigint)) > LogicalRDD [id#0L], MapPartitionsRDD[3] at range at > NativeMethodAccessorImpl.java:-2 > == Analyzed Logical Plan == > id: bigint, sad: int > Project [id#0L,sad#1] > Project [id#0L,pythonUDF#2 AS sad#1] > EvaluatePython PythonUDF#(id#0L), 
pythonUDF#2 >Filter id#0L IN (cast(5 as bigint),cast(6 as bigint)) > LogicalRDD [id#0L], MapPartitionsRDD[3] at range at > NativeMethodAccessorImpl.java:-2 > == Optimized Logical Plan == > Project [id#0L,pythonUDF#2 AS sad#1] > EvaluatePython PythonUDF#(id#0L), pythonUDF#2 > Filter id#0L IN (5,6) >LogicalRDD [id#0L], MapPartitionsRDD[3] at range at > NativeMethodAccessorImpl.java:-2 > == Physical Plan == > TungstenProject [id#0L,pythonUDF#2 AS sad#1] > !BatchPythonEvaluation PythonUDF#(id#0L), [id#0L,pythonUDF#2] > Filter id#0L IN (5,6) >Scan PhysicalRDD[id#0L] > Code Generation: true > {code} > Also, I am not sure if the sql expression support short circuit evaluation, > so I do the following experiment > {code} > import pyspark > import pyspark.sql.functions as F > import pyspark.sql.types as T > sc = pyspark.SparkContext() > sqlc = pyspark.SQLContext(sc) > def f(x): > print('in f') > return True > f_udf = F.udf(f, T.BooleanType()) > df = sqlc.createDataFrame([(1, 2)], schema=['a', 'b']) > df = df.where(f_udf('a') | f_udf('b')) > df.show() > {code} > and I got the following output for both spark 1.5.2 and spark 2.0.2 > {code} > in f > in f > +---+---+ > | a| b| > +---+---+ > | 1| 2| > +---+---+ > {code} > there is only one
[jira] [Commented] (SPARK-18712) keep the order of sql expression and support short circuit
[ https://issues.apache.org/jira/browse/SPARK-18712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724381#comment-15724381 ] Cheng Lian commented on SPARK-18712: I think the contract here is that for a DataFrame {{df}} and 1 or more consecutive filter predicates applied to {{df}}, each filter predicate must be a full function over the output of {{df}}. Only in this way, we can ensure that the execution order of all the filter predicates can be irrelevant. This contract is important for optimizations like filter push-down. If we have to preserve execution order of all filter predicates, you won't be able to push down {{a}} in {{a AND b}}, and lose lots of optimization opportunities. > keep the order of sql expression and support short circuit > -- > > Key: SPARK-18712 > URL: https://issues.apache.org/jira/browse/SPARK-18712 > Project: Spark > Issue Type: Wish > Components: SQL >Affects Versions: 2.0.2 > Environment: Ubuntu 16.04 >Reporter: yahsuan, chang > > The following python code fails with spark 2.0.2, but works with spark 1.5.2 > {code} > # a.py > import pyspark > import pyspark.sql.functions as F > import pyspark.sql.types as T > sc = pyspark.SparkContext() > sqlc = pyspark.SQLContext(sc) > table = {5: True, 6: False} > df = sqlc.range(10) > df = df.where(df['id'].isin(5, 6)) > f = F.udf(lambda x: table[x], T.BooleanType()) > df = df.where(f(df['id'])) > # df.explain(True) > print(df.count()) > {code} > here is the exception > {code} > KeyError: 0 > {code} > I guess the problem is about the order of sql expression. 
> the following are the explain outputs of the two spark versions
> {code}
> # explain of spark 2.0.2
> == Parsed Logical Plan ==
> Filter (id#0L)
> +- Filter cast(id#0L as bigint) IN (cast(5 as bigint),cast(6 as bigint))
>    +- Range (0, 10, step=1, splits=Some(4))
> == Analyzed Logical Plan ==
> id: bigint
> Filter (id#0L)
> +- Filter cast(id#0L as bigint) IN (cast(5 as bigint),cast(6 as bigint))
>    +- Range (0, 10, step=1, splits=Some(4))
> == Optimized Logical Plan ==
> Filter (id#0L IN (5,6) && (id#0L))
> +- Range (0, 10, step=1, splits=Some(4))
> == Physical Plan ==
> *Project [id#0L]
> +- *Filter (id#0L IN (5,6) && pythonUDF0#5)
>    +- BatchEvalPython [(id#0L)], [id#0L, pythonUDF0#5]
>       +- *Range (0, 10, step=1, splits=Some(4))
> {code}
> {code}
> # explain of spark 1.5.2
> == Parsed Logical Plan ==
> 'Project [*,PythonUDF#(id#0L) AS sad#1]
>  Filter id#0L IN (cast(5 as bigint),cast(6 as bigint))
>   LogicalRDD [id#0L], MapPartitionsRDD[3] at range at NativeMethodAccessorImpl.java:-2
> == Analyzed Logical Plan ==
> id: bigint, sad: int
> Project [id#0L,sad#1]
>  Project [id#0L,pythonUDF#2 AS sad#1]
>   EvaluatePython PythonUDF#(id#0L), pythonUDF#2
>    Filter id#0L IN (cast(5 as bigint),cast(6 as bigint))
>     LogicalRDD [id#0L], MapPartitionsRDD[3] at range at NativeMethodAccessorImpl.java:-2
> == Optimized Logical Plan ==
> Project [id#0L,pythonUDF#2 AS sad#1]
>  EvaluatePython PythonUDF#(id#0L), pythonUDF#2
>   Filter id#0L IN (5,6)
>    LogicalRDD [id#0L], MapPartitionsRDD[3] at range at NativeMethodAccessorImpl.java:-2
> == Physical Plan ==
> TungstenProject [id#0L,pythonUDF#2 AS sad#1]
> !BatchPythonEvaluation PythonUDF#(id#0L), [id#0L,pythonUDF#2]
>  Filter id#0L IN (5,6)
>   Scan PhysicalRDD[id#0L]
> Code Generation: true
> {code}
> Also, I am not sure whether sql expressions support short-circuit evaluation, so I did the following experiment
> {code}
> import pyspark
> import pyspark.sql.functions as F
> import pyspark.sql.types as T
> sc = pyspark.SparkContext()
> sqlc = pyspark.SQLContext(sc)
> def f(x):
>     print('in f')
>     return True
> f_udf = F.udf(f, T.BooleanType())
> df = sqlc.createDataFrame([(1, 2)], schema=['a', 'b'])
> df = df.where(f_udf('a') | f_udf('b'))
> df.show()
> {code}
> and I got the following output for both spark 1.5.2 and spark 2.0.2
> {code}
> in f
> in f
> +---+---+
> |  a|  b|
> +---+---+
> |  1|  2|
> +---+---+
> {code}
> there is only one row in dataframe df, but the function f has been called twice, so I guess there is no short circuit.
> the result seems to conflict with SPARK-1461 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
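The contract described in the comment above can be illustrated without Spark at all. The following is a minimal plain-Python sketch (the names `pred_a`, `pred_b_partial`, and `pred_b_total` are made up for illustration) of why a filter predicate that is only partially defined breaks under optimizer reordering, while a total predicate gives the same result in any execution order:

```python
# Sketch of the contract: each filter predicate must be a total function
# over the rows it may see, because the optimizer is free to reorder
# (or push down) predicates.
table = {5: True, 6: False}
rows = list(range(10))

# Predicate a: id IN (5, 6) -- total over all rows.
def pred_a(x):
    return x in (5, 6)

# Predicate b (the UDF): only defined for keys of `table` -- NOT total.
def pred_b_partial(x):
    return table[x]            # raises KeyError for e.g. id=0

# A total version, safe under any execution order.
def pred_b_total(x):
    return table.get(x, False)

# Order as written by the user (a first, then b) happens to work:
assert [x for x in rows if pred_a(x) and pred_b_partial(x)] == [5]

# Order after optimization (b evaluated before a) blows up,
# which is exactly the KeyError reported in SPARK-18712:
try:
    [x for x in rows if pred_b_partial(x) and pred_a(x)]
    raise AssertionError("expected KeyError")
except KeyError:
    pass

# The total predicate gives the same answer in either order:
assert [x for x in rows if pred_b_total(x) and pred_a(x)] == [5]
```

Under this reading, the fix on the user side is to make the UDF total (e.g. `table.get(x, False)`) rather than to rely on an earlier filter shielding it.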
[jira] [Commented] (SPARK-18539) Cannot filter by nonexisting column in parquet file
[ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15724013#comment-15724013 ] Cheng Lian commented on SPARK-18539: [~xwu0226], thanks for the new use case! [~viirya], I do think this is a valid use case as long as all the missing columns are nullable. The only reason this use case doesn't work right now is PARQUET-389. I have a vague idea about a possible cleaner fix for this issue and will post it later.
> Cannot filter by nonexisting column in parquet file
> ---
>
> Key: SPARK-18539
> URL: https://issues.apache.org/jira/browse/SPARK-18539
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.1, 2.0.2
> Reporter: Vitaly Gerasimov
> Priority: Critical
>
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.types.DataTypes._
> import org.apache.spark.sql.types.{StructField, StructType}
> val sc = SparkSession.builder().config(new SparkConf().setMaster("local")).getOrCreate()
> val jsonRDD = sc.sparkContext.parallelize(Seq("""{"a":1}"""))
> sc.read
>   .schema(StructType(Seq(StructField("a", IntegerType))))
>   .json(jsonRDD)
>   .write
>   .parquet("/tmp/test")
> sc.read
>   .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType, nullable = true))))
>   .load("/tmp/test")
>   .createOrReplaceTempView("table")
> sc.sql("select b from table where b is not null").show()
> {code}
> returns:
> {code}
> 16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> java.lang.IllegalArgumentException: Column [b] was not found in schema!
> at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
> at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
> at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
> at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160)
> at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:100)
> at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
> at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
> at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
> at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
> at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
> at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
> at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
> at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
> at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367)
> at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
> at
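The stack trace shows the failure happening inside Parquet's row-group filter (PARQUET-389): the pushed-down `b IS NOT NULL` predicate references a column absent from the file's schema. The intended semantics for a nullable missing column, as described in the comment above, is that rows from files lacking the column read as NULL and are simply filtered out. A plain-Python sketch of that semantics (the helper `read_with_schema` and the row dicts are made up for illustration, not Spark internals):

```python
# Sketch of the intended behavior: a file written before column `b`
# existed should yield None (NULL) for `b` when scanned with the newer
# schema, so `b IS NOT NULL` drops its rows instead of throwing.
old_file_rows = [{"a": 1}]            # written without column `b`
new_file_rows = [{"a": 2, "b": 7}]    # written with column `b`

def read_with_schema(rows, columns):
    """Fill in None for any requested column a row does not carry."""
    return [{c: row.get(c) for c in columns} for row in rows]

scanned = read_with_schema(old_file_rows + new_file_rows, ["a", "b"])
result = [r["b"] for r in scanned if r["b"] is not None]
assert result == [7]
```

As a hedged practical note: in Spark 2.x a commonly used workaround for this class of error is to disable Parquet filter push-down (`spark.sql.parquet.filterPushdown=false`), at the cost of losing row-group skipping; Spark's own post-scan filtering then applies the NULL semantics sketched above.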
[jira] [Updated] (SPARK-18730) Ask the build script to link to Jenkins test report page instead of full console output page when posting to GitHub
[ https://issues.apache.org/jira/browse/SPARK-18730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Lian updated SPARK-18730: --- Priority: Minor (was: Major)
> Ask the build script to link to Jenkins test report page instead of full
> console output page when posting to GitHub
> ---
>
> Key: SPARK-18730
> URL: https://issues.apache.org/jira/browse/SPARK-18730
> Project: Spark
> Issue Type: Bug
> Components: Build
> Reporter: Cheng Lian
> Assignee: Cheng Lian
> Priority: Minor
>
> Currently, the full console output page of a Spark Jenkins PR build can be as
> large as several megabytes. It takes a relatively long time to load and may
> even freeze the browser for quite a while.
> I'd suggest posting the test report page link to GitHub instead, which is way
> more concise and is usually the first page I'd like to check when
> investigating a Jenkins build failure.
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org