[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 @gatorsmile The last build was killed by SIGKILL. Can you start a new build, please? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 @gatorsmile I've addressed many of your points in today's commits. Can you please take a look at what I've done so far? I'm still working on the PRs you requested.
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/19410 Hi @szhem. Thanks for the kind reminder and thanks for your contribution. I'm sorry I did not respond sooner. I no longer work where I regularly used the checkpointing code with large graphs. And I don't have access to any similar graph to test with now. I'm somewhat hamstrung by that limitation. That being said, I'll do my best to help. With respect to the failure you're seeing, can you tell me what happens if you set your graph's storage level to `StorageLevel.MEMORY_AND_DISK` or `StorageLevel.MEMORY_AND_DISK_SER_2` without applying this patch?
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 @gatorsmile I've removed the changes to the files as you requested. This removes support for schema pruning on filters of queries. I've pushed the previous revision to a new branch in our `spark-public` repo, [spark-4502-parquet_column_pruning-filtering](https://github.com/VideoAmp/spark-public/tree/spark-4502-parquet_column_pruning-filtering).
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199648692 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -182,18 +182,20 @@ private[parquet] class ParquetRowConverter( // Converters for each field. private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { -parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map { - case ((parquetFieldType, catalystField), ordinal) => -// Converted field value should be set to the `ordinal`-th cell of `currentRow` -newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal)) +parquetType.getFields.asScala.map { + case parquetField => +val fieldIndex = catalystType.fieldIndex(parquetField.getName) --- End diff -- I dropped into the `sql/console` and attempted to write a parquet file with duplicate column names. It didn't work. Transcript below. ``` scala> import org.apache.spark.sql._ import org.apache.spark.sql._ scala> val sameColumnNames = StructType(StructField("a", IntegerType) :: StructField("a", StringType) :: Nil) sameColumnNames: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,true), StructField(a,StringType,true)) scala> val rowRDD = sqlContext.sparkContext.parallelize(Row(1, "one") :: Row(2, "two") :: Nil, 1) rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at :51 scala> val df = sqlContext.createDataFrame(rowRDD, sameColumnNames) 18/07/02 16:31:33 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Volumes/VideoAmpCS/msa/workspace/spark-public/spark-warehouse'). 18/07/02 16:31:33 INFO SharedState: Warehouse path is 'file:/Volumes/VideoAmpCS/msa/workspace/spark-public/spark-warehouse'. 
18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@7b13b737{/SQL,null,AVAILABLE,@Spark} 18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@3c9fb104{/SQL/json,null,AVAILABLE,@Spark} 18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@3d5cadbe{/SQL/execution,null,AVAILABLE,@Spark} 18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@73732e26{/SQL/execution/json,null,AVAILABLE,@Spark} 18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@72a13c4a{/static/sql,null,AVAILABLE,@Spark} 18/07/02 16:31:34 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint df: org.apache.spark.sql.DataFrame = [a: int, a: string] scala> df.write.parquet("sameColumnNames.parquet") org.apache.spark.sql.AnalysisException: Found duplicate column(s) when inserting into file:/Volumes/VideoAmpCS/msa/workspace/spark-public/sameColumnNames.parquet: `a`; at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:85) at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:42) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:64) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:662) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:662) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExe
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/19410 Hi @szhem. I dug deeper and think I understand the problem better. To state the obvious, the periodic checkpointer deletes checkpoint files of RDDs that are potentially still accessible. In fact, that's the problem here. It deletes the checkpoint files of an RDD that's later used. The algorithm being used to find checkpoint files that can be "safely" deleted is flawed, and this PR aims to fix that. I have a few thoughts from this.

1. Why does the periodic checkpointer delete checkpoint files? I absolutely understand the preciousness of cache memory and wanting to keep the cache as clean as possible, but this has nothing to do with that. We're talking about deleting files from disk storage. I'm making some assumptions, like we're using a filesystem that's not backed by RAM, but disk storage is dirt cheap these days. Why can't we just let the user delete the checkpoint files themselves?

1.a. Can we and should we support a mode where the automatic deletion of checkpoint files is an option (with a warning of potential failures)? To maintain backwards compatibility, we set this option to true by default, but "power" users can set this value to false to do the cleanup themselves and ensure the checkpointer doesn't delete files it shouldn't.

2. I think the JVM gives us a built-in solution for the automatic and safe deletion of checkpoint files, and the `ContextCleaner` does just that (and more). Can we leverage that functionality?

What do you think? @felixcheung or @viirya, can you weigh in on this, please?
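The `ContextCleaner` mentioned in point 2 builds on the JVM's weak-reference machinery: it registers a weak reference on a reference queue for each tracked object and cleans up when the JVM reports the object unreachable. A minimal plain-Scala sketch of just that plumbing, with no Spark involved (we enqueue the reference manually because actual GC timing is nondeterministic):

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

// Track a "checkpointed" resource with a weak reference registered on a queue.
val queue = new ReferenceQueue[AnyRef]()
val resource = new AnyRef
val ref = new WeakReference[AnyRef](resource, queue)

// Normally the JVM enqueues `ref` once `resource` becomes unreachable and is
// collected; a cleaner thread polling `queue` would then delete the files.
// We enqueue manually here to demonstrate the mechanism deterministically.
assert(ref.enqueue())
assert(queue.poll() eq ref)  // a cleaner would now remove the checkpoint files
assert(queue.poll() == null) // nothing else pending
```

The point of the mechanism is that deletion is only triggered once the RDD is provably unreachable, instead of by a heuristic over the RDD lineage.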
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199631341 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -47,16 +47,25 @@ import org.apache.spark.sql.types._ * * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. + * + * @param parquetMrCompatibility support reading with parquet-mr or Spark's built-in Parquet reader */ -private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) +private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], +parquetMrCompatibility: Boolean) extends ReadSupport[UnsafeRow] with Logging { private var catalystRequestedSchema: StructType = _ + /** + * Construct a [[ParquetReadSupport]] with [[convertTz]] set to [[None]] and + * [[parquetMrCompatibility]] set to [[false]]. + * + * We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only + * used in the vectorized reader, where we get the convertTz value directly, and the value here + * is ignored. Further, we set [[parquetMrCompatibility]] to [[false]] as this constructor is only + * called by the Spark reader. --- End diff -- I don't understand your confusion. I think the comment makes it very clear why we need to set that parameter to false. How can I make it better? Or can you be more specific about what is unclear to you?
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199643803 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema + ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, context.getFileSchema) +.map(intersectionGroup => + new MessageType(intersectionGroup.getName, intersectionGroup.getFields)) +.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE) +} else { + // Spark's built-in Parquet reader will throw an exception in some cases if the requested + // schema is not the same as the clipped schema --- End diff -- I believe the failure occurs because the requested schema and file schema, while having columns with identical names and types, have columns in different order. The one test that fails in the `ParquetFilterSuite`, namely "Filter applied on merged Parquet schema with new column should work", appears to be the only one for which the order of the columns is changed. These are the file and requested schema for that test: ``` Parquet file schema: message spark_schema { required int32 c; optional binary b (UTF8); } Parquet requested schema: message spark_schema { optional binary b (UTF8); required int32 c; } ``` I would say the Spark reader expects identical column order, whereas the parquet-mr reader accepts different column order but identical (or compatible) column names. That's my supposition at least.
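The supposition above, that Spark's built-in reader demands identical column order while parquet-mr matches columns by name, can be illustrated with a simplified stand-in (a hypothetical `Field` type for this sketch; neither Spark's nor parquet-mr's actual classes):

```scala
// Simplified stand-in for a Parquet message type: ordered (name, type) pairs.
case class Field(name: String, tpe: String)

// Built-in Spark reader (per the supposition above): the requested schema
// must match the file schema field-for-field, in order.
def sparkCompatible(file: Seq[Field], requested: Seq[Field]): Boolean =
  file == requested

// parquet-mr: columns are matched by name, insensitive to ordering.
def parquetMrCompatible(file: Seq[Field], requested: Seq[Field]): Boolean =
  requested.forall(r => file.exists(f => f.name == r.name && f.tpe == r.tpe))

// The schemas from the failing ParquetFilterSuite test: same columns,
// different order.
val fileSchema      = Seq(Field("c", "int32"), Field("b", "binary"))
val requestedSchema = Seq(Field("b", "binary"), Field("c", "int32"))

assert(!sparkCompatible(fileSchema, requestedSchema))
assert(parquetMrCompatible(fileSchema, requestedSchema))
```

Under this model, the patch's `parquetMrCompatibility` branch corresponds to intersecting by name; the Spark branch corresponds to requiring the clipped schema verbatim.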
[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on the issue: https://github.com/apache/spark/pull/16578 @DaimonPl I'm going to resolve the merge conflicts shortly. Otherwise, I have no intention of making further modifications to this PR outside of further review.
[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on the issue: https://github.com/apache/spark/pull/16578 @viirya I've rebased to resolve conflicts. All tests are passing. Can you take another look and sign off? Cheers.
[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on the issue: https://github.com/apache/spark/pull/16578 I'd just suggest trying it. Since this PR is a patch for master, please message me personally at m...@allman.ms to discuss progress and questions on a backport to 2.2. If we get it working, we can post back here with a link to a fork. Thanks for taking this on! Michael On Mon, 8 Jan 2018, Gaurav M Shah wrote: > @mallman do you foresee any issues? planning to backport it to spark 2.2 on personal fork. will probably make jitpack release
[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on the issue: https://github.com/apache/spark/pull/16578 > However, I am -1 on merging a change this large after branch cut. It's disappointing, but I agree we can't merge a change this large after the branch cut. It will have to wait for 2.3.1 at the earliest or the next major release.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > @mallman Yes, the issue with window functions is reproducible even with this PR. Can you attach a (small) parquet file I can use to test this scenario?
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > @gatorsmile @HyukjinKwon @ajacques I'm seeing incorrect results from the following simple projection query, given @jainaks test file: > >``` >select page.url, page from temptable >``` I believe I have identified and fixed this bug. I have added a commit with a fix and additional test cases that cover it and related failure scenarios. The underlying problem was that when merging the struct fields derived from the projections `page.url` and `page` into a single pruned schema, the merge function we are using does not necessarily respect the order of the fields in the schema of `temptable`. In the above and similar scenarios, the merge function merged `page.url` and `page` into a struct consisting of the `page.url` field followed by the other fields in `page`. While this produces a schema that has a subset of the fields of `temptable`'s schema, the fields are in the wrong order. I considered two high-level approaches to fixing this problem. The first was to rework the way the pruned schema was constructed in the first place. That is, rather than construct the pruned schema by merging fields together, construct the merged schema by directly filtering the original schema. I think that approach would go along the lines of altering the `SelectedField` extractor for an expression to a `SelectedStruct` extractor that extracts a whole struct from a sequence of expressions. The latter expressions would consist of the projection and filtering expressions of a `PhysicalOperation`. I did not go further in exploring that route, as it would involve a substantial rewrite of the patch. However, in the end it may be the cleaner, more natural route. The second approach I considered, and adopted, was to sort the fields of the merged schema, recursively, so that the order of the fields in the merged schema respected the order of their namesakes in the original schema.
This adds more complexity to the patch, undesirable for something already so complex. But it appeared to be the quickest route to a correct solution. Given more time, I would probably explore rewriting the patch with a `SelectedStruct` extractor as described above. I don't know whether it would actually lead to something less complex. It's just a thought. I added three additional test "scenarios" to `ParquetSchemaPruningSuite.scala` (each "scenario" is tested four ways by the `testSchemaPruning` function). They test three distinct scenarios that fail without the fix. These scenarios consist of a field and its parent struct, an array-based variation and a map-based variation. I added some additional array and map data to the test data to ensure proper test coverage. Incidentally, I also added an integer `id` field to the test contact types so that the results of queries on the contact tables can be ordered deterministically. This should have been part of the tests all along, but I forgot to incorporate it.
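The adopted fix, recursively reordering the merged schema's fields to follow the original schema, can be sketched with a simplified schema model (hypothetical `Struct`/`Atom` types for illustration, not Spark's `StructType`, and ignoring case-sensitivity concerns):

```scala
// Hypothetical, simplified schema model: a type is either atomic or a struct
// of named fields.
sealed trait DType
case object Atom extends DType
case class Struct(fields: Seq[(String, DType)]) extends DType

// Reorder the fields of `merged` (a pruned subset of `original`) so they
// follow `original`'s field order, recursing into nested structs.
def sortLike(merged: DType, original: DType): DType = (merged, original) match {
  case (Struct(m), Struct(o)) =>
    val order = o.map(_._1).zipWithIndex.toMap
    val sorted = m
      .sortBy { case (name, _) => order.getOrElse(name, Int.MaxValue) }
      .map { case (name, t) =>
        val origChild = o.collectFirst { case (`name`, ot) => ot }
        name -> origChild.map(sortLike(t, _)).getOrElse(t)
      }
    Struct(sorted)
  case _ => merged
}

// Merging `page.url` with `page` put `url` first, out of order; sorting
// restores the original field order.
val original = Struct(Seq("title" -> Atom, "url" -> Atom))
val merged   = Struct(Seq("url" -> Atom, "title" -> Atom))
assert(sortLike(merged, original) == Struct(Seq("title" -> Atom, "url" -> Atom)))
```

The alternative `SelectedStruct` approach described above would make this sort unnecessary, since filtering the original schema in place preserves field order by construction.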
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > These test failures are in Spark streaming. Is this just an intermittent test failure or actually caused by this PR? I was able to run the first failing test successfully. Can we get a retest, please?
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > I was able to run the first failing test successfully. Can we get a retest, please? @ajacques I just rebased and pushed my branch off of master. Perhaps the easiest thing to do would be for you to rebase again off of my branch and do another force-push to your branch.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > Anybody else able to reproduce this failure? It succeeded on my developer machine. It worked for me, too. Let's see what a retest does.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 Oh dear. I don't know why we're getting all these SIGKILLs. I think we're going to need another retest...
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 Hi @gatorsmile. Where do you see us at this point? Do you still want to get this into Spark 2.4?
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > @mallman, if we are all happy here, mind taking a look https://github.com/apache/spark/pull/21320#issuecomment-408271470 and https://github.com/apache/spark/pull/21320#issuecomment-406765851 I'm working on the former. I don't understand what kind of "second PR" @gatorsmile is referring to in the latter comment. I know we need to do something to fix the ignored tests, but those tests are in this PR. Am I supposed to create a PR on this PR?
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 Sorry it's taken me a couple of days to respond. I needed the time to ruminate (and not). I could write voluminously, but I just want to reply to a couple of points and move on. > I think that's primarily because the change looks incomplete but the feature itself sounds good to have. I think that's why people try to take a look a lot. It's very incomplete, yes, in comparison to where it started in #16578. That PR supported pruning in projections, filters, joins and aggregations, and paved the way for further optimizations, e.g. window support. This specific PR started with projections and filters, then removed support for filters, then removed code that ensured it wouldn't fail under certain scenarios (re: the now-ignored tests). I will grant that the latter code definitely falls more under the "gee, this seems to fix it" kind of workaround than an actually correct fix, and I'm not against pursuing the latter. I am uneasy about merging in broken code. I have complied with @gatorsmile's requests to remove changes from this PR. Sometimes in the process I have accidentally left some dangling dead code or what appears to be some bad design decisions, such as in https://github.com/apache/spark/pull/21320#issuecomment-407713622. In all of those cases, this is code that makes more sense in the broader context of #16578. I could have just kept quiet and complied, or defended myself, but I didn't have patience for the former nor energy for the latter. At this point, after about a week of time I am in a better mood to collaborate. > Also honestly this logic looks convoluted. I need to point out that I don't know how to respond to comments like these. I have put my best effort forward. If you want me to change something, I need more specific guidance. What do you want me to do about this? Don't just point out a problem; offer a solution or at least a suggestion.
> @ajacques, if you are willing to take over this, please go ahead. I would appreciate it. This is probably the best way to get through this PR. I will try to participate and help. Please post a link to your PR when you open it. Thanks.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > Thanks @jainaks for the sample file and instructions to reproduce the problem. I will investigate and reply. @gatorsmile @HyukjinKwon @ajacques I'm seeing incorrect results from the following simple projection query, given @jainaks test file: ``` select page.url, page from temptable ``` This is worrisome to me. The failure scenario can probably be simplified further, and that will be part of my investigation. However, I've just started digging deep into this, and I have company coming over for the rest of the night in about half an hour. I probably won't have a solid lead on or solution to this bug until tomorrow. Does anyone else want to work on it?
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 Thanks @jainaks for the sample file and instructions to reproduce the problem. I will investigate and reply.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > Few comments like #21320 (comment) or ^ are not minor or nits. I leave hard -1 if they are not addressed. I'm sorry to say I'm very close to hanging up this PR. I put a lot of care, time and effort into my work. After more than two years of off and on review, discussion/debate, nitpicking, commits, steps forward and backwards, to have someone swoop in at this time with a new raft of nitpicking and stylistic issues that set the review back again further is beyond maddening. And that's why you have not received much cooperation from me. Perhaps I should have stated that up front, but I've lost patience defending myself. Contributing to this PR is a tax on what is completely voluntary, unpaid time. I have no professional responsibility to this effort. Maybe it's better off done by someone who does.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > Where does that leave both of these PRs? Do we still want this one with the code refactoring or to go back to the original? Are there any comments for this PR that would block merging? I've set the default to false in this PR. @ajacques Do you want to rebase your PR off my last commit? Then I think it's up to @HyukjinKwon and @gatorsmile on how to proceed.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 Success! Now where do we stand, @gatorsmile @HyukjinKwon?
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > Are there any other blockers to enabling this by default now that @mallman fixed the currently known broken queries? The functionality exercised by the ignored test in `ParquetSchemaPruningSuite.scala` is still broken. That's something we're hoping to fix in a follow on PR. This PR has to be merged first.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > The tests as committed pass for me, but I removed the order by id and I got that error. Are you saying it works with the specific query in my comment? Oh! I didn't notice you changed the query. Okay. I'll take a closer look.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > @mallman: I've rebased on top of your changes and pushed. I'm seeing the following: That test passes for me locally. Also, I inspected your branch and could not find any errors in the rebase. What commit hash are you testing locally? I'm using `92901da3785ce94db501a4c3d9be6316cfbf29a9`. Please ensure we're on the same commit. If so, try doing an `sbt clean` and running your test again.
[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21889#discussion_r207718734 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala --- @@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.FileSchemaPruningTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetSchemaPruningSuite
+    extends QueryTest
+    with ParquetTest
+    with FileSchemaPruningTest
+    with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array(),
+    relatives: Map[String, FullName] = Map())
+
+  val janeDoe = FullName("Jane", "X.", "Doe")
+  val johnDoe = FullName("John", "Y.", "Doe")
+  val susanSmith = FullName("Susan", "Z.", "Smith")
+
+  val contacts =
+    Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
+      relatives = Map("brother" -> johnDoe)) ::
+    Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(id: Int, name: Name, address: String)
+
+  val briefContacts =
+    BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
+    BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array(),
+    relatives: Map[String, FullName] = Map(),
+    p: Int)
+
+  case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)
+
+  val contactsWithDataPartitionColumn =
+    contacts.map { case Contact(id, name, address, pets, friends, relatives) =>
+      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) }
+  val briefContactsWithDataPartitionColumn =
+    briefContacts.map { case BriefContact(id, name, address) =>
+      BriefContactWithDataPartitionColumn(id, name, address, 2) }
+
+  testSchemaPruning("select a single complex field") {
+    val query = sql("select name.middle from contacts order by id")
+    checkScanSchemata(query, "struct>")
+    checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil)
+  }
+
+  testSchemaPruning("select a single complex field and its parent struct") {
+    val query = sql("select name.middle, name from contacts order by id")
+    checkScanSchemata(query, "struct>")
+    checkAnswer(query,
+      Row("X.", Row("Jane", "X.", "Doe")) ::
+      Row("Y.", Row("John", "Y.", "Doe")) ::
+      Row(null, Row("Janet", null, "Jones")) ::
+      Row(null, Row("Jim", null, "Jones")) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field array and its parent struct array") {
+    val query = sql("select friends.middle, friends from contacts where p=1 order by id")
+    checkScanSchemata(query,
+      "struct>>")
+    checkAnswer(query,
+      Row(Array("Z."), Array(Row(&
[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21889#discussion_r207719862 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala ---
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > @mallman: I've rebased on top of your changes and pushed. I'm seeing the following That's the test case that I "unignored". It was passing. There must be some simple explanation. I will look into it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > Test build #94228 has finished for PR 21889 at commit 92901da. The test failure appears to be unrelated to this PR. Is it just me or has the test suite become flakier in the past few months?
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > The tests as committed pass for me, but I removed the order by id and I got that error. Are you saying it works with the specific query in my comment? @ajacques Please try this query:

```
select id, name.middle, address from temp
```
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 I've pushed a commit to restore the original test coverage while also ensuring determinism of the output. Don't ask me how I did it. It's a secret! The test that had been failing, and then appeared to pass, is failing again, so I've marked it as ignored so it won't break Jenkins. And I reverted the commit that enabled this feature by default, because it's still broken.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889

> select id, name.middle, address from temp - Works
> select name.middle, address from temp - Fails
> select name.middle from temp - Works
> select name.middle, id, address from temp - Works
> select name.middle, address, id from temp - Works

Removing the `order by` clause from your test query caused it to fail, but it has nothing to do with ordering. It appears that the failure in this case manifests when the file scan schema consists of exactly the `name.middle` and `address` columns. Introducing the `order by` clauses in the test suite queries gave them the determinism necessary for checking query answers, but those modifications also altered the file scan schema. I need to fix the tests, but I think the failure underlying the previously ignored test case has not been resolved after all. It was just a confusing coincidence. Unfortunately, we're still not ready to merge this PR.
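For readers following along: the feature under discussion narrows the scan's read schema to just the leaf fields a query touches (e.g. `name.middle` and `address` above). The sketch below is a minimal, language-agnostic illustration of that pruning idea; it is not Spark's implementation (which lives in `ParquetSchemaPruning.scala`), and the schema representation here is hypothetical.

```python
# Illustrative sketch of nested schema pruning: a schema is a nested dict
# mapping field name -> subschema (None marks a leaf). Given the dotted
# field paths a query selects, keep only the selected leaves.

def prune_schema(schema, paths):
    pruned = {}
    for path in paths:
        parts = path.split(".")
        src, dst = schema, pruned
        for i, part in enumerate(parts):
            node = src[part]
            if i == len(parts) - 1:
                dst[part] = node  # keep the selected leaf (or subtree)
            else:
                dst = dst.setdefault(part, {})  # keep the struct wrapper
                src = node
    return pruned

contacts_schema = {
    "id": None,
    "name": {"first": None, "middle": None, "last": None},
    "address": None,
}

# A query selecting name.middle and address scans only those leaves:
print(prune_schema(contacts_schema, ["name.middle", "address"]))
# -> {'name': {'middle': None}, 'address': None}
```

The bug described above would correspond to certain pruned schemas (here, exactly `name.middle` plus `address`) tripping up the downstream reader, not to the pruning computation itself.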
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > @mallman Is it related to this revert in ParquetReadSupport.scala? I re-added this logic and all 32 tests in ParquetSchemaPruningSuite passed. Yes. That's what we need to work on in the next PR.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > This patch fails Scala style tests. Hi @ajacques. I'm not sure if you're aware of this, but you can run the scalastyle checks locally with

```
sbt scalastyle
```
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889

> Alright to make sure we're all on the same page, it sounds like we're ready to merge this PR pending:
>
> * Successful build by Jenkins
> * Any PR comments from a maintainer
>
> This feature will be merged in disabled state and can't be enabled until the next PR is merged, but we do not expect any regression in behavior in the default disabled state.

I agree.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889

>> @mallman, while we wait for the go-no-go, do you have the changes for the next PR ready? Is there anything you need help with?
>
> I have the hack I used originally, but I haven't tried finding a better solution yet. It could take some time to understand the underlying problem/incompatibility/misunderstanding/etc.

I spent some time yesterday digging deeper into why the hack I wrote worked, and I think I understand it now. Practically speaking, my follow-on PR will be about the same as the commit I removed. However, I can support it with some explanatory comments instead of just "this throws an exception sometimes".
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > @mallman, while we wait for the go-no-go, do you have the changes for the next PR ready? Is there anything you need help with? I have the hack I used originally, but I haven't tried finding a better solution yet. It could take some time to understand the underlying problem/incompatibility/misunderstanding/etc.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 @ajacques Please rebase off my branch. @gatorsmile I don't recall seeing that error before. Any idea for how I can reproduce and debug?
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > Assuming from #21889 (comment), we shouldn't have any identified bug here. What kind of bugs left to be fixed? That bug was addressed by b50ddb4. We still need to fix the bug underlying the failing (ignored) test case. I have a tentative fix for that, but @gatorsmile wants to review it in a follow-on PR.
[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21889#discussion_r208446828 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala ---
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 See https://github.com/apache/spark/pull/21320#issuecomment-406353694 for @gatorsmile's request to move the changes to `ParquetReadSupport.scala` to another PR. There was another, unrelated bug reported by @jainaks and addressed in https://github.com/apache/spark/pull/21320#issuecomment-408588685. AFAIK, there's nothing outstanding blocking this PR from being merged as I stated in https://github.com/apache/spark/pull/21889#issuecomment-410557228.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > just for clarification, so now .. there no outstanding bugs, some tests are ignored per #21320 (comment) and left comments were mostly addressed. Did i understand correctly? The ignored tests, and the scenarios they are intended to test, will fail with a runtime exception if this feature is enabled. I put forward a fix in `ParquetReadSupport.scala`, but @gatorsmile didn't want to address that in this PR. Otherwise, there are no known bugs with this patch.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 @ajacques I added a commit to enable schema pruning by default. It's a little more complete than your commit to do the same. Please rebase off my branch and remove your commit.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > Then should we keep this one or #21889? shall we deduplicate the efforts? I requested to open that because this looks going to be inactive per your comments. As I stated before, I'll continue pushing changes to this branch. However, the window of opportunity to review syntax and style in this PR closed long ago. If someone wants to put forward that kind of comment for review I will consider it at my discretion. I'm not going to guarantee action or even a response. If someone relays a bug or a concern regarding correctness or performance, I will address it.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > @mallman if you're planning on making more code changes, would you be willing to work on a shared branch or something? I've been working to incorporate the CR comments. No. However, if you want to open a PR against the VideoAmp spark-4502-parquet_column_pruning-foundation branch, I will review your changes.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 > Due to the urgency of the upcoming 2.4 code freeze, I'm going to open this PR to collect any feedback. This can be closed if you prefer to continue to the work in the original PR. That would be my preference, yes, especially if it means less work for you.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320

>> Hello, we've been using your patch at Stripe and we've found something that looks like a new bug:
>
> Thank you for sharing this, @xinxin-stripe. This is very helpful. I will investigate and report back.

I have not been able to reproduce this issue with this branch at commit 0e5594b6ac1dcb94f3f0166e66a7d4e7eae3d00c. However, I'm seeing the same failure scenario as yours on VideoAmp's internal 2.1, 2.2 and 2.3 backports of this branch. I think the reason for this difference is that our internal branches (and probably yours) incorporate rules to support pruning for aggregations. That functionality was removed from this PR. I will fix this and share the fix with you. It would help if you could send me a scenario where you can reproduce this failure with a Spark SQL query. Query plans for datasets built from SQL queries tend to be much more readable. Consider e-mailing me directly on this issue because it does not appear to be strictly related to this PR. My e-mail address is m...@allman.ms. Thanks again!
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > Hello, we've been using your patch at Stripe and we've found something that looks like a new bug: Thank you for sharing this, @xinxin-stripe. This is very helpful. I will investigate and report back.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320

>> the window of opportunity to review syntax and style in this PR closed long ago.
>
> Why/when is this window closed? Who closed that?

What I wrote above is a coarse approximation of my stance on the matter. It's inadequate, and I have struggled to adequately express myself. Reflecting on this last night I believe I was able to nail down exactly what I want to write, but I don't have time to write right now. I will reply in full later, within a day or two. I will also address your recent comments and questions. Thank you.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > I see no point of leaving this PR open. I don't agree with you on that point, and I've expressed my view in https://github.com/apache/spark/pull/21889#issuecomment-413655304.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 Essentially, this PR was created to take the management of #21320 out of my hands, with a view towards facilitating its incorporation into Spark 2.4. It was my suggestion, one based in frustration. In hindsight, I no longer believe this strategy is the best (or most expedient) approach towards progress. Indeed, I believe the direction of this PR has become orthogonal to its motivating goal, becoming a dispute between myself and @HyukjinKwon rather than a means to move things along. I believe I can shepherd #21320 in a way that will promote greater progress. @ajacques, I mean no disrespect, and I thank you for volunteering your time, patience and effort for the sake of all that are interested in seeing this patch become a part of Spark. And I apologize for letting you down, letting everyone down. In my conduct leading up to the creation of this PR I did not act with the greatest maturity or patience. And I did not act in the best interests of the community. No one has spent more time or more effort, taken more responsibility or exhibited more patience with this 2+ year patch-set-in-the-making than myself. I respectfully submit it is mine to present and manage, and no one else's. Insofar as I have expressed otherwise in the past, I admit my error, one made in frustration, and recant in hindsight. @ajacques, at this point I respectfully assert that managing the patch set I submitted in #21320 is not your responsibility, nor is it anyone else's but mine. I ask you to close this PR so that we can resume the review in #21320. As I stated there, you are welcome to open a PR on https://github.com/VideoAmp/spark-public/tree/spark-4502-parquet_column_pruning-foundation to submit the changes you've made for review. Thank you.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > @mallman, can we close this PR? Are you willing to update here or not? I pushed an update less than a day ago, and I intend to continue pushing updates as needed.
[GitHub] spark issue #21889: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21889 Are we waiting for @gatorsmile's go-ahead and merge?
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > Try this when spark.sql.nestedSchemaPruning.enabled is on? This is a case-sensitivity issue (obviously). I'll get to the root of it. Thanks.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > Try this when spark.sql.nestedSchemaPruning.enabled is on? I don't think this will be difficult to fix. I'm working on it now and will add relevant test coverage.
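For context on the case-sensitivity bug being discussed: when `spark.sql.caseSensitive` is off, a requested column like `name` must still resolve against a file schema that declares `Name`. The sketch below is a hypothetical illustration of that resolution logic, not Spark's actual code; the function and variable names are made up.

```python
# Sketch of case-(in)sensitive field resolution. Given a requested field
# name and the schema's declared field names, return the declared name it
# resolves to, or None if there is no match.

def resolve_field(requested, declared_fields, case_sensitive):
    if case_sensitive:
        return requested if requested in declared_fields else None
    matches = [f for f in declared_fields if f.lower() == requested.lower()]
    # Under case-insensitive analysis, two fields differing only in case
    # make the reference ambiguous.
    if len(matches) > 1:
        raise ValueError(f"ambiguous reference to field {requested!r}")
    return matches[0] if matches else None

fields = ["id", "Name", "address"]
print(resolve_field("name", fields, case_sensitive=False))  # -> Name
print(resolve_field("name", fields, case_sensitive=True))   # -> None
```

A pruning rule that compares requested and declared names with plain string equality works only in case-sensitive mode, which is consistent with the failure surfacing when the flag is at its default.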
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r212388958 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala ---

@@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter(
   override def start(): Unit = {
     var i = 0
-    while (i < currentRow.numFields) {
+    while (i < fieldConverters.length) {
       fieldConverters(i).updater.start()
       currentRow.setNullAt(i)
       i += 1
     }
+    while (i < currentRow.numFields) {

--- End diff -- I'll get back to you on this shortly. Thanks.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r212396370 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala ---

@@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter(
   override def start(): Unit = {
     var i = 0
-    while (i < currentRow.numFields) {
+    while (i < fieldConverters.length) {
       fieldConverters(i).updater.start()
       currentRow.setNullAt(i)
       i += 1
     }
+    while (i < currentRow.numFields) {

--- End diff -- These changes are related to my fix for the ignored unit test. If I apply my fix but keep the master version of this file, 24 unit tests fail. If I apply my fix along with this file's diff, then all tests pass, including the test that is currently ignored. I'm not sure I can develop a unit test for this current commit that should pass but will fail without this file's changes. I haven't spent any time thinking about it, and I really need to work on other things right now. If you want, I will back out this change. However, I will re-incorporate it in a follow-on PR. Thanks.
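The hunk quoted above splits `start()` into two loops: the first runs only over the available field converters, and a second loop nulls out any remaining row fields that have no converter (as can happen when the read schema is narrower than the row). A hypothetical Python sketch of that control flow, with stand-ins for Spark's converter and row types:

```python
# Stand-in for a Parquet field converter; only records that it was started.
class Converter:
    def __init__(self):
        self.started = False

    def start(self):
        self.started = True

def start_row(field_converters, current_row):
    i = 0
    # First loop: one slot per converter; the converter will fill it later.
    while i < len(field_converters):
        field_converters[i].start()
        current_row[i] = None
        i += 1
    # Second loop: trailing fields with no converter stay null.
    while i < len(current_row):
        current_row[i] = None
        i += 1

row = ["stale"] * 4          # row wider than the pruned read schema
convs = [Converter(), Converter()]
start_row(convs, row)
print(row)  # -> [None, None, None, None]
```

With a single loop bounded by the row width (the pre-patch shape), the code would index past the end of the converter list whenever the row is wider than the read schema, which is the mismatch the hunk guards against.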
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > @mallman Could you remove the changes made in ParquetRowConverter.scala and also turn off spark.sql.nestedSchemaPruning.enabled by default in this PR? Done.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 Thanks everyone for your contributions, support and patience. It's been a journey and a half, and I'm excited for the future. I will open a follow-on PR to address the current known failure scenario (see ignored test) in this patch, and we can discuss if/how we can get it into 2.4 as well. I know there are many early adopters of this patch and #16578. Bug reports will continue to be very helpful. Beyond this patch, there are many possibilities for widening the scope of schema pruning. As part of our review process, we've pared the scope of this capability to just projection. IMHO, the first limitation we should address post 2.4 is supporting pruning with query filters of nested fields ("where" clauses). Joins, aggregations and window queries would be powerful enhancements as well, bringing the scope of schema pruning to analytic queries. I believe all of the additional features VideoAmp has implemented for schema pruning are independent of the underlying column store. Future enhancements should be automagically inherited by any column store that implements functionality analogous to `ParquetSchemaPruning.scala`. This should widen not just the audience that can be reached, but the developer community that can contribute and review. Thanks again.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 @gatorsmile Any concerns about merging this PR at this point?
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > Add some test cases when turning on spark.sql.caseSensitive? Will do.
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 @gatorsmile How does this look?
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > I think if the tests are few, you can make them ignored for now here, and make another PR enabling it back with the changes in ParquetReadSupport.scala. That's the approach I've taken in the last rebase I just pushed.
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > BTW, I am trying to take a look closely. I would appreciate if there are some concrete examples so that I (or other reviewers) can double check along. Parquet is pretty core fix and let's be very sure on each case. What kind of examples would help?
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r204208518 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.types._ + +// scalastyle:off line.size.limit +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll { + // The test schema as a tree string, i.e. 
`schema.treeString` + // root + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // ||-- field1: integer (nullable = true) + // ||-- field2: array (nullable = true) + // |||-- element: integer (containsNull = false) + // ||-- field3: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: integer (nullable = true) + // ||||-- subfield3: array (nullable = true) + // |||||-- element: integer (containsNull = true) + // ||-- field4: map (nullable = true) + // |||-- key: string + // |||-- value: struct (valueContainsNull = false) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: array (nullable = true) + // |||||-- element: integer (containsNull = false) + // ||-- field5: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: struct (nullable = false) + // |||||-- subsubfield1: integer (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||||-- subfield2: struct (nullable = true) + // |||||-- subsubfield1: struct (nullable = true) + // ||||||-- subsubsubfield1: string (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||-- field6: struct (nullable = true) + // |||-- subfield1: string (nullable = false) + // |||-- subfield2: string (nullable = true) + // ||-- field7: struct (nullable = true) + // |||-- subfield1: struct (nullable = true) + // ||||-- subsubfield1: integer (nullable = true) + // ||||-- subsubfield2: integer (nullable = true) + // ||-- field8: map (nullable = true) + // |||-- key: string + // |||-- value: array (valueContainsNull = false) + // ||||-- element: struct (containsNull = true) + // |||||-- subfield1: integer (nullable = true) + // |||||-- subfield2: array (nullable = true) + // ||||||-- element: integer (containsNull = false) + // ||-- field9: map (nullable = true) + // |||-- key: string + // |||-- value: integer 
(valueContainsNull = false) + // |-- col3: array (nullable = false) + // ||-- element: struct (containsNull = false) + // |||-- field1: struct (nullable = true) + // ||||-- subfield1: integer (nullable = false) + // ||||-- subfield2: integer (nullable = true) + // |||-- field2: map (nullable = true) + // ||||-- key: string + // |||
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > @mallman I still think we need to split it to two PRs. To resolve the issues you mentioned above, how about creating a separate PR? Only 10 days left before the code freeze of Spark 2.4. We plan to merge the main logic of nested column pruning to Spark 2.4 release first and then address the other parts in the next release. WDYT? I've rebased and pushed a commit that removes the changes to `ParquetReadSupport.scala` and `ParquetFileFormat.scala`. I've also marked failing tests in `ParquetSchemaPruningSuite.scala` as ignored. Is this what you had in mind?
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 The test failure is unrelated to this patch. Shall we retest?
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r204209033 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1298,8 +1298,18 @@ object SQLConf { "issues. Turn on this config to insert a local sort before actually doing repartition " + "to generate consistent repartition results. The performance of repartition() may go " + "down since we insert extra local sort before it.") +.booleanConf +.createWithDefault(true) + + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data. Currently Parquet is the only data source that " + +"implements this optimization.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) --- End diff -- I'm against enabling this feature by default with a known failing test case. For example, https://github.com/apache/spark/pull/21320/files#diff-0c6c7481232e9637b91c179f1005426aR71. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > Could we move the changes made in ParquetReadSupport.scala to a separate PR? Then, we can merge this PR very quickly. If I remove the changes to `ParquetReadSupport.scala`, then four tests fail in `ParquetSchemaPruningSuite.scala`. I don't think we should (or can) proceed without addressing the issue of reading from two Parquet files with identical column names and types but different column ordering in their respective file schemas. Personally, I think the fact that the Spark Parquet reader appears to assume the same column order across otherwise compatible file schemas is a bug. Column selection should be by name, not by index; the parquet-mr reader already behaves that way. As a stop-gap alternative, I suppose we could disable the built-in reader when Parquet schema pruning is turned on, but I think that would be a rather ugly, invasive and confusing hack. Of course, I'm open to other ideas as well.
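A hypothetical repro of the cross-file ordering issue described above (the paths and data are illustrative, and an active `SparkSession` named `spark` is assumed):

```scala
import spark.implicits._

// Two files under one table path: same column names and types,
// different physical column order in each file's schema.
Seq((1, "a")).toDF("id", "name").write.parquet("/tmp/mixed/f1")
Seq(("b", 2)).toDF("name", "id").write.parquet("/tmp/mixed/f2")

// A reader that binds columns by position can transpose id and name for one
// of the two files; binding by name, as parquet-mr does, is immune to the
// per-file ordering.
spark.read.parquet("/tmp/mixed").select("id", "name").show()
```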
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320 > @mallman I still think we need to split it to two PRs. To resolve the issues you mentioned above, how about creating a separate PR? Only 10 days left before the code freeze of Spark 2.4. We plan to merge the main logic of nested column pruning to Spark 2.4 release first and then address the other parts in the next release. WDYT? Sorry, I still don't understand what split you're suggesting. How do you want me to split it, and which do we continue with next? This one? The new one?
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r204206072 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala --- @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.io.File + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.execution.FileSchemaPruningTest +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext + +class ParquetSchemaPruningSuite +extends QueryTest +with ParquetTest +with FileSchemaPruningTest +with SharedSQLContext { + case class FullName(first: String, middle: String, last: String) + case class Contact(name: FullName, address: String, pets: Int, friends: Array[FullName] = Array(), +relatives: Map[String, FullName] = Map()) + + val contacts = +Contact(FullName("Jane", "X.", "Doe"), "123 Main Street", 1) :: +Contact(FullName("John", "Y.", "Doe"), "321 Wall Street", 3) :: Nil + + case class Name(first: String, last: String) + case class BriefContact(name: Name, address: String) + + val briefContacts = +BriefContact(Name("Janet", "Jones"), "567 Maple Drive") :: +BriefContact(Name("Jim", "Jones"), "6242 Ash Street") :: Nil + + case class ContactWithDataPartitionColumn(name: FullName, address: String, pets: Int, +friends: Array[FullName] = Array(), relatives: Map[String, FullName] = Map(), p: Int) + + case class BriefContactWithDataPartitionColumn(name: Name, address: String, p: Int) + + val contactsWithDataPartitionColumn = +contacts.map { case Contact(name, address, pets, friends, relatives) => + ContactWithDataPartitionColumn(name, address, pets, friends, relatives, 1) } + val briefContactsWithDataPartitionColumn = +briefContacts.map { case BriefContact(name: Name, address: String) => + BriefContactWithDataPartitionColumn(name, address, 2) } + + testSchemaPruning("select a single complex field") { +val query = sql("select name.middle from contacts") +checkScanSchemata(query, "struct>") +checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil) + } + + testSchemaPruning("select a single complex field and the partition column") { +val query = 
sql("select name.middle, p from contacts") +checkScanSchemata(query, "struct>") +checkAnswer(query, Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil) + } + + testSchemaPruning("partial schema intersection - select missing subfield") { +val query = sql("select name.middle, address from contacts where p=2") +checkScanSchemata(query, "struct,address:string>") +checkAnswer(query, + Row(null, "567 Maple Drive") :: + Row(null, "6242 Ash Street") :: Nil) + } + + testSchemaPruning("no unnecessary schema pruning") { +val query = + sql("select name.last, name.middle, name.first, relatives[''].last, " + +"relatives[''].middle, relatives[''].first, friends[0].last, friends[0].middle, " + +"friends[0].first, pets, address from contacts where p=2") +// We've selected every field in the schema. Therefore, no schema pruning should be performed. +// We check this by asserting that the scanned schema of the query is identical to the schema +// of the contacts relation, even though the fields are selected in d
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r205020970 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. --- End diff -- This change is not part of this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r205021140 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala --- @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.io.File + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.execution.FileSchemaPruningTest +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext + +class ParquetSchemaPruningSuite +extends QueryTest +with ParquetTest +with FileSchemaPruningTest +with SharedSQLContext { --- End diff -- No comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r205021282 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that projects an expression over a given schema. Data types, + * field indexes and field counts of complex type extractors and attributes + * are adjusted to fit the schema. All other expressions are left as-is. This + * class is motivated by columnar nested schema pruning. + */ +case class ProjectionOverSchema(schema: StructType) { --- End diff -- Okay. So... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r205021469 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. 
In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. + */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +if (SQLConf.get.nestedSchemaPruningEnabled) { + apply0(plan) +} else { + plan +} + + private def apply0(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionRootFields = projects.flatMap(getRootFields) +val filterRootFields = filters.flatMap(getRootFields) +val requestedRootFields = (projectionRootFields ++ filterRootFields).distinct + +// If [[requestedRootFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedRootFields.exists { case RootField(_, derivedFromAtt) => !derivedFromAtt }) { + val prunedSchema = requestedRootFields +.map { case RootField(field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return [[op]]. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields + // in [[dataSchema]]. 
+ if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { +val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession) + +// We need to replace the expression ids of the pruned relation output attributes +// with the expression ids of the original relation output attributes so that +// references to the original relation's output are not broken +val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap +val prunedRelationOutput = + prunedParquetRelation +.schema +.toAttributes +.map { + case att if outputIdMap.contains(att.name) => +att.withExprId(outputIdMap(att.name)) + case att => att +} +
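The quoted diff cuts off before `countLeaves`, but the comparison it describes — prune only when the pruned schema has strictly fewer primitive "leaf" fields than the relation's data schema — can be sketched as follows (an assumed shape of the helper, not the PR's exact code):

```scala
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

// Count the "leaf" (primitive) fields of a data type, recursing through
// struct fields, array elements and map keys/values.
def countLeaves(dataType: DataType): Int = dataType match {
  case StructType(fields)             => fields.map(f => countLeaves(f.dataType)).sum
  case ArrayType(elementType, _)      => countLeaves(elementType)
  case MapType(keyType, valueType, _) => countLeaves(keyType) + countLeaves(valueType)
  case _                              => 1
}

val dataSchema =
  StructType.fromDDL("name struct<first:string,middle:string,last:string>, address string")
val prunedSchema = StructType.fromDDL("name struct<middle:string>")

// 4 leaves vs. 1 leaf: pruning is worthwhile, so the rule rewrites the relation.
assert(countLeaves(dataSchema) > countLeaves(prunedSchema))
```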
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r205021712 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala --- @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.types._ + +// scalastyle:off line.size.limit --- End diff -- No comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r205022799 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala --- @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.io.File + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.execution.FileSchemaPruningTest +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext + +class ParquetSchemaPruningSuite +extends QueryTest +with ParquetTest +with FileSchemaPruningTest +with SharedSQLContext { + case class FullName(first: String, middle: String, last: String) + case class Contact(name: FullName, address: String, pets: Int, friends: Array[FullName] = Array(), +relatives: Map[String, FullName] = Map()) + + val contacts = +Contact(FullName("Jane", "X.", "Doe"), "123 Main Street", 1) :: +Contact(FullName("John", "Y.", "Doe"), "321 Wall Street", 3) :: Nil + + case class Name(first: String, last: String) + case class BriefContact(name: Name, address: String) + + val briefContacts = +BriefContact(Name("Janet", "Jones"), "567 Maple Drive") :: +BriefContact(Name("Jim", "Jones"), "6242 Ash Street") :: Nil + + case class ContactWithDataPartitionColumn(name: FullName, address: String, pets: Int, +friends: Array[FullName] = Array(), relatives: Map[String, FullName] = Map(), p: Int) + + case class BriefContactWithDataPartitionColumn(name: Name, address: String, p: Int) + + val contactsWithDataPartitionColumn = +contacts.map { case Contact(name, address, pets, friends, relatives) => + ContactWithDataPartitionColumn(name, address, pets, friends, relatives, 1) } + val briefContactsWithDataPartitionColumn = +briefContacts.map { case BriefContact(name: Name, address: String) => + BriefContactWithDataPartitionColumn(name, address, 2) } + + testSchemaPruning("select a single complex field") { +val query = sql("select name.middle from contacts") +checkScanSchemata(query, "struct>") +checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil) + } + + testSchemaPruning("select a single complex field and the partition column") { +val query = 
sql("select name.middle, p from contacts") +checkScanSchemata(query, "struct>") +checkAnswer(query, Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil) + } + + ignore("partial schema intersection - select missing subfield") { +val query = sql("select name.middle, address from contacts where p=2") +checkScanSchemata(query, "struct,address:string>") +checkAnswer(query, + Row(null, "567 Maple Drive") :: + Row(null, "6242 Ash Street") :: Nil) + } + + testSchemaPruning("no unnecessary schema pruning") { +val query = + sql("select name.last, name.middle, name.first, relatives[''].last, " + +"relatives[''].middle, relatives[''].first, friends[0].last, friends[0].middle, " + +"friends[0].first, pets, address from contacts where p=2") +// We've selected every field in the schema. Therefore, no schema pruning should be performed. +// We check this by asserting that the scanned schema of the query is identical to the schema +// of the contacts relation, even though the fields are selected in different order
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r205022895 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. 
In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. + */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +if (SQLConf.get.nestedSchemaPruningEnabled) { + apply0(plan) +} else { + plan +} + + private def apply0(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionRootFields = projects.flatMap(getRootFields) +val filterRootFields = filters.flatMap(getRootFields) +val requestedRootFields = (projectionRootFields ++ filterRootFields).distinct + +// If [[requestedRootFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedRootFields.exists { case RootField(_, derivedFromAtt) => !derivedFromAtt }) { + val prunedSchema = requestedRootFields +.map { case RootField(field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return [[op]]. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields + // in [[dataSchema]]. + if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { --- End diff -- No comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r205022974 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala --- @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.types._ + +// scalastyle:off line.size.limit +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll { + // The test schema as a tree string, i.e. 
`schema.treeString`
+ // root
+ // |-- col1: string (nullable = false)
+ // |-- col2: struct (nullable = true)
+ // |    |-- field1: integer (nullable = true)
+ // |    |-- field2: array (nullable = true)
+ // |    |    |-- element: integer (containsNull = false)
+ // |    |-- field3: array (nullable = false)
+ // |    |    |-- element: struct (containsNull = true)
+ // |    |    |    |-- subfield1: integer (nullable = true)
+ // |    |    |    |-- subfield2: integer (nullable = true)
+ // |    |    |    |-- subfield3: array (nullable = true)
+ // |    |    |    |    |-- element: integer (containsNull = true)
+ // |    |-- field4: map (nullable = true)
+ // |    |    |-- key: string
+ // |    |    |-- value: struct (valueContainsNull = false)
+ // |    |    |    |-- subfield1: integer (nullable = true)
+ // |    |    |    |-- subfield2: array (nullable = true)
+ // |    |    |    |    |-- element: integer (containsNull = false)
+ // |    |-- field5: array (nullable = false)
+ // |    |    |-- element: struct (containsNull = true)
+ // |    |    |    |-- subfield1: struct (nullable = false)
+ // |    |    |    |    |-- subsubfield1: integer (nullable = true)
+ // |    |    |    |    |-- subsubfield2: integer (nullable = true)
+ // |    |    |    |-- subfield2: struct (nullable = true)
+ // |    |    |    |    |-- subsubfield1: struct (nullable = true)
+ // |    |    |    |    |    |-- subsubsubfield1: string (nullable = true)
+ // |    |    |    |    |-- subsubfield2: integer (nullable = true)
+ // |    |-- field6: struct (nullable = true)
+ // |    |    |-- subfield1: string (nullable = false)
+ // |    |    |-- subfield2: string (nullable = true)
+ // |    |-- field7: struct (nullable = true)
+ // |    |    |-- subfield1: struct (nullable = true)
+ // |    |    |    |-- subsubfield1: integer (nullable = true)
+ // |    |    |    |-- subsubfield2: integer (nullable = true)
+ // |    |-- field8: map (nullable = true)
+ // |    |    |-- key: string
+ // |    |    |-- value: array (valueContainsNull = false)
+ // |    |    |    |-- element: struct (containsNull = true)
+ // |    |    |    |    |-- subfield1: integer (nullable = true)
+ // |    |    |    |    |-- subfield2: array (nullable = true)
+ // |    |    |    |    |    |-- element: integer (containsNull = false)
+ // |    |-- field9: map (nullable = true)
+ // |    |    |-- key: string
+ // |    |    |-- value: integer (valueContainsNull = false)
+ // |-- col3: array (nullable = false)
+ // |    |-- element: struct (containsNull = false)
+ // |    |    |-- field1: struct (nullable = true)
+ // |    |    |    |-- subfield1: integer (nullable = false)
+ // |    |    |    |-- subfield2: integer (nullable = true)
+ // |    |    |-- field2: map (nullable = true)
+ // |    |    |    |-- key: string
+ // |    |    |
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320

> Regarding #21320 (comment), can you at least set this enabled by default and see if some existing tests are broken or not?

I have no intention of doing that at this point, no.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320

> gentle ping @mallman since the code freeze is close

Outside of my primary occupation, my top priority on this PR right now is investigating https://github.com/apache/spark/pull/21320#issuecomment-396498487. I don't think I'm going to get a test file from the OP, so I'm going to try to reproduce the issue on my own.
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320

>> Hi @jainaks. Thanks for your report. Do you have the same problem running your test with this PR?

> @mallman Yes, the issue with window functions is reproducible even with this PR.

Hi @jainaks. I want to circle back with you about this. Are you still having this trouble with the latest version of the PR? If so, can you please attach a small parquet file for testing?
[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/15673#discussion_r216037341 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -586,17 +587,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] } else { logDebug(s"Hive metastore filter is '$filter'.") +val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL +val tryDirectSql = + hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal) try { + // Hive may throw an exception when calling this method in some circumstances, such as + // when filtering on a non-string partition column when the hive config key + // hive.metastore.try.direct.sql is false getPartitionsByFilterMethod.invoke(hive, table, filter) .asInstanceOf[JArrayList[Partition]] } catch { - case e: InvocationTargetException => -// SPARK-18167 retry to investigate the flaky test. This should be reverted before -// the release is cut. -val retry = Try(getPartitionsByFilterMethod.invoke(hive, table, filter)) -logError("getPartitionsByFilter failed, retry success = " + retry.isSuccess) -logError("all partitions: " + getAllPartitions(hive, table)) -throw e + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && + !tryDirectSql => +logWarning("Caught Hive MetaException attempting to get partition metadata by " + + "filter from Hive. Falling back to fetching all partition metadata, which will " + + "degrade performance. 
Modifying your Hive metastore configuration to set " +
+          s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
+        // HiveShim clients are expected to handle a superset of the requested partitions
+        getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
+      case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
+          tryDirectSql =>
+        throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
--- End diff --

Hi @rezasafi. I believe the reasoning is that if the user has disabled direct SQL, we will try to fetch the partitions for the requested partition predicate anyway. However, since we don't expect that call to succeed, we just log a warning and fall back to the legacy behavior. On the other hand, if the user has enabled direct SQL, then we expect the call to Hive to succeed. If it fails, we consider that an error and throw an exception. I hope that helps clarify things.
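The fallback policy discussed in this review (warn and fall back to fetching all partitions when direct SQL is disabled; fail fast when it is enabled) can be sketched as follows. This is an illustrative Python model, not the Scala `HiveShim` code; `MetaException`, `get_by_filter`, and `get_all` here are stand-ins:

```python
import logging

class MetaException(Exception):
    """Stand-in for Hive's MetaException."""

def get_partitions(try_direct_sql, get_by_filter, get_all):
    # With direct SQL disabled, a MetaException from the filtered fetch is
    # half-expected (e.g. filtering on a non-string partition column), so we
    # warn and degrade to fetching all partition metadata. With direct SQL
    # enabled, the same failure is treated as an error.
    try:
        return get_by_filter()
    except MetaException as ex:
        if not try_direct_sql:
            logging.warning("Falling back to fetching all partition metadata: %s", ex)
            return get_all()
        raise RuntimeError(
            "getPartitionsByFilter failed even though direct SQL is enabled") from ex

def failing_filter():
    raise MetaException("non-string partition column filter rejected")

# Direct SQL disabled: degrade gracefully to the full partition list.
assert get_partitions(False, failing_filter, lambda: ["p1", "p2"]) == ["p1", "p2"]

# Direct SQL enabled: surface the failure.
try:
    get_partitions(True, failing_filter, lambda: ["p1", "p2"])
    raise AssertionError("expected RuntimeError")
except RuntimeError:
    pass
```

This matches the rationale in the comment: callers are expected to handle a superset of the requested partitions, so the fallback is correct, just slower.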
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357 Hi @viirya, Thanks for this PR! I have an alternative implementation which I'd like to submit for comparison. My implementation was something I removed from my original patch. I hope to have my PR submitted sometime today. I have another PR to submit, too. I'll be sure to refer to your PR in mine. Cheers.
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357

> @mallman It will be great that we can have this fix in 2.4 release as this can dramatically reduce the data being read in many applications which is the purpose of the original work.

I agree it would be great to have this capability in 2.4. But I don't know that this PR is the right way to accomplish our intended goal. I'm also not sure this patch accomplishes its intended goal. And I would like time to complete my review; I'm still running tests against this patch. I would also like to submit my patch as an alternative for review, because the approaches taken by this PR and by my patch are not compatible. Even though it's incomplete, I'm willing to submit it as-is with some notes on how it's incomplete and what needs to be done. However, I can say for certain there is no way it would be accepted for Spark 2.4. The earliest I could get it submitted is tomorrow morning (EDT).

However, to give you a sense of how my patch works, I can give you the gist of how I see the problem. Basically, constraint propagation as defined in `QueryPlanConstraints.scala` inhibits schema pruning. Indeed, if you turn off constraint propagation (by setting `spark.sql.constraintPropagation.enabled` to `false`), the following query

    select employer.id from contacts where employer.id = 0

produces the following physical plan

```
== Physical Plan ==
*(1) Project [employer#36.id AS id#47]
+- *(1) Filter (employer#36.id = 0)
   +- *(1) FileScan parquet [employer#36,p#37] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct>
```

without applying _either_ patch. (FYI, I ran this on the master branch, commit 12e3e9f17dca11a2cddf0fb99d72b4b97517fb56.) The only column read in this plan is `employer.id`, just as we'd like.

Aside from the difference in approach, I have some other concerns around this PR.
I don't think we should push down `IsNotNull(employer)` to the reader unless we need to. This PR includes that pushed-down filter for both of the sample queries I provided in my previous comment https://github.com/apache/spark/pull/22357#issuecomment-419612555. The question is: how does that pushdown affect the reader's behavior?

That leads me to a concern around the testing of this functionality. Our intent is to read from as few columns as necessary. In the query

    select employer.id from contacts where employer.id = 0

we need only read from the `employer.id` column. And we can tell the reader to only read that column. But how do we know that pushing down `IsNotNull(employer)` does not negate that instruction? One way to be certain is to not push that filter down in the first place. That is the approach my patch currently takes. Of course, this removes the pushdown. I think that identifying which plan leads to a faster scan requires a more robust testing capability; however, one thing is certain: the `FileScan` in my patch's plan gives no reason to believe that it is reading anything other than that one column.

IMO, we can get closer to settling the question of relative performance/behavior by pushing down Parquet reader filters just for the columns we need, e.g. `IsNotNull(employer.id)` in the case above. Neither patch (currently) does that; however, I think my patch is closer to achieving that because it already identifies `isnotnull(employer#4445.id)` as a filter predicate in the query plan. We just need to push it down.

As I mentioned, I'll endeavor to have my patch posted as a PR by tomorrow morning, but I can't make a promise of that. I'm sorry for the delay. I really wasn't expecting we'd work on this functionality for Spark 2.4. We do have a known bug in the schema pruning functionality that's in Spark 2.4, one that throws an error.
We identified it in #21320 (look for the "ignored" test in `ParquetSchemaPruningSuite.scala`), but I don't think we have an issue in Jira for it. I'll try to take care of that by tomorrow morning as well, and I was hoping we would prioritize that. I have a patch for that bug that is code complete but missing proper code documentation. Thanks all.
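To illustrate why constraint propagation interacts with pruning as described in this thread: an equality filter on a nested field induces an `IsNotNull` constraint on the whole root attribute, which then makes the pruner request the full struct. A toy Python model of that effect (hypothetical tuple-based filter representation, not Catalyst expressions):

```python
# Toy model: a filter is a (predicate_name, column_path) tuple. A
# constraint-propagation pass adds IsNotNull on the root attribute of every
# referenced column, which is what forces the pruner to see a request for
# the whole struct rather than just the nested field.

def propagate_constraints(filters):
    roots = {col.split(".")[0] for _, col in filters}
    return filters + [("IsNotNull", root) for root in sorted(roots)]

def requested_columns(filters):
    return sorted({col for _, col in filters})

filters = [("EqualTo", "employer.id")]

# Without propagation, only the nested field is requested...
assert requested_columns(filters) == ["employer.id"]

# ...with propagation, the root struct is requested as well, defeating pruning.
assert requested_columns(propagate_constraints(filters)) == ["employer", "employer.id"]
```

This is only a sketch of the mechanism mallman describes; in Spark the actual behavior is governed by `QueryPlanConstraints` and the `spark.sql.constraintPropagation.enabled` setting mentioned above.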
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357

> FYI, per further checking code and discussion with @dbtsai regarding predicate pushdown, we know that predicate push down only works for primitive types on the Parquet datasource. So both `IsNotNull(employer)` and `IsNotNull(employer.id)` are not actually pushed down to work at the Parquet reader

I would expect `IsNotNull(employer.id)` to be pushed down. In any case, I misunderstood what that `PushedFilters` metadata item means in the `FileScan` part of the physical plan. I thought that was a Parquet filter, but sometimes it is not. At this point, I'm not concerned about supporting filter push down. My concern was around its side effects, but that has been allayed.
[GitHub] spark pull request #22357: [SPARK-25363][SQL] Fix schema pruning in where cl...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/22357#discussion_r216545091

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala ---
@@ -110,7 +110,17 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
     val projectionRootFields = projects.flatMap(getRootFields)
     val filterRootFields = filters.flatMap(getRootFields)
-    (projectionRootFields ++ filterRootFields).distinct
+    // Some kinds of expressions don't need to access any fields of a root field, e.g. `IsNotNull`.
+    // For them, if any nested fields are accessed in the query, we don't need to add root
+    // field access for those expressions.
+    // For example, for the query `SELECT name.first FROM contacts WHERE name IS NOT NULL`,
+    // we don't need to read nested fields of the `name` struct other than the `first` field.
--- End diff --

I'm having trouble accepting this, but perhaps I'm reading too much into it (or not enough). Let me illustrate with a couple of queries and their physical plans. Assuming the data model in `ParquetSchemaPruningSuite.scala`, the physical plan for the query

    select employer.id from contacts where employer is not null

is

```
== Physical Plan ==
*(1) Project [employer#36.id AS id#46]
+- *(1) Filter isnotnull(employer#36)
   +- *(1) FileScan parquet [employer#36,p#37] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)], ReadSchema: struct>
```

The physical plan for the query

    select employer.id from contacts where employer.id is not null

is

```
== Physical Plan ==
*(1) Project [employer#36.id AS id#47]
+- *(1) Filter (isnotnull(employer#36) && isnotnull(employer#36.id))
   +- *(1) FileScan parquet [employer#36,p#37] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)], ReadSchema: struct>
```

The read schemata are the same, but the query filters are not. The file scan for the second query looks as I would expect, but the scan for the first query appears to only read `employer.id` even though it needs to check `employer is not null`. If it only reads `employer.id`, how does it check that `employer.company` is not null? Perhaps `employer.id` is null but `employer.company` is not null for some row... I have run some tests to validate that this PR is returning the correct results for both queries, and it is. But I don't understand why.
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357

I have reconstructed my original patch for this issue, but I've discovered it will require more work to complete. However, as part of that reconstruction I've discovered a couple of cases where our patches create different physical plans. The query results are the same, but I'm not sure which plan, if either, is correct. I want to go into detail on that, but it's complicated and I have to call it quits tonight. I have a flight in the morning, and I'll be on break next week. In the meantime, I'll just copy and paste two queries, based on the data in `ParquetSchemaPruningSuite.scala`, with two query plans each.

First query:

    select employer.id from contacts where employer is not null

This PR (as of d68f808) produces:

```
== Physical Plan ==
*(1) Project [employer#4442.id AS id#4452]
+- *(1) Filter isnotnull(employer#4442)
   +- *(1) FileScan parquet [employer#4442,p#4443] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)], ReadSchema: struct>
```

My WIP patch produces:

```
== Physical Plan ==
*(1) Project [employer#4442.id AS id#4452]
+- *(1) Filter isnotnull(employer#4442)
   +- *(1) FileScan parquet [employer#4442,p#4443] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)], ReadSchema: struct>>
```

Second query:

    select employer.id from contacts where employer.id = 0

This PR produces:

```
== Physical Plan ==
*(1) Project [employer#4297.id AS id#4308]
+- *(1) Filter (isnotnull(employer#4297) && (employer#4297.id = 0))
   +- *(1) FileScan parquet [employer#4297,p#4298] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [IsNotNull(employer)], ReadSchema: struct>
```

My WIP patch produces:

```
== Physical Plan ==
*(1) Project [employer#4445.id AS id#4456]
+- *(1) Filter (isnotnull(employer#4445.id) && (employer#4445.id = 0))
   +- *(1) FileScan parquet [employer#4445,p#4446] Batched: false, Format: Parquet, PartitionCount: 2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct>
```

I wanted to give my thoughts on the differences of these in detail, but I have to wrap up my work for the night. I'll be visiting family next week. I don't know how responsive I'll be in that time, but I'll at least try to check back. Cheers.
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/19410 Hi @szhem. I understand you've put a lot of work into this implementation; however, I think you should try a simpler approach before we consider something more complicated. I believe an approach based on weak references and a reference queue would be a much simpler alternative. Can you give that a try?
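The simpler approach suggested here (weak references plus a reference queue, so checkpoint data is cleaned up once nothing references the checkpointed graph) can be sketched in Python with `weakref.finalize`, which plays the role of a reference queue. This is only an illustration of the idea; `CheckpointedGraph` and `checkpoint` are hypothetical stand-ins, not GraphX code:

```python
import os
import tempfile
import weakref

class CheckpointedGraph:
    """Hypothetical stand-in for a checkpointed graph/RDD."""
    def __init__(self, path):
        self.path = path

def checkpoint(graph_id):
    # Write a dummy checkpoint file and register a finalizer that removes
    # it once the graph object becomes unreachable.
    path = os.path.join(tempfile.mkdtemp(), f"graph-{graph_id}.ckpt")
    with open(path, "w") as f:
        f.write("checkpoint data")
    graph = CheckpointedGraph(path)
    weakref.finalize(graph, os.remove, path)
    return graph

g = checkpoint(1)
path = g.path
assert os.path.exists(path)

del g  # no strong references remain; the finalizer deletes the file (CPython refcounting)
assert not os.path.exists(path)
```

In Spark the analogous hook would be the `ContextCleaner` mentioned later in this thread, which uses exactly this weak-reference pattern to clean up unreferenced RDDs, shuffles, and broadcasts.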
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r201863251 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema + ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, context.getFileSchema) +.map(intersectionGroup => + new MessageType(intersectionGroup.getName, intersectionGroup.getFields)) +.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE) +} else { + // Spark's built-in Parquet reader will throw an exception in some cases if the requested + // schema is not the same as the clipped schema --- End diff -- @gatorsmile @rdblue @mswit-databricks What is your position on this? I don't know that the parquet spec provides a definitive answer on this question. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r201863353 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. 
In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. + */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +if (SQLConf.get.nestedSchemaPruningEnabled) { + apply0(plan) +} else { + plan +} + + private def apply0(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionRootFields = projects.flatMap(getRootFields) +val filterRootFields = filters.flatMap(getRootFields) +val requestedRootFields = (projectionRootFields ++ filterRootFields).distinct + +// If [[requestedRootFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedRootFields.exists { case RootField(_, derivedFromAtt) => !derivedFromAtt }) { + val prunedSchema = requestedRootFields +.map { case RootField(field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet --- End diff -- Yes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r201863463 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that builds a [[org.apache.spark.sql.types.StructField]] from a Catalyst + * complex type extractor. For example, consider a relation with the following schema: + * + * {{{ + * root + *|-- name: struct (nullable = true) + *||-- first: string (nullable = true) + *||-- last: string (nullable = true) + *}}} + * + * Further, suppose we take the select expression `name.first`. This will parse into an + * `Alias(child, "first")`. 
Ignoring the alias, `child` matches the following pattern: + * + * {{{ + * GetStructFieldObject( + * AttributeReference("name", StructType(_), _, _), + * StructField("first", StringType, _, _)) + * }}} + * + * [[SelectedField]] converts that expression into + * + * {{{ + * StructField("name", StructType(Array(StructField("first", StringType + * }}} + * + * by mapping each complex type extractor to a [[org.apache.spark.sql.types.StructField]] with the + * same name as its child (or "parent" going right to left in the select expression) and a data + * type appropriate to the complex type extractor. In our example, the name of the child expression + * is "name" and its data type is a [[org.apache.spark.sql.types.StructType]] with a single string + * field named "first". + * + * @param expr the top-level complex type extractor + */ +object SelectedField { + def unapply(expr: Expression): Option[StructField] = { --- End diff -- The code does not compile with that change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
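The mapping the scaladoc above describes (from a nested reference like `name.first` to a single-branch schema rooted at `name`, built right to left) can be sketched as follows. This is a toy Python model with a hypothetical dict-based schema representation, not Catalyst's `StructField` types:

```python
# Build the single-branch schema selected by a dotted field reference,
# folding right to left as the SelectedField docstring describes.
# Hypothetical representation: a schema is a dict or a leaf type name.

def selected_field(path, leaf_type="string"):
    parts = path.split(".")
    schema = leaf_type
    # Wrap the leaf in one struct per intermediate name, innermost first.
    for name in reversed(parts[1:]):
        schema = {name: schema}
    return {parts[0]: schema}

# `name.first` selects a struct `name` containing only the `first` field.
assert selected_field("name.first") == {"name": {"first": "string"}}

# A deeper reference keeps exactly one branch of the schema tree.
assert selected_field("a.b.c") == {"a": {"b": {"c": "string"}}}
```

Merging several such single-branch schemas (one per selected expression) then yields the pruned schema that the pruning rule requests from the reader.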
[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/19410 Hi @szhem. Thanks for the information regarding disk use for your scenario. What do you think about my second point, using the `ContextCleaner`?
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357 @viirya Please amend https://github.com/apache/spark/blob/d684a0f30599d50061ef78ec62edcdd3b726e2d9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala#L303-L306 to remove the explanatory comment, and uncomment the commented-out line of code.
[GitHub] spark issue #22394: [SPARK-25406][SQL] For ParquetSchemaPruningSuite.scala, ...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22394 FYI @viirya @dbtsai @gatorsmile @HyukjinKwon Can I get someone's review of this PR, please? The unmasked failures appear to be false positives, so no changes to the tested code are required, just changes to the tests themselves. Thanks.
[GitHub] spark issue #22357: [SPARK-25363][SQL] Fix schema pruning in where clause by...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/22357 And FYI, this is the Jira issue I promised in https://github.com/apache/spark/pull/22357#issuecomment-419940228 yesterday: https://issues.apache.org/jira/browse/SPARK-25407.