[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144453526

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,14 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+        && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      logWarning(s"Committer $committerClass is not a ParquetOutputCommitter and cannot" +
+        s" create job summaries. " +
--- End diff --

D'oh, `s` ...
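For context on the nit: Scala's `s` prefix turns a literal into an interpolated string, and is only needed when the literal contains `$` placeholders. A minimal sketch illustrating the difference (the variable value is illustrative):

```scala
val committerClass = "com.example.MyCommitter"  // illustrative value
// `s` is required here: the literal interpolates $committerClass.
val first = s"Committer $committerClass is not a ParquetOutputCommitter and cannot"
// `s` would be redundant here: the literal has no placeholders.
val second = " create job summaries. "
println(first + second)
```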
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19448
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144388430

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

I think once per write operation is fine. It's not like it is once per file.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144381367

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

yes, there is that. Options: do something complicated with a static field to only print once, or log at debug so people only see the message if they are trying to track things down.
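A rough sketch of the "static field to only print once" option mentioned above, assuming an object guards the message with an atomic flag (the names here are illustrative, not from the patch):

```scala
import java.util.concurrent.atomic.AtomicBoolean

object CommitterWarnings {
  // Flips to true after the first warning, so the message is logged once per JVM.
  private val warned = new AtomicBoolean(false)

  def warnOnce(logWarning: String => Unit, committerClass: Class[_]): Unit = {
    // compareAndSet succeeds only for the first caller; later calls are no-ops.
    if (warned.compareAndSet(false, true)) {
      logWarning(s"Committer $committerClass is not a ParquetOutputCommitter " +
        "and cannot create job summaries.")
    }
  }
}
```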
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144379591

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

If we issue a warning log, we will see such a warning message for each write operation. Does that look annoying?
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144375059

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,11 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+      || classOf[ParquetOutputCommitter].isAssignableFrom(committerClass),
+      s"Committer $committerClass is not a ParquetOutputCommitter and cannot create job summaries." +
+        " Set Parquet option " + ParquetOutputFormat.ENABLE_JOB_SUMMARY + " to false.")
--- End diff --

I'd thought about that; it didn't look any better or worse. Will change it for the log message.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144344596

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

+1 for warn and continue.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144344565

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,11 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+      || classOf[ParquetOutputCommitter].isAssignableFrom(committerClass),
+      s"Committer $committerClass is not a ParquetOutputCommitter and cannot create job summaries." +
+        " Set Parquet option " + ParquetOutputFormat.ENABLE_JOB_SUMMARY + " to false.")
--- End diff --

nit:

```scala
...
  s"Committer $committerClass is not a ParquetOutputCommitter and cannot create job summaries. " +
    s"Set Parquet option '${ParquetOutputFormat.ENABLE_JOB_SUMMARY}' to false.")
```
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144331909

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

I think I'd prefer the warn & continue option. It does little good to fail so late in a job, when the caller has already indicated that they want to use a different committer. Let them write the data out since this isn't a correctness issue, and they can add a summary file later if they want. Basically, there's less annoyance and interruption by not writing a summary file than by failing a job and forcing the user to re-run near the end.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144289264

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
--- End diff --

nit: `commtters` -> `committers`
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144239543

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

There's another option, which is "log @ warn and continue". If someone has changed the committer, they get the consequences. That could also permit someone with a modified committer to generate schema summaries if they chose/permitted. It'd simplify this patch, though the tests would need tweaking... I'd change the SQLConf text for the committer key to say "if the committer isn't a ParquetOutputCommitter then don't expect summaries".
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144238941

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local-cluster[2,1,1024]")
+      .appName("testing")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    if (spark != null) {
+      spark.stop()
+      spark = null
+    }
+    super.afterAll()
--- End diff --

good point
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144195079

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

In Spark SQL, we do issue the `AnalysisException` in many similar cases. I am also fine with using `SparkException`. In this specific case, the users are able to control the conf to make it work. Thus, we also need to improve the message to let users know how to resolve it by changing the conf.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144119701

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

`SparkException` makes it sound like it's a problem that Spark caused in some way. But this is caused by user input being incorrect, in which case the suggested `IllegalArgumentException` (which `require` throws) is better imo.
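The two suggestions converge, since Scala's `require` throws `IllegalArgumentException` when its predicate fails. A minimal demonstration:

```scala
try {
  require(false, "Committer X is not a ParquetOutputCommitter")
} catch {
  case e: IllegalArgumentException =>
    // require prefixes the supplied message with "requirement failed: "
    println(e.getMessage)
}
```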
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144105439

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

`SparkException` is better. Normally, we want to issue a Spark-specific exception type.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144088592

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

`AnalysisException`? Shouldn't this be `SparkException`? By the time this runs, Spark has already analyzed, optimized, and planned the job. Doesn't seem like failing analysis is appropriate.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144086925

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local-cluster[2,1,1024]")
+      .appName("testing")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    if (spark != null) {
+      spark.stop()
+      spark = null
+    }
+    super.afterAll()
--- End diff --

```scala
try {
  ...
} finally {
  super.afterAll()
}
```
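Combining this suggestion with the null check discussed elsewhere in the thread, the override would look roughly like the sketch below (a sketch of the suggested pattern, not the merged code). The `finally` block guarantees the superclass teardown runs even if `spark.stop()` throws.

```scala
override def afterAll(): Unit = {
  try {
    if (spark != null) {
      spark.stop()
      spark = null
    }
  } finally {
    // Runs even when stop() fails, so SparkFunSuite cleanup is never skipped.
    super.afterAll()
  }
}
```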
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144082508

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,10 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
--- End diff --

We need to issue an `AnalysisException` here.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144065810

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,13 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+        && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
+        s" and cannot create job summaries.")
--- End diff --

aah. In the move to require(), everything is going back onto a single line, so this is now moot.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144065074

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,13 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+        && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
+        s" and cannot create job summaries.")
--- End diff --

aah
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r144065041

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local-cluster[2,1,1024]")
+      .appName("testing")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    spark.stop()
+    spark = null
--- End diff --

done; will also add a check for spark == null, so if a failure happens during setup, the exception doesn't get lost in teardown
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r143996060

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,13 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+        && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
+        s" and cannot create job summaries.")
--- End diff --

Oh, I mean the `s` in `s" .. "`.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r143992362

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local-cluster[2,1,1024]")
+      .appName("testing")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    spark.stop()
+    spark = null
+  }
+
+  test("alternative output committer, merge schema") {
+    intercept[RuntimeException] {
+      val stat = writeDataFrame(MarkingFileOutput.COMMITTER, true, true)
+      logError(s"Created marker file $stat")
+    }
+  }
+
+  test("alternative output committer, no merge schema") {
+    writeDataFrame(MarkingFileOutput.COMMITTER, false, true)
--- End diff --

OK
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r143992319

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,13 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+        && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
+        s" and cannot create job summaries.")
--- End diff --

Depends on the policy about "what to do if it's not a parquet committer *and* the option for job summaries is set". It could just mean "you don't get summaries", which works for me :). May want to log at info though?
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r143992018

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,13 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+        && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
--- End diff --

will do
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r143894782

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local-cluster[2,1,1024]")
+      .appName("testing")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    spark.stop()
+    spark = null
+  }
+
+  test("alternative output committer, merge schema") {
+    intercept[RuntimeException] {
+      val stat = writeDataFrame(MarkingFileOutput.COMMITTER, true, true)
+      logError(s"Created marker file $stat")
+    }
+  }
+
+  test("alternative output committer, no merge schema") {
+    writeDataFrame(MarkingFileOutput.COMMITTER, false, true)
--- End diff --

I think it might be a little bit better to use named arguments for readability: `writeDataFrame(MarkingFileOutput.COMMITTER, summary = false, check = true)`
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r143894675

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local-cluster[2,1,1024]")
+      .appName("testing")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    spark.stop()
+    spark = null
--- End diff --

maybe `super.afterAll()`?
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r143894921

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCommitterSuite.scala ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
+
+import org.apache.spark.{LocalSparkContext, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test logic related to choice of output commtters
+ */
+class ParquetCommitterSuite extends SparkFunSuite with SQLTestUtils
+  with LocalSparkContext {
+
+  private val PARQUET_COMMITTER = classOf[ParquetOutputCommitter].getCanonicalName
+
+  protected var spark: SparkSession = _
+
+  /**
+   * Create a new [[SparkSession]] running in local-cluster mode with unsafe and codegen enabled.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local-cluster[2,1,1024]")
+      .appName("testing")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    spark.stop()
+    spark = null
+  }
+
+  test("alternative output committer, merge schema") {
+    intercept[RuntimeException] {
+      val stat = writeDataFrame(MarkingFileOutput.COMMITTER, true, true)
+      logError(s"Created marker file $stat")
+    }
+  }
+
+  test("alternative output committer, no merge schema") {
+    writeDataFrame(MarkingFileOutput.COMMITTER, false, true)
+  }
+
+  test("Parquet output committer, merge schema") {
+    writeDataFrame(PARQUET_COMMITTER, true, false)
+  }
+
+  test("Parquet output committer, no merge schema") {
+    writeDataFrame(PARQUET_COMMITTER, false, false)
+  }
+
+  /**
+   * Write a trivial dataframe as Parquet, using the given committer
+   * and job summary option.
+   * @param committer committer to use
+   * @param summary create a job summary
+   * @param check look for a marker file
+   * @return if a marker file was sought, it's file status.
+   */
+  private def writeDataFrame(
+      committer: String,
+      summary: Boolean,
+      check: Boolean): Option[FileStatus] = {
+    var result: Option[FileStatus] = None
+    withSQLConf(
+      SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key -> committer,
+      ParquetOutputFormat.ENABLE_JOB_SUMMARY -> summary.toString) {
+      withTempPath { dest =>
+        val df = spark.createDataFrame(Seq((1, "4"), (2, "2")))
+        val destPath = new Path(dest.toURI)
+        df.write.format("parquet").save(destPath.toString)
+        if (check) {
+          result = Some(MarkingFileOutput.checkMarker(
+            destPath,
+            spark.sparkContext.hadoopConfiguration))
+        }
+      }
+    }
+    result
+  }
+}
+
+/**
+ * A file output committer which explicitly touches a file "marker"; this
+ * is how tests can verify that this committer was used.
+ * @param outputPath output path
+ * @param context task context
+ */
+private class MarkingFileOutputCommitter(
+    outputPath: Path,
+    context: TaskAttemptContext) extends FileOutputCommitter(outputPath, context) {
+
+  override def commitJob(context: JobContext): Unit = {
+    super.commitJob(context)
+    MarkingFileOutput.touch(outputPath, context.getConfiguration)
+  }
+}
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r143879132

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,13 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+        && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
+        s" and cannot create job summaries.")
--- End diff --

Looks like we can remove this `s`, BTW.
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r143879070

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,13 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+        && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
--- End diff --

How about `require` maybe?
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19448#discussion_r143878105

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---
@@ -138,6 +138,13 @@ class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
+    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+        && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
+      // output summary is requested, but the class is not a Parquet Committer
+      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
--- End diff --

`IllegalArgumentException` or some other better exception?
[GitHub] spark pull request #19448: [SPARK-22217] [SQL] ParquetFileFormat to support ...
GitHub user steveloughran opened a pull request: https://github.com/apache/spark/pull/19448

[SPARK-22217] [SQL] ParquetFileFormat to support arbitrary OutputCommitters

## What changes were proposed in this pull request?

`ParquetFileFormat` relaxes its requirement on the output committer class from `org.apache.parquet.hadoop.ParquetOutputCommitter` or a subclass thereof (and implicitly Hadoop's `FileOutputCommitter`) to any committer implementing `org.apache.hadoop.mapreduce.OutputCommitter`. This enables output committers which don't write to the filesystem the way `FileOutputCommitter` does to save Parquet data from a dataframe; at present you cannot do this.

Because a committer which isn't a subclass of `ParquetOutputCommitter` cannot produce summary metadata, the format checks whether the context has requested summary metadata by setting `parquet.enable.summary-metadata`. If that option is set and the committer class isn't a Parquet committer, it raises a `RuntimeException` with an error message. (It could downgrade, of course, but raising an exception makes it clear there won't be a summary. It also makes the behaviour testable.)

## How was this patch tested?

The patch includes a test suite, `ParquetCommitterSuite`, with a new committer, `MarkingFileOutputCommitter`, which extends `FileOutputCommitter` and writes a marker file in the destination directory. The presence of the marker file can be used to verify that the new committer was used. The tests then try the combinations of Parquet committer summary/no-summary and marking committer summary/no-summary:

| committer | summary | outcome             |
|-----------|---------|---------------------|
| parquet   | true    | success             |
| parquet   | false   | success             |
| marking   | false   | success with marker |
| marking   | true    | exception           |

All tests are happy.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/steveloughran/spark cloud/SPARK-22217-committer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19448.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19448

commit e6fdbdcf4118283abd22f7b14586ed742d225657
Author: Steve Loughran
Date:   2017-07-12T10:42:51Z

    SPARK-22217 tuning ParquetOutputCommitter to support any committer class,
    provided saveSummaries is disabled. With Tests

    Change-Id: I19872dc1c095068ed5a61985d53cb7258bd9a9bb
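A hedged usage sketch of what this change enables: configuring a non-Parquet committer for a Parquet write. `com.example.CloudCommitter` is a hypothetical `OutputCommitter` implementation, and the output path is illustrative; the two configuration keys are the ones discussed in this thread.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("custom-parquet-committer")
  // Hypothetical committer; with this patch it no longer has to subclass
  // ParquetOutputCommitter.
  .config("spark.sql.parquet.output.committer.class", "com.example.CloudCommitter")
  // Job summaries require a ParquetOutputCommitter, so they must stay disabled
  // when a non-Parquet committer is used.
  .config("parquet.enable.summary-metadata", "false")
  .getOrCreate()

// The write succeeds without a _metadata summary file in the destination.
spark.range(10).write.parquet("/tmp/parquet-out")
```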