[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19602 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90323/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19602 **[Test build #90323 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90323/testReport)** for PR 19602 at commit [`57d0c3b`](https://github.com/apache/spark/commit/57d0c3b3e9ece180429172bc925b337d88864b13). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21106: [SPARK-23711][SQL][WIP] Add fallback logic for UnsafePro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21106 Merged build finished. Test PASSed.
[GitHub] spark issue #21106: [SPARK-23711][SQL][WIP] Add fallback logic for UnsafePro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21106 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90319/ Test PASSed.
[GitHub] spark issue #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol binding to...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21066 Merged build finished. Test PASSed.
[GitHub] spark issue #21106: [SPARK-23711][SQL][WIP] Add fallback logic for UnsafePro...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21106 **[Test build #90319 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90319/testReport)** for PR 21106 at commit [`1f5cc17`](https://github.com/apache/spark/commit/1f5cc17741ef0feed5894fbafd21c36a44bc5373). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol binding to...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21066 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3010/ Test PASSed.
[GitHub] spark issue #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol binding to...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21066 **[Test build #90333 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90333/testReport)** for PR 21066 at commit [`3e1bce3`](https://github.com/apache/spark/commit/3e1bce3b9163de836681c69a2eff8e67108ac7b7).
[GitHub] spark issue #21260: [SPARK-23529][K8s] Support mounting hostPath volumes
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21260 Can one of the admins verify this patch?
[GitHub] spark issue #21095: [SPARK-23529][K8s] Support mounting hostPath volumes
Github user andrusha commented on the issue: https://github.com/apache/spark/pull/21095 @madanadit @liyinan926 @foxish I addressed the comments here and added PVC support in https://github.com/apache/spark/pull/21260. However, I'm unsure if this is the right way to go; please check the PR.
[GitHub] spark issue #21186: [SPARK-22279][SQL] Enable `convertMetastoreOrc` by defau...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21186 Merged build finished. Test PASSed.
[GitHub] spark issue #21186: [SPARK-22279][SQL] Enable `convertMetastoreOrc` by defau...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21186 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3009/ Test PASSed.
[GitHub] spark pull request #21260: [SPARK-23529][K8s] Support mounting hostPath volu...
GitHub user andrusha opened a pull request: https://github.com/apache/spark/pull/21260 [SPARK-23529][K8s] Support mounting hostPath volumes

This PR continues #21095 and intersects with #21238. I've added volume mounts as a separate step and added PersistentVolumeClaim support. There is a fundamental problem with how we pass the options through Spark conf to fabric8: for each volume type and all possible volume options, we would have to implement custom code to map config values to fabric8 calls. This would result in a big body of code we would have to support, and it means that Spark will always be somewhat out of sync with k8s. I think there needs to be a discussion about how to proceed correctly.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/andrusha/spark k8s-vol Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21260.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21260

commit 51a877a7d86b6c0f18d381ba9300de18ccef9189 Author: madanadit Date: 2018-04-05T01:02:25Z Support mounting hostPath volumes for executors
commit 4ada724ab2f6e79553fa05475b216884f9ba127a Author: madanadit Date: 2018-04-05T20:57:45Z Read mode for mounted volumes
commit 17258a3967b6d8b2170c3c7d7186449c3155fef1 Author: madanadit Date: 2018-04-05T22:05:07Z Refactor
commit 6fff716cbe42e184059660cf2009f594048a6420 Author: madanadit Date: 2018-04-06T18:18:09Z Fix style
commit f961b33aa34ad7908ac6d77ded251e18bf6e835a Author: madanadit Date: 2018-04-06T19:57:45Z Add unit tests
commit af4d9baae8d637215d2be645b377fa5dbd714e94 Author: madanadit Date: 2018-04-06T20:00:18Z Update comment
commit 13e10493fa6f3b40c3eb406d0dc2f88d09ce20b8 Author: madanadit Date: 2018-04-06T23:56:31Z Fix unit tests
commit 7a25d76e386c0d8157eba0ccc0517bea3b7f7f0e Author: madanadit Date: 2018-04-16T22:41:26Z Fix typo
commit a5a277a065dec156e976478de7767062bcf1da13 Author: madanadit Date: 2018-04-17T21:51:58Z Change configuration format
commit 772128e13895d6f313efa3cbc38947db51c3e493 Author: madanadit Date: 2018-04-17T23:34:37Z Fix build
commit a393b92c93d60bdcf96025ae24d6c1fbecf17f9b Author: madanadit Date: 2018-04-17T23:44:08Z Fix test
commit b9b3dcb22ded628f0abe1caa0beb2b15da6ccc49 Author: madanadit Date: 2018-04-18T00:39:14Z Fetch properties correctly
commit 81a7811b06a0195e84ffeddc135875da6c500a7e Author: madanadit Date: 2018-04-18T05:21:11Z Fix test cases
commit 95d1b0d8c681cd46f47c8ab1692172d0b3b0aba8 Author: madanadit Date: 2018-04-18T06:42:56Z Abstract tests
commit facde97b365a7acb02c41e5ef076a9ea0f1edff9 Author: madanadit Date: 2018-04-18T06:59:43Z Add readOnly option
commit ccdc7990ca8995ff86f46647f8a2949848f06380 Author: madanadit Date: 2018-04-18T07:06:17Z Fix test
commit 7c1be8aff51a462bf96012fafbbfec765424de53 Author: madanadit Date: 2018-04-18T07:24:55Z Driver hostPath volumes with tests
commit f482dfc370bace0c5315a2514604be2965bcbbaf Author: madanadit Date: 2018-04-19T23:56:58Z Refactor
commit 136f5f65481f05cf10bd0c6968bf84c4363df53d Author: Andrew Korzhuev Date: 2018-05-07T17:03:05Z Address comments, rewrite volume mount to step
commit f1ada4c402a7f5ca5fd5428a28a457cd2400c6f2 Author: Andrew Korzhuev Date: 2018-05-07T17:08:12Z Add persistant volume claim
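The conf-to-fabric8 mapping problem described above can be made concrete with a sketch. The property keys below are purely illustrative (they are not the keys proposed in this PR): each volume type multiplies the number of hand-written mappings from Spark conf entries to fabric8 API calls.

```properties
# Hypothetical keys only. Every volume type (hostPath, persistentVolumeClaim,
# emptyDir, ...) would need its own bespoke mapping code from conf to fabric8.
spark.kubernetes.executor.volumes.hostPath.data.mount.path=/data
spark.kubernetes.executor.volumes.hostPath.data.mount.readOnly=true
spark.kubernetes.executor.volumes.hostPath.data.options.path=/mnt/ssd
spark.kubernetes.executor.volumes.persistentVolumeClaim.ckpt.mount.path=/checkpoints
spark.kubernetes.executor.volumes.persistentVolumeClaim.ckpt.options.claimName=spark-pvc
```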
[GitHub] spark issue #21186: [SPARK-22279][SQL] Enable `convertMetastoreOrc` by defau...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21186 **[Test build #90332 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90332/testReport)** for PR 21186 at commit [`ddd6872`](https://github.com/apache/spark/commit/ddd68726e7d8703a4c91e8624e0c5cdf83762cf1).
[GitHub] spark issue #21186: [SPARK-22279][SQL] Enable `convertMetastoreOrc` by defau...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/21186 To reduce the review scope, `convertMetastoreTableProperty` goes to #21259.
[GitHub] spark issue #21259: [SPARK-24112][SQL] Add `convertMetastoreTableProperty` c...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21259 Merged build finished. Test PASSed.
[GitHub] spark issue #21259: [SPARK-24112][SQL] Add `convertMetastoreTableProperty` c...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21259 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3008/ Test PASSed.
[GitHub] spark issue #21259: [SPARK-24112][SQL] Add `convertMetastoreTableProperty` c...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21259 **[Test build #90331 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90331/testReport)** for PR 21259 at commit [`eaecabc`](https://github.com/apache/spark/commit/eaecabc5a59457a4baf84dbb755dd7b876fdb536).
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186484550 --- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala --- @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} + +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage + +/** + * Spark Commit protocol for Path Output Committers. + * This committer will work with the `FileOutputCommitter` and subclasses. + * All implementations *must* be serializable. + * + * Rather than ask the `FileOutputFormat` for a committer, it uses the + * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory + * API to create the committer. 
+ * This is what [[org.apache.hadoop.mapreduce.lib.output.FileOutputFormat]] does, + * but as [[HadoopMapReduceCommitProtocol]] still uses the original + * `org.apache.hadoop.mapred.FileOutputFormat` binding, its + * subclasses do not do this; this class overrides those subclasses + * to use the factory mechanism now supported in the base class. + * + * In `setupCommitter` the factory is bound to and the committer for + * the destination path is chosen. + * + * @constructor Instantiate. Dynamic partition overwrite is not supported, + * so that committers for stores which do not support rename + * will not get confused. + * @param jobId job + * @param destination destination + * @param dynamicPartitionOverwrite does the caller want support for dynamic + * partition overwrite. If so, it will be + * refused. + * @throws IOException when an unsupported dynamicPartitionOverwrite option is supplied. + */ +class PathOutputCommitProtocol( + jobId: String, + destination: String, + dynamicPartitionOverwrite: Boolean = false) + extends HadoopMapReduceCommitProtocol( +jobId, +destination, +false) with Serializable { + + @transient var committer: PathOutputCommitter = _ + + require(destination != null, "Null destination specified") + + val destPath = new Path(destination) --- End diff -- I should add that `Path` is serializable in Hadoop 3 as of [HADOOP-13519](https://issues.apache.org/jira/browse/HADOOP-13519); I've added a round-trip serialization test to validate this and prevent regressions, and a comment to show it's intentional (and that it's not something to copy and paste into Hadoop-2.x compatible code).
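The round-trip serialization check mentioned here is a useful general pattern for pinning down serializability and preventing regressions. A minimal language-neutral sketch (Python's `pickle` stands in for Java serialization; the actual test exercises Hadoop's `Path`):

```python
import pickle


class DestinationPath:
    """Illustrative stand-in for a value object whose serializability we want to guard."""

    def __init__(self, uri):
        self.uri = uri

    def __eq__(self, other):
        return isinstance(other, DestinationPath) and self.uri == other.uri


def round_trip(obj):
    """Serialize then deserialize; raises if obj is not serializable."""
    return pickle.loads(pickle.dumps(obj))


# The regression check: the deserialized copy must equal the original.
p = DestinationPath("s3a://bucket/output")
assert round_trip(p) == p
```

The point of keeping such a test in the suite is that serializability is an implicit contract: a later refactor that adds a non-serializable field breaks it silently unless a round-trip test catches it.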
[GitHub] spark pull request #21259: [SPARK-24112][SQL] Add `convertMetastoreTableProp...
GitHub user dongjoon-hyun opened a pull request: https://github.com/apache/spark/pull/21259 [SPARK-24112][SQL] Add `convertMetastoreTableProperty` conf

## What changes were proposed in this pull request?

In Apache Spark 2.4, [SPARK-23355](https://issues.apache.org/jira/browse/SPARK-23355) fixes a bug that ignored table properties during `convertMetastore` for tables created with `STORED AS ORC/PARQUET`. For Parquet tables with table properties like `TBLPROPERTIES (parquet.compression 'NONE')`, those properties were ignored by default before Apache Spark 2.4. After upgrading a cluster, Spark will write uncompressed files, which differs from Apache Spark 2.3 and older. In order to provide full backward compatibility, this introduces an additional configuration, `spark.sql.hive.convertMetastoreTableProperty`, to restore the previous behavior of ignoring table properties.

## How was this patch tested?

Passes Jenkins.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/dongjoon-hyun/spark SPARK-convertMetastoreTableProperty Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21259.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21259

commit eaecabc5a59457a4baf84dbb755dd7b876fdb536 Author: Dongjoon Hyun Date: 2018-04-27T18:10:55Z [SPARK-24112][SQL] Add `convertMetastoreTableProperty` conf
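As a sketch of how the proposed compatibility flag would be used — the flag is this PR's proposal, not shipped behavior, and the value shown (its polarity and default) is illustrative only:

```sql
-- Hypothetical usage: restore the pre-2.4 behavior of ignoring table
-- properties such as TBLPROPERTIES (parquet.compression 'NONE')
-- during convertMetastore.
SET spark.sql.hive.convertMetastoreTableProperty=false;
```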
[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16677 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90311/ Test FAILed.
[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/16677 Merged build finished. Test FAILed.
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21118 > We expect data source to produce `ColumnarBatch` for better performance, and the row interface performance is not that important. I disagree. The vectorized path isn't used for all Parquet table scans and we should continue to care about its performance.
[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16677 **[Test build #90311 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90311/testReport)** for PR 16677 at commit [`a691e88`](https://github.com/apache/spark/commit/a691e885b1f304ba4037964e2fba09c540503e1a). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21190: [SPARK-22938][SQL][followup] Assert that SQLConf.get is ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21190 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90317/ Test PASSed.
[GitHub] spark issue #21190: [SPARK-22938][SQL][followup] Assert that SQLConf.get is ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21190 Merged build finished. Test PASSed.
[GitHub] spark issue #21190: [SPARK-22938][SQL][followup] Assert that SQLConf.get is ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21190 **[Test build #90317 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90317/testReport)** for PR 21190 at commit [`04ae0fa`](https://github.com/apache/spark/commit/04ae0faba4b6613574b160eac20cd0ebad519aff). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186482587 --- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/package.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.spark.SparkConf +import org.apache.spark.sql.internal.SQLConf + +/** + * Package object to assist in switching to the Hadoop 3 + * [[org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory]] factory + * mechanism for dynamically loading committers for the destination stores. + * + * = Using Alternative Committers with Spark and Hadoop 3 = --- End diff -- yep. It is needed in the docs, with `docs/cloud-integration.md` the obvious place. But really that "build on Hadoop 3.1" is a prereq, isn't it?
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186482016 --- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala --- @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} + +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage + +/** + * Spark Commit protocol for Path Output Committers. + * This committer will work with the `FileOutputCommitter` and subclasses. + * All implementations *must* be serializable. + * + * Rather than ask the `FileOutputFormat` for a committer, it uses the + * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory + * API to create the committer. 
+ * This is what [[org.apache.hadoop.mapreduce.lib.output.FileOutputFormat]] does, + * but as [[HadoopMapReduceCommitProtocol]] still uses the original + * `org.apache.hadoop.mapred.FileOutputFormat` binding, its + * subclasses do not do this; this class overrides those subclasses + * to use the factory mechanism now supported in the base class. + * + * In `setupCommitter` the factory is bound to and the committer for + * the destination path is chosen. + * + * @constructor Instantiate. Dynamic partition overwrite is not supported, + * so that committers for stores which do not support rename + * will not get confused. + * @param jobId job + * @param destination destination + * @param dynamicPartitionOverwrite does the caller want support for dynamic + * partition overwrite. If so, it will be + * refused. + * @throws IOException when an unsupported dynamicPartitionOverwrite option is supplied. + */ +class PathOutputCommitProtocol( + jobId: String, + destination: String, + dynamicPartitionOverwrite: Boolean = false) + extends HadoopMapReduceCommitProtocol( +jobId, +destination, +false) with Serializable { + + @transient var committer: PathOutputCommitter = _ + + require(destination != null, "Null destination specified") + + val destPath = new Path(destination) + + logInfo(s"Instantiated committer with job ID=$jobId;" + +s" destination=$destPath;" + +s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite") + + if (dynamicPartitionOverwrite) { +// until there's explicit extensions to the PathOutputCommitProtocols +// to support the spark mechanism, it's left to the individual committer +// choice to handle partitioning. +throw new IOException("PathOutputCommitProtocol does not support dynamicPartitionOverwrite") + } + + import PathOutputCommitProtocol._ + + /** + * Set up the committer. + * This creates it by talking directly to the Hadoop factories, instead + * of the V1 `mapred.FileOutputFormat` methods. + * @param context task attempt + * @return the committer to use. 
This will always be a subclass of + * [[PathOutputCommitter]]. + */ + override protected def setupCommitter( +context: TaskAttemptContext): PathOutputCommitter = { + +logInfo(s"Setting up committer for path $destination") +committer = PathOutputCommitterFactory.createCommitter(destPath, context) + +// Special feature to force out the FileOutputCommitter, so as to guarantee +// that the binding is working properly. +val rejectFileOutput
[GitHub] spark issue #21155: [SPARK-23927][SQL] Add "sequence" expression
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21155 **[Test build #90330 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90330/testReport)** for PR 21155 at commit [`383f180`](https://github.com/apache/spark/commit/383f1800470dbd7d54516e8f5609ee17dabb37ae).
[GitHub] spark pull request #21155: [SPARK-23927][SQL] Add "sequence" expression
Github user wajda commented on a diff in the pull request: https://github.com/apache/spark/pull/21155#discussion_r186479450 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -1059,3 +1063,316 @@ case class Flatten(child: Expression) extends UnaryExpression { override def prettyName: String = "flatten" } + +@ExpressionDescription( + usage = """ +_FUNC_(start, stop, step) - + Generates an array of elements from start to stop (inclusive), incrementing by step. + The type of the returned elements is the same as the type of argument expressions. + + Supported types are: byte, short, integer, long, date, timestamp. + + The start and stop expressions must resolve to the same type. + If start and stop expressions resolve to the 'date' or 'timestamp' type + then the step expression must resolve to the 'interval' type, otherwise to the same type + as the start and stop expressions. + """, + arguments = """ +Arguments: + * start - an expression. The start of the range. + * stop - an expression. The end of the range (inclusive). + * step - an optional expression. The step of the range. + + By default step is 1 if start is less than or equal to stop, otherwise -1. + For the temporal sequences it's 1 day and -1 day respectively. + + If start is greater than stop then the step must be negative, and vice versa. + """, + examples = """ +Examples: + > SELECT _FUNC_(1, 5); + [1, 2, 3, 4, 5] + > SELECT _FUNC_(5, 1); + [5, 4, 3, 2, 1] + > SELECT _FUNC_(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month); + [2018-01-01, 2018-02-01, 2018-03-01] + """, + since = "2.4.0" +) +case class Sequence(left: Expression, +middle: Expression, +right: Expression, +timeZoneId: Option[String] = None) + extends TernaryExpression with TimeZoneAwareExpression { --- End diff -- Changed to ```Option[Expression]```, removed ```UnresolvedLiteral```. Thanks for the suggestion.
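The semantics documented above — inclusive bounds, a default step of 1 or -1 depending on direction, and a sign check on an explicit step — can be sketched outside Spark for the integral case. Python is used here only as illustration; the real implementation is the Scala `Sequence` expression in the diff above.

```python
def sequence(start, stop, step=None):
    """Inclusive integer range with the defaulting/validation described in the docs."""
    if step is None:
        # Default step: 1 when counting up, -1 when counting down.
        step = 1 if start <= stop else -1
    if step == 0 or (stop - start) * step < 0:
        # e.g. sequence(5, 1) needs a negative step, and vice versa.
        raise ValueError("step sign is inconsistent with start/stop")
    # range() excludes its end bound, so nudge it by one step direction
    # to make stop inclusive.
    return list(range(start, stop + (1 if step > 0 else -1), step))
```

For example, `sequence(1, 5)` yields `[1, 2, 3, 4, 5]` and `sequence(5, 1)` yields `[5, 4, 3, 2, 1]`, matching the `SELECT _FUNC_` examples in the expression description.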
[GitHub] spark issue #21186: [SPARK-22279][SPARK-24112] Enable `convertMetastoreOrc` ...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/21186 Hmm. I'll split this into two PRs in order to make them easier to review.
[GitHub] spark issue #21239: [SPARK-24040][SS] Support single partition aggregates in...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21239 **[Test build #90328 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90328/testReport)** for PR 21239 at commit [`530e025`](https://github.com/apache/spark/commit/530e025d245321e7dd5c15d7e61acd37a76e6a8a).
[GitHub] spark issue #21155: [SPARK-23927][SQL] Add "sequence" expression
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21155 **[Test build #90329 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90329/testReport)** for PR 21155 at commit [`8786192`](https://github.com/apache/spark/commit/878619276592d189d69d9c6ed259f3d7d1f6fa48).
[GitHub] spark pull request #21239: [SPARK-24040][SS] Support single partition aggreg...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21239#discussion_r186467958

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.continuous
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.streaming.OutputMode
+
+class ContinuousAggregationSuite extends ContinuousSuiteBase {
+  import testImplicits._
+
+  test("not enabled") {
+    val ex = intercept[AnalysisException] {
+      val input = ContinuousMemoryStream.singlePartition[Int]
+
+      testStream(input.toDF().agg(max('value)), OutputMode.Complete)(
+        AddData(input, 0, 1, 2),
+        CheckAnswer(2),
+        StopStream,
+        AddData(input, 3, 4, 5),
+        StartStream(),
+        CheckAnswer(5),
+        AddData(input, -1, -2, -3),
+        CheckAnswer(5))
+    }
+
+    assert(ex.getMessage.contains("Continuous processing does not support Aggregate operations"))
+  }
+
+  test("basic") {
+    withSQLConf(("spark.sql.streaming.continuous.allowAllOperators", "true")) {
--- End diff --

Adopted the flag that disables the unsupported-operation checker, as suggested by @arunmahadevan. As mentioned in the PR description, I don't think it's worth the effort to write these more specific checks - the current state is not something we could meaningfully present as a complete story.
[GitHub] spark pull request #21239: [SPARK-24040][SS] Support single partition aggreg...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21239#discussion_r186462620

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochTracker.scala ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.atomic.AtomicLong
+
+object EpochTracker {
+  // The current epoch. Note that this is a shared reference; ContinuousWriteRDD.compute() will
--- End diff --

I don't think it's the case that other classes can use it in the future. The correctness of this object depends on the fact that it's read-only everywhere but a single location. If we want to future-proof against having multiple things mutate it, we should remove the utility methods and only expose the AtomicLong.
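The single-writer contract discussed in this comment — expose the `AtomicLong` itself rather than mutator helpers, so it is obvious exactly one place may advance the epoch — can be sketched as follows. This is a standalone illustration with invented names (`EpochRef`, `EpochRefDemo`), not the actual `EpochTracker` from the PR:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the single-writer pattern described above: the
// holder exposes the AtomicLong directly, and correctness rests on the
// convention that only the write path mutates it.
final class EpochRef {
    // Shared reference. Contract: only the writer (the analogue of
    // ContinuousWriteRDD.compute()) may call incrementAndGet();
    // everyone else is a reader and calls get() only.
    static final AtomicLong currentEpoch = new AtomicLong(0);

    private EpochRef() {}
}

class EpochRefDemo {
    public static void main(String[] args) {
        // Writer side advances the epoch once per commit.
        EpochRef.currentEpoch.incrementAndGet();
        // Reader side only observes the current value.
        System.out.println(EpochRef.currentEpoch.get());
    }
}
```

Removing the utility methods, as suggested, makes any second mutation site show up as an explicit `incrementAndGet` on the shared reference rather than an innocuous-looking helper call.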
[GitHub] spark pull request #21239: [SPARK-24040][SS] Support single partition aggreg...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21239#discussion_r186476292

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala ---
+  test("basic") {
+    withSQLConf(("spark.sql.streaming.continuous.allowAllOperators", "true")) {
+      val input = ContinuousMemoryStream.singlePartition[Int]
+
+      testStream(input.toDF().agg(max('value)), OutputMode.Complete)(
--- End diff --

Kinda. I don't think it does anything wrong in update mode, per se; it's just hard to test because there are a variable number of epochs before the one containing the new data. I'd rather come up with a testing strategy for that in a separate PR than squeeze one in here.
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186475370

--- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io.cloud
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}
+
+import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+
+/**
+ * Spark Commit protocol for Path Output Committers.
+ * This committer will work with the `FileOutputCommitter` and subclasses.
+ * All implementations *must* be serializable.
+ *
+ * Rather than ask the `FileOutputFormat` for a committer, it uses the
+ * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory
+ * API to create the committer.
+ * This is what [[org.apache.hadoop.mapreduce.lib.output.FileOutputFormat]] does,
+ * but as [[HadoopMapReduceCommitProtocol]] still uses the original
+ * `org.apache.hadoop.mapred.FileOutputFormat` binding
+ * subclasses do not do this, overrides those subclasses to using the
+ * factory mechanism now supported in the base class.
+ *
+ * In `setupCommitter` the factory is bonded to and the committer for
+ * the destination path chosen.
+ *
+ * @constructor Instantiate. dynamic partition overwrite is not supported,
+ *              so that committers for stores which do not support rename
+ *              will not get confused.
+ * @param jobId job
+ * @param destination destination
+ * @param dynamicPartitionOverwrite does the caller want support for dynamic
+ *                                  partition overwrite. If so, it will be
+ *                                  refused.
+ * @throws IOException when an unsupported dynamicPartitionOverwrite option is supplied.
+ */
+class PathOutputCommitProtocol(
+    jobId: String,
+    destination: String,
+    dynamicPartitionOverwrite: Boolean = false)
+  extends HadoopMapReduceCommitProtocol(
+    jobId,
+    destination,
+    false) with Serializable {
+
+  @transient var committer: PathOutputCommitter = _
+
+  require(destination != null, "Null destination specified")
+
+  val destPath = new Path(destination)
+
+  logInfo(s"Instantiated committer with job ID=$jobId;" +
+    s" destination=$destPath;" +
+    s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite")
+
+  if (dynamicPartitionOverwrite) {
+    // until there's explicit extensions to the PathOutputCommitProtocols
+    // to support the spark mechanism, it's left to the individual committer
+    // choice to handle partitioning.
+    throw new IOException("PathOutputCommitProtocol does not support dynamicPartitionOverwrite")
+  }
+
+  import PathOutputCommitProtocol._
+
+  /**
+   * Set up the committer.
+   * This creates it by talking directly to the Hadoop factories, instead
+   * of the V1 `mapred.FileOutputFormat` methods.
+   * @param context task attempt
+   * @return the committer to use. This will always be a subclass of
+   *         [[PathOutputCommitter]].
+   */
+  override protected def setupCommitter(
+      context: TaskAttemptContext): PathOutputCommitter = {
+
+    logInfo(s"Setting up committer for path $destination")
+    committer = PathOutputCommitterFactory.createCommitter(destPath, context)
+
+    // Special feature to force out the FileOutputCommitter, so as to guarantee
+    // that the binding is working properly.
+    val rejectFileOutput
[GitHub] spark pull request #20787: [MINOR][DOCS] Documenting months_between directio...
Github user aditkumar commented on a diff in the pull request: https://github.com/apache/spark/pull/20787#discussion_r186475229

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala ---
@@ -1194,13 +1194,21 @@ case class AddMonths(startDate: Expression, numMonths: Expression)
 }
 
 /**
- * Returns number of months between dates date1 and date2.
+ * Returns number of months between dates `date1` and `date2`.
+ * If `date1` is later than `date2`, then the result is positive.
+ * If `date1` and `date2` are on the same day of month, or both
+ * are the last day of month, time of day will be ignored. Otherwise, the
+ * difference is calculated based on 31 days per month, and rounded to
+ * 8 digits unless roundOff=false.
  */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
   usage = """
-    _FUNC_(timestamp1, timestamp2[, roundOff]) - Returns number of months between `timestamp1` and `timestamp2`.
-      The result is rounded to 8 decimal places by default. Set roundOff=false otherwise.,
+    _FUNC_(date1, date2[, roundOff]) - If `date1` is later than `date2`, then the result
--- End diff --

👍🏾
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186474730

--- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala ---
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186474501

--- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala ---
[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_fromarray function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21258#discussion_r186474451

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala ---
@@ -235,6 +235,69 @@ case class CreateMap(children: Seq[Expression]) extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(keys, values) - Creates a map with a pair of the given key/value arrays. All elements
+      in keys should not be null""",
+  examples = """
+    Examples:
+      > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+       {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArray(left: Expression, right: Expression)
+    extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    (left.dataType, right.dataType) match {
+      case (ArrayType(_, cn), ArrayType(_, _)) =>
+        if (!cn) {
+          TypeCheckResult.TypeCheckSuccess
+        } else {
+          TypeCheckResult.TypeCheckFailure("All of the given keys should be non-null")
+        }
+      case _ =>
+        TypeCheckResult.TypeCheckFailure("The given two arguments should be an array")
+    }
+  }
+
+  override def dataType: DataType = {
+    MapType(
+      keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+      valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+      valueContainsNull = left.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullable: Boolean = false
+
+  override def nullSafeEval(keyArray: Any, valueArray: Any): Any = {
+    val keyArrayData = keyArray.asInstanceOf[ArrayData]
--- End diff --

Although it's not specified, a duplicated key can lead to non-deterministic returned values in the future. Currently, `GetMapValueUtil.getValueEval` returns the value for the first matching key in the map, but there is a TODO to change the O(n) algorithm. So I'm wondering how it would behave if some hashing were introduced.
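The first-match versus hash-based lookup concern raised here can be illustrated with a standalone sketch (plain Java, not Spark's actual `GetMapValueUtil`; all names are invented for illustration). A linear first-match scan and a hash-built map disagree on which value a duplicated key yields:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DuplicateKeyDemo {
    // Linear scan mimicking O(n) first-match lookup: the value paired
    // with the FIRST occurrence of the key wins.
    static String firstMatch(List<Integer> keys, List<String> values, int key) {
        for (int i = 0; i < keys.size(); i++) {
            if (keys.get(i) == key) {
                return values.get(i);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(1, 2, 1);       // key 1 is duplicated
        List<String> values = Arrays.asList("a", "b", "c");

        // First-match semantics: the first 1 maps to "a".
        String linear = firstMatch(keys, values, 1);

        // A hash-based build overwrites on put, so the LAST 1 wins: "c".
        Map<Integer, String> hashed = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            hashed.put(keys.get(i), values.get(i));
        }
        String byHash = hashed.get(1);

        System.out.println(linear + " vs " + byHash); // prints: a vs c
    }
}
```

This is exactly the non-determinism risk: unless duplicate-key semantics are specified up front, swapping the lookup algorithm silently changes which value is returned.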
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186474366

--- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala ---
[GitHub] spark issue #21106: [SPARK-23711][SQL][WIP] Add fallback logic for UnsafePro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21106 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90313/ Test PASSed.
[GitHub] spark issue #21106: [SPARK-23711][SQL][WIP] Add fallback logic for UnsafePro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21106 Merged build finished. Test PASSed.
[GitHub] spark issue #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol binding to...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21066 cc @rxin @JoshRosen @zsxwing
[GitHub] spark issue #21236: [SPARK-23935][SQL] Adding map_entries function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21236 Merged build finished. Test FAILed.
[GitHub] spark issue #21252: [SPARK-24193] Sort by disk when number of limit is big i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21252 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90315/ Test PASSed.
[GitHub] spark issue #21106: [SPARK-23711][SQL][WIP] Add fallback logic for UnsafePro...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21106 **[Test build #90313 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90313/testReport)** for PR 21106 at commit [`06cc8cc`](https://github.com/apache/spark/commit/06cc8cc360024a2afe01478242144dee348b7a72).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21252: [SPARK-24193] Sort by disk when number of limit is big i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21252 Merged build finished. Test PASSed.
[GitHub] spark issue #21236: [SPARK-23935][SQL] Adding map_entries function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21236 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90318/ Test FAILed.
[GitHub] spark issue #21236: [SPARK-23935][SQL] Adding map_entries function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21236 **[Test build #90318 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90318/testReport)** for PR 21236 at commit [`d05ad9b`](https://github.com/apache/spark/commit/d05ad9be40064c61b05d838b3ba96b02267d5ee1).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21252: [SPARK-24193] Sort by disk when number of limit is big i...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21252 **[Test build #90315 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90315/testReport)** for PR 21252 at commit [`7528475`](https://github.com/apache/spark/commit/75284753961c87976f5740be1fd4646966bd797f).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21230: [SPARK-24172][SQL] we should not apply operator pushdown...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21230 So it was the use of `transformUp` that caused these rules to match multiple times, right? In that case, would it make more sense to do what @marmbrus suggested in the immutable plan PR and make this a strategy instead of an optimizer rule? That approach fits with what I suggested on #21118. We could have the scan node handle the filter and the projection so that it doesn't matter whether the source produces `UnsafeRow` or `InternalRow`.
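The re-matching behavior described above can be illustrated with a toy tree. This is a hypothetical sketch, not Catalyst code: `Plan`, `Filter`, `Scan`, and the `pushdown` rule below are simplified stand-ins for Spark's `TreeNode` API, showing how a bottom-up rewrite can fire again on a parent after already rewriting its child.

```scala
// Minimal bottom-up transform over a toy plan tree.
sealed trait Plan {
  def children: Seq[Plan]
  def withChildren(cs: Seq[Plan]): Plan
  def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
    // Rewrite children first, then try the rule on this node.
    val withNewChildren = withChildren(children.map(_.transformUp(rule)))
    rule.applyOrElse(withNewChildren, (p: Plan) => p)
  }
}
case class Scan(pushed: Int = 0) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(cs: Seq[Plan]): Plan = this
}
case class Filter(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def withChildren(cs: Seq[Plan]): Plan = Filter(cs.head)
}

// A "pushdown"-style rule: fold a Filter into the Scan beneath it.
val pushdown: PartialFunction[Plan, Plan] = {
  case Filter(Scan(n)) => Scan(n + 1)
}

// Bottom-up: the inner Filter(Scan(0)) becomes Scan(1); the outer Filter
// then sees Scan(1), so the rule fires a second time, yielding Scan(2).
val optimized = Filter(Filter(Scan(0))).transformUp(pushdown)
assert(optimized == Scan(2))
```

A planner strategy, by contrast, runs once during physical planning rather than repeatedly inside optimizer batches, which is the motivation in the comment above.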
[GitHub] spark issue #21250: [SPARK-23291][SQL][R][BRANCH-2.3] R's substr should not ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21250 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90325/ Test PASSed.
[GitHub] spark issue #21250: [SPARK-23291][SQL][R][BRANCH-2.3] R's substr should not ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21250 Merged build finished. Test PASSed.
[GitHub] spark issue #21250: [SPARK-23291][SQL][R][BRANCH-2.3] R's substr should not ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21250 **[Test build #90325 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90325/testReport)** for PR 21250 at commit [`a7c8037`](https://github.com/apache/spark/commit/a7c80377a69a91d51b8b444c2af52778ae0e2732). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21070 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3007/ Test PASSed.
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21070 Merged build finished. Test PASSed.
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186469794 --- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter} +import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext} +import org.apache.parquet.hadoop.ParquetOutputCommitter + +import org.apache.spark.internal.Logging + + +/** + * This dynamically binds to the factory-configured + * output committer, and is intended to allow callers to use any [[PathOutputCommitter]], + * even if not a subclass of [[ParquetOutputCommitter]]. + * + * The Parquet "parquet.enable.summary-metadata" option will only be supported + * if the instantiated committer itself supports it. + */ + --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21070 **[Test build #90327 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90327/testReport)** for PR 21070 at commit [`6c9d47b`](https://github.com/apache/spark/commit/6c9d47babd16b067923014d49b83bfd1afb33c9b).
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186469740 --- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter} +import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext} +import org.apache.parquet.hadoop.ParquetOutputCommitter + +import org.apache.spark.internal.Logging + + +/** + * This dynamically binds to the factory-configured --- End diff -- "Parquet Committer subclass " --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186469029 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -345,7 +345,7 @@ object SQLConf { "snappy, gzip, lzo.") .stringConf .transform(_.toLowerCase(Locale.ROOT)) -.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo")) +.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", "zstd")) --- End diff -- Done.
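The quoted diff extends the set accepted by `checkValues` for the Parquet compression codec config. The validation pattern itself is simple to sketch; the helper name `checkCodec` below is illustrative, not SQLConf's API, but the allowed set and the `Locale.ROOT` lower-casing mirror the diff:

```scala
import java.util.Locale

// Codec names accepted after the quoted change (mirrors the diff).
val allowedCodecs =
  Set("none", "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", "zstd")

// Validate a user-supplied value the way .transform + .checkValues does:
// lower-case with a locale-independent mapping, then check set membership.
def checkCodec(value: String): Boolean =
  allowedCodecs.contains(value.toLowerCase(Locale.ROOT))

assert(checkCodec("ZSTD"))    // newly allowed, case-insensitive
assert(!checkCodec("bzip2"))  // still rejected
```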
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186468896 --- Diff: dev/deps/spark-deps-hadoop-2.7 --- @@ -163,13 +163,13 @@ orc-mapreduce-1.4.3-nohive.jar oro-2.0.8.jar osgi-resource-locator-1.0.1.jar paranamer-2.8.jar -parquet-column-1.8.2.jar -parquet-common-1.8.2.jar -parquet-encoding-1.8.2.jar -parquet-format-2.3.1.jar -parquet-hadoop-1.8.2.jar +parquet-column-1.10.0.jar +parquet-common-1.10.0.jar +parquet-encoding-1.10.0.jar +parquet-format-2.4.0.jar +parquet-hadoop-1.10.0.jar parquet-hadoop-bundle-1.6.0.jar -parquet-jackson-1.8.2.jar +parquet-jackson-1.10.0.jar --- End diff -- Done.
[GitHub] spark pull request #21165: [Spark-20087][CORE] Attach accumulators / metrics...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/21165#discussion_r186468071 --- Diff: core/src/main/scala/org/apache/spark/TaskEndReason.scala --- @@ -212,9 +212,15 @@ case object TaskResultLost extends TaskFailedReason { * Task was killed intentionally and needs to be rescheduled. */ @DeveloperApi -case class TaskKilled(reason: String) extends TaskFailedReason { +case class TaskKilled( +reason: String, +accumUpdates: Seq[AccumulableInfo] = Seq.empty, +private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil) --- End diff -- Hi @cloud-fan, I looked at how to remove `Seq[AccumulableInfo]` tonight. It turns out that we cannot, because `JsonProtocol` calls `taskEndReasonFromJson` to reconstruct `TaskEndReason`s. Since `AccumulatorV2` is an abstract class, we cannot simply construct `AccumulatorV2`s from JSON. Even if we are promoting `AccumulatorV2`, we still need `AccumulableInfo` when (de)serializing JSON.
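The constraint above is worth spelling out: an abstract class cannot be instantiated from JSON, so a flat, concrete data record has to carry the information across the serialization boundary. A minimal sketch, with simplified stand-ins for Spark's classes (the JSON helpers here are hand-rolled for illustration, not `JsonProtocol`):

```scala
// AccumulatorV2 is abstract: there is no way to know which concrete
// subclass to build when reading JSON back, so it cannot round-trip.
abstract class AccumulatorV2[IN, OUT] { def value: OUT }

// A concrete, data-only record (like AccumulableInfo) can round-trip.
case class AccumulableInfo(id: Long, name: String, value: String)

def toJson(info: AccumulableInfo): String =
  s"""{"id":${info.id},"name":"${info.name}","value":"${info.value}"}"""

def fromJson(json: String): AccumulableInfo = {
  // Minimal parse, sufficient for this sketch.
  val Pattern = """\{"id":(\d+),"name":"([^"]*)","value":"([^"]*)"\}""".r
  json match { case Pattern(id, n, v) => AccumulableInfo(id.toLong, n, v) }
}

val info = AccumulableInfo(1L, "records", "42")
assert(fromJson(toJson(info)) == info)
```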
[GitHub] spark pull request #20787: [MINOR][DOCS] Documenting months_between directio...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20787#discussion_r186467842 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala --- @@ -1194,13 +1194,21 @@ case class AddMonths(startDate: Expression, numMonths: Expression) } /** - * Returns number of months between dates date1 and date2. + * Returns number of months between dates `date1` and `date2`. + * If `date1` is later than `date2`, then the result is positive. + * If `date1` and `date2` are on the same day of month, or both + * are the last day of month, time of day will be ignored. Otherwise, the + * difference is calculated based on 31 days per month, and rounded to + * 8 digits unless roundOff=false. */ // scalastyle:off line.size.limit @ExpressionDescription( usage = """ -_FUNC_(timestamp1, timestamp2[, roundOff]) - Returns number of months between `timestamp1` and `timestamp2`. - The result is rounded to 8 decimal places by default. Set roundOff=false otherwise., +_FUNC_(date1, date2[, roundOff]) - If `date1` is later than `date2`, then the result --- End diff -- Yeah, I see that they are consistent now, but I actually think "timestamp" is more correct, since the expected types here are timestamps, not dates. Let's just leave them as they were; this went through long review iterations.
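The semantics quoted in the doc comment above can be expressed as a small self-contained function. This is a sketch of the documented date rules only (not Spark's actual implementation, which also accounts for time-of-day and the `roundOff` flag): same day of month, or both last days of their months, gives a whole number; otherwise the fractional part uses a 31-day month.

```scala
import java.time.LocalDate

// Sketch of months_between's documented behavior for date inputs.
def monthsBetween(d1: LocalDate, d2: LocalDate): Double = {
  val months = (d1.getYear - d2.getYear) * 12 +
    (d1.getMonthValue - d2.getMonthValue)
  val bothLastDay =
    d1.getDayOfMonth == d1.lengthOfMonth && d2.getDayOfMonth == d2.lengthOfMonth
  if (d1.getDayOfMonth == d2.getDayOfMonth || bothLastDay) months.toDouble
  else months + (d1.getDayOfMonth - d2.getDayOfMonth) / 31.0  // 31-day month
}

// Same day of month: whole number of months.
assert(monthsBetween(LocalDate.of(2017, 11, 14), LocalDate.of(2017, 7, 14)) == 4.0)
// Both last days of their months (Feb 28 and Jan 31): whole number.
assert(monthsBetween(LocalDate.of(2017, 2, 28), LocalDate.of(2017, 1, 31)) == 1.0)
```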
[GitHub] spark pull request #21144: [SPARK-24043][SQL] Interpreted Predicate should i...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21144
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186467415 --- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala --- @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} + +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage + +/** + * Spark Commit protocol for Path Output Committers. + * This committer will work with the `FileOutputCommitter` and subclasses. + * All implementations *must* be serializable. + * + * Rather than ask the `FileOutputFormat` for a committer, it uses the + * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory + * API to create the committer. 
+ * This is what [[org.apache.hadoop.mapreduce.lib.output.FileOutputFormat]] does, + * but as [[HadoopMapReduceCommitProtocol]] still uses the original + * `org.apache.hadoop.mapred.FileOutputFormat` binding + * subclasses do not do this, overrides those subclasses to using the + * factory mechanism now supported in the base class. + * + * In `setupCommitter` the factory is bonded to and the committer for + * the destination path chosen. + * + * @constructor Instantiate. dynamic partition overwrite is not supported, + * so that committers for stores which do not support rename + * will not get confused. + * @param jobId job + * @param destination destination + * @param dynamicPartitionOverwrite does the caller want support for dynamic + * partition overwrite. If so, it will be + * refused. + * @throws IOException when an unsupported dynamicPartitionOverwrite option is supplied. + */ +class PathOutputCommitProtocol( + jobId: String, + destination: String, + dynamicPartitionOverwrite: Boolean = false) + extends HadoopMapReduceCommitProtocol( +jobId, +destination, +false) with Serializable { + + @transient var committer: PathOutputCommitter = _ + + require(destination != null, "Null destination specified") + + val destPath = new Path(destination) + + logInfo(s"Instantiated committer with job ID=$jobId;" + +s" destination=$destPath;" + +s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite") + + if (dynamicPartitionOverwrite) { +// until there's explicit extensions to the PathOutputCommitProtocols +// to support the spark mechanism, it's left to the individual committer +// choice to handle partitioning. +throw new IOException("PathOutputCommitProtocol does not support dynamicPartitionOverwrite") + } + + import PathOutputCommitProtocol._ + + /** + * Set up the committer. + * This creates it by talking directly to the Hadoop factories, instead + * of the V1 `mapred.FileOutputFormat` methods. + * @param context task attempt + * @return the committer to use. 
This will always be a subclass of + * [[PathOutputCommitter]]. + */ + override protected def setupCommitter( +context: TaskAttemptContext): PathOutputCommitter = { + +logInfo(s"Setting up committer for path $destination") +committer = PathOutputCommitterFactory.createCommitter(destPath, context) + +// Special feature to force out the FileOutputCommitter, so as to guarantee +// that the binding is working properly. +val rejectFileOutput
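The protocol quoted above validates its arguments and rejects dynamic partition overwrite at construction time, before any job state exists. That guard pattern can be sketched in isolation; `StrictCommitProtocol` below is an illustrative stand-in, not the PR's class, and it omits the Hadoop committer wiring:

```scala
import java.io.IOException

// Toy commit protocol that fails fast, in the constructor, when asked for
// an unsupported mode -- mirroring the dynamicPartitionOverwrite check in
// the quoted diff.
class StrictCommitProtocol(
    jobId: String,
    destination: String,
    dynamicPartitionOverwrite: Boolean = false) {
  require(destination != null, "Null destination specified")
  if (dynamicPartitionOverwrite) {
    // Left to each committer to support; refuse rather than silently break.
    throw new IOException(
      "This commit protocol does not support dynamicPartitionOverwrite")
  }
}

val ok = new StrictCommitProtocol("job-1", "s3a://bucket/out")
assert(ok != null)

// Requesting the unsupported mode is rejected before any work starts.
val rejected =
  try { new StrictCommitProtocol("job-2", "s3a://bucket/out", true); false }
  catch { case _: IOException => true }
assert(rejected)
```

Failing in the constructor means a misconfigured job aborts during planning rather than after tasks have written data that can never be committed safely.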
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_fromarray function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21258 Merged build finished. Test FAILed.
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_fromarray function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21258 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90316/ Test FAILed.
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186466925 --- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala --- @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} + +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage + +/** + * Spark Commit protocol for Path Output Committers. + * This committer will work with the `FileOutputCommitter` and subclasses. + * All implementations *must* be serializable. + * + * Rather than ask the `FileOutputFormat` for a committer, it uses the + * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory + * API to create the committer. 
+ * This is what [[org.apache.hadoop.mapreduce.lib.output.FileOutputFormat]] does, + * but as [[HadoopMapReduceCommitProtocol]] still uses the original + * `org.apache.hadoop.mapred.FileOutputFormat` binding + * subclasses do not do this, overrides those subclasses to using the + * factory mechanism now supported in the base class. + * + * In `setupCommitter` the factory is bonded to and the committer for + * the destination path chosen. + * + * @constructor Instantiate. dynamic partition overwrite is not supported, + * so that committers for stores which do not support rename + * will not get confused. + * @param jobId job + * @param destination destination + * @param dynamicPartitionOverwrite does the caller want support for dynamic + * partition overwrite. If so, it will be + * refused. + * @throws IOException when an unsupported dynamicPartitionOverwrite option is supplied. + */ +class PathOutputCommitProtocol( + jobId: String, + destination: String, + dynamicPartitionOverwrite: Boolean = false) + extends HadoopMapReduceCommitProtocol( +jobId, +destination, +false) with Serializable { + + @transient var committer: PathOutputCommitter = _ + + require(destination != null, "Null destination specified") + + val destPath = new Path(destination) + + logInfo(s"Instantiated committer with job ID=$jobId;" + +s" destination=$destPath;" + +s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite") + + if (dynamicPartitionOverwrite) { +// until there's explicit extensions to the PathOutputCommitProtocols +// to support the spark mechanism, it's left to the individual committer +// choice to handle partitioning. +throw new IOException("PathOutputCommitProtocol does not support dynamicPartitionOverwrite") + } + + import PathOutputCommitProtocol._ + + /** + * Set up the committer. + * This creates it by talking directly to the Hadoop factories, instead + * of the V1 `mapred.FileOutputFormat` methods. + * @param context task attempt + * @return the committer to use. 
This will always be a subclass of + * [[PathOutputCommitter]]. + */ + override protected def setupCommitter( +context: TaskAttemptContext): PathOutputCommitter = { --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_fromarray function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21258 **[Test build #90316 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90316/testReport)** for PR 21258 at commit [`cfd575b`](https://github.com/apache/spark/commit/cfd575b7a8f334f82d952892c8e3ef7d9543b3bd). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `case class CreateMapFromArray(left: Expression, right: Expression)`
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186467007 --- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala --- @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} + +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage + +/** + * Spark Commit protocol for Path Output Committers. + * This committer will work with the `FileOutputCommitter` and subclasses. + * All implementations *must* be serializable. + * + * Rather than ask the `FileOutputFormat` for a committer, it uses the + * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory + * API to create the committer. 
+ * This is what [[org.apache.hadoop.mapreduce.lib.output.FileOutputFormat]] does, + * but as [[HadoopMapReduceCommitProtocol]] still uses the original + * `org.apache.hadoop.mapred.FileOutputFormat` binding + * subclasses do not do this, overrides those subclasses to using the + * factory mechanism now supported in the base class. + * + * In `setupCommitter` the factory is bonded to and the committer for + * the destination path chosen. + * + * @constructor Instantiate. dynamic partition overwrite is not supported, + * so that committers for stores which do not support rename + * will not get confused. + * @param jobId job + * @param destination destination + * @param dynamicPartitionOverwrite does the caller want support for dynamic + * partition overwrite. If so, it will be + * refused. + * @throws IOException when an unsupported dynamicPartitionOverwrite option is supplied. + */ +class PathOutputCommitProtocol( + jobId: String, + destination: String, + dynamicPartitionOverwrite: Boolean = false) + extends HadoopMapReduceCommitProtocol( +jobId, +destination, +false) with Serializable { + + @transient var committer: PathOutputCommitter = _ + + require(destination != null, "Null destination specified") + + val destPath = new Path(destination) + + logInfo(s"Instantiated committer with job ID=$jobId;" + +s" destination=$destPath;" + +s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite") + + if (dynamicPartitionOverwrite) { +// until there's explicit extensions to the PathOutputCommitProtocols +// to support the spark mechanism, it's left to the individual committer +// choice to handle partitioning. +throw new IOException("PathOutputCommitProtocol does not support dynamicPartitionOverwrite") + } + + import PathOutputCommitProtocol._ + + /** + * Set up the committer. + * This creates it by talking directly to the Hadoop factories, instead + * of the V1 `mapred.FileOutputFormat` methods. + * @param context task attempt + * @return the committer to use. 
This will always be a subclass of + * [[PathOutputCommitter]]. + */ + override protected def setupCommitter( +context: TaskAttemptContext): PathOutputCommitter = { + +logInfo(s"Setting up committer for path $destination") +committer = PathOutputCommitterFactory.createCommitter(destPath, context) + +// Special feature to force out the FileOutputCommitter, so as to guarantee +// that the binding is working properly. +val rejectFileOutput
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_fromarray function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21258 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90312/ Test FAILed.
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_fromarray function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21258 Merged build finished. Test FAILed.
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186466513 --- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala --- @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} + +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage + +/** + * Spark Commit protocol for Path Output Committers. + * This committer will work with the `FileOutputCommitter` and subclasses. + * All implementations *must* be serializable. + * + * Rather than ask the `FileOutputFormat` for a committer, it uses the + * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory + * API to create the committer. 
+ * This is what [[org.apache.hadoop.mapreduce.lib.output.FileOutputFormat]] does, + * but as [[HadoopMapReduceCommitProtocol]] still uses the original + * `org.apache.hadoop.mapred.FileOutputFormat` binding + * subclasses do not do this, overrides those subclasses to using the + * factory mechanism now supported in the base class. + * + * In `setupCommitter` the factory is bonded to and the committer for + * the destination path chosen. + * + * @constructor Instantiate. dynamic partition overwrite is not supported, + * so that committers for stores which do not support rename + * will not get confused. + * @param jobId job + * @param destination destination + * @param dynamicPartitionOverwrite does the caller want support for dynamic + * partition overwrite. If so, it will be + * refused. + * @throws IOException when an unsupported dynamicPartitionOverwrite option is supplied. + */ +class PathOutputCommitProtocol( + jobId: String, + destination: String, + dynamicPartitionOverwrite: Boolean = false) + extends HadoopMapReduceCommitProtocol( +jobId, +destination, +false) with Serializable { + + @transient var committer: PathOutputCommitter = _ + + require(destination != null, "Null destination specified") + + val destPath = new Path(destination) --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186466367 --- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala --- @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} + +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage + +/** + * Spark Commit protocol for Path Output Committers. + * This committer will work with the `FileOutputCommitter` and subclasses. + * All implementations *must* be serializable. + * + * Rather than ask the `FileOutputFormat` for a committer, it uses the + * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory + * API to create the committer. 
+ * This is what [[org.apache.hadoop.mapreduce.lib.output.FileOutputFormat]] does, + * but as [[HadoopMapReduceCommitProtocol]] still uses the original + * `org.apache.hadoop.mapred.FileOutputFormat` binding, its + * subclasses do not do this; this class overrides those subclasses to use the + * factory mechanism now supported in the base class. + * + * In `setupCommitter` the factory is bound to and the committer for + * the destination path chosen. + * + * @constructor Instantiate. Dynamic partition overwrite is not supported, + * so that committers for stores which do not support rename + * will not get confused. + * @param jobId job + * @param destination destination + * @param dynamicPartitionOverwrite does the caller want support for dynamic + * partition overwrite. If so, it will be + * refused. + * @throws IOException when an unsupported dynamicPartitionOverwrite option is supplied. + */ +class PathOutputCommitProtocol( + jobId: String, --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
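The per-filesystem-scheme factory lookup this protocol relies on can be sketched as a small registry that maps a destination URI scheme to a committer-factory class name, falling back to a default. `CommitterFactoryRegistry` and the class names below are illustrative stand-ins, not the Hadoop `PathOutputCommitterFactory` API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of scheme-based committer-factory dispatch, mirroring the
// mapreduce.outputcommitter.factory.scheme.<scheme> convention discussed in this PR.
// Names are illustrative, not the actual Spark/Hadoop API.
class CommitterFactoryRegistry {
    private final Map<String, String> factoriesByScheme = new HashMap<>();
    private final String defaultFactory;

    CommitterFactoryRegistry(String defaultFactory) {
        this.defaultFactory = defaultFactory;
    }

    // Bind a factory class name to a filesystem scheme (e.g. "s3a").
    void bind(String scheme, String factoryClassName) {
        factoriesByScheme.put(scheme, factoryClassName);
    }

    // Resolve the factory for a destination URI scheme, falling back to the default.
    String factoryFor(String scheme) {
        return factoriesByScheme.getOrDefault(scheme, defaultFactory);
    }
}
```

The point of dispatching on the destination scheme rather than the output format is that an object store like S3 can get a rename-free committer while HDFS keeps the classic one.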
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_fromarray function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21258 **[Test build #90312 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90312/testReport)** for PR 21258 at commit [`1ca1c41`](https://github.com/apache/spark/commit/1ca1c4138ed83f5137768f51f131920ac5cc559e). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186466259 --- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter} +import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext} +import org.apache.parquet.hadoop.ParquetOutputCommitter + +import org.apache.spark.internal.Logging + + +/** + * This dynamically binds to the factory-configured + * output committer, and is intended to allow callers to use any [[PathOutputCommitter]], + * even if not a subclass of [[ParquetOutputCommitter]]. + * + * The Parquet "parquet.enable.summary-metadata" option will only be supported + * if the instantiated committer itself supports it. 
+ */ + +class BindingParquetOutputCommitter( +path: Path, +context: TaskAttemptContext) + extends ParquetOutputCommitter(path, context) with Logging { + + logInfo(s"${this.getClass.getName} binding to configured PathOutputCommitter and dest $path") + + val committer = new BindingPathOutputCommitter(path, context) + + /** + * This is the committer ultimately bound to. + * @return the committer instantiated by the factory. + */ + def boundCommitter(): PathOutputCommitter = { +committer.getCommitter() + } + + override def getWorkPath: Path = { +committer.getWorkPath() + } + + override def setupTask(taskAttemptContext: TaskAttemptContext): Unit = { +committer.setupTask(taskAttemptContext) + } + + override def commitTask(taskAttemptContext: TaskAttemptContext): Unit = { +committer.commitTask(taskAttemptContext) + } + + override def abortTask(taskAttemptContext: TaskAttemptContext): Unit = { +committer.abortTask(taskAttemptContext) + } + + override def setupJob(jobContext: JobContext): Unit = { +committer.setupJob(jobContext) + } + + override def needsTaskCommit(taskAttemptContext: TaskAttemptContext): Boolean = { +committer.needsTaskCommit(taskAttemptContext) + } + + override def cleanupJob(jobContext: JobContext): Unit = { +committer.cleanupJob(jobContext) + } + + override def isCommitJobRepeatable(jobContext: JobContext): Boolean = { +committer.isCommitJobRepeatable(jobContext) + } + + override def commitJob(jobContext: JobContext): Unit = { +committer.commitJob(jobContext) + } + + override def recoverTask(taskAttemptContext: TaskAttemptContext): Unit = { +committer.recoverTask(taskAttemptContext) + } + + /** + * Abort the job; log and ignore any IO exception thrown. 
+ * + * @param jobContext job context + * @param state final state of the job + */ + override def abortJob( + jobContext: JobContext, + state: JobStatus.State): Unit = { +try { + committer.abortJob(jobContext, state) +} catch { + case e: IOException => --- End diff -- That's exactly the question @mridulm asked, which is why the next commit to this PR will cover it in comments. Essentially: this abort operation is regularly used in exception handling code, and this code tends to assume that the abort() routine doesn't fail. If it does, the exception can get rethrown and so hide the underlying failure which triggered the abort. There's an underlying question lurking: "what do you do when the abort operation itself fails?" For HDFS, with the FS used as the dest, `rm -rf $dest` does that cleanup. For S3, uncommitted uploads still incur charges, so
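The "abort must not mask the original failure" point above can be sketched as a small helper that catches and logs the `IOException` rather than letting it propagate out of an exception handler. `AbortHelper` and its nested `Committer` interface are hypothetical names, not Spark or Hadoop API:

```java
import java.io.IOException;

// Sketch of the "log and swallow" pattern discussed above: abortJob() is often
// invoked from exception handlers, so an IOException thrown during abort must not
// propagate and mask the failure that triggered the abort in the first place.
class AbortHelper {
    interface Committer {
        void abortJob() throws IOException;
    }

    // Returns true if abort succeeded, false if the abort itself failed;
    // never throws, so the caller's original exception stays visible.
    static boolean quietAbort(Committer committer) {
        try {
            committer.abortJob();
            return true;
        } catch (IOException e) {
            System.err.println("Abort failed (ignored): " + e.getMessage());
            return false;
        }
    }
}
```

Returning a boolean (instead of rethrowing) lets a caller that cares about cleanup failures still detect them, without forcing every exception handler to guard the abort call.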
[GitHub] spark issue #21144: [SPARK-24043][SQL] Interpreted Predicate should initiali...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/21144 LGTM - merging to master. Thanks!
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186464674 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +59,157 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); +} catch (IOException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes", e); +} + } + @Override public final void readIntegers(int total, WritableColumnVector c, int rowId) { -c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET); -offset += 4 * total; +int requiredBytes = total * 4; +ByteBuffer buffer = getBuffer(requiredBytes); + +if (buffer.hasArray()) { --- End diff -- No, there is no guarantee that the buffer from Parquet is on the heap.
[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21070#discussion_r186464557 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java --- @@ -63,115 +59,157 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + private ByteBuffer getBuffer(int length) { +try { + return in.slice(length).order(ByteOrder.LITTLE_ENDIAN); --- End diff -- No, `slice` doesn't copy. That's why we're using `ByteBuffer` now, to avoid copy operations. Setting the byte order to `LITTLE_ENDIAN` is correct because it is for the buffer and Parquet buffers store values in little endian: https://github.com/apache/parquet-format/blob/master/Encodings.md.
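The pattern in the two comments above can be sketched as follows: slice a buffer (no copy), force little-endian order (Parquet stores plain-encoded values little-endian), then branch on `hasArray()` because only heap-backed buffers expose a backing array for bulk copies. `LittleEndianReader` is illustrative, not the Spark reader; note the real code slices a Parquet `ByteBufferInputStream`, not a raw `ByteBuffer`:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Sketch of slicing plus LITTLE_ENDIAN decoding with a heap-buffer fast path.
class LittleEndianReader {
    static int[] readInts(ByteBuffer in, int total) {
        // slice() shares the underlying storage; order() must be re-applied,
        // since a fresh slice defaults to BIG_ENDIAN.
        ByteBuffer buffer = in.slice().order(ByteOrder.LITTLE_ENDIAN);
        int[] out = new int[total];
        if (buffer.hasArray()) {
            // Heap buffer: the backing array is directly addressable, so a bulk
            // copy (as Spark does via putIntsLittleEndian) is possible.
            byte[] bytes = buffer.array();
            int base = buffer.arrayOffset();
            for (int i = 0; i < total; i++) {
                int off = base + i * 4;
                out[i] = (bytes[off] & 0xFF)
                    | (bytes[off + 1] & 0xFF) << 8
                    | (bytes[off + 2] & 0xFF) << 16
                    | (bytes[off + 3] & 0xFF) << 24;
            }
        } else {
            // Direct (off-heap) buffer: no backing array, read value by value.
            for (int i = 0; i < total; i++) {
                out[i] = buffer.getInt(i * 4);
            }
        }
        return out;
    }
}
```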
[GitHub] spark pull request #20787: [MINOR][DOCS] Documenting months_between directio...
Github user aditkumar commented on a diff in the pull request: https://github.com/apache/spark/pull/20787#discussion_r186464442 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala --- @@ -1194,13 +1194,21 @@ case class AddMonths(startDate: Expression, numMonths: Expression) } /** - * Returns number of months between dates date1 and date2. + * Returns number of months between dates `date1` and `date2`. + * If `date1` is later than `date2`, then the result is positive. + * If `date1` and `date2` are on the same day of month, or both + * are the last day of month, time of day will be ignored. Otherwise, the + * difference is calculated based on 31 days per month, and rounded to + * 8 digits unless roundOff=false. */ // scalastyle:off line.size.limit @ExpressionDescription( usage = """ -_FUNC_(timestamp1, timestamp2[, roundOff]) - Returns number of months between `timestamp1` and `timestamp2`. - The result is rounded to 8 decimal places by default. Set roundOff=false otherwise., +_FUNC_(date1, date2[, roundOff]) - If `date1` is later than `date2`, then the result --- End diff -- I'm going to leave it as date1, date2 because that also lines up with the variable names in the function definition.
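The rule documented above (a whole number of months when the two days of month match, otherwise the day difference pro-rated over a 31-day month and rounded to 8 decimal places) can be approximated as follows. This sketch deliberately ignores the time-of-day and last-day-of-month special cases, so it is an illustration of the rounding rule, not the real `months_between`:

```java
import java.time.LocalDate;

// Simplified model of the months_between rounding rule described in the doc
// comment: same day of month => whole months; otherwise pro-rate over 31 days
// and round to 8 decimal places. date1 later than date2 gives a positive result.
class MonthsBetween {
    static double monthsBetween(LocalDate date1, LocalDate date2) {
        int wholeMonths = (date1.getYear() - date2.getYear()) * 12
            + (date1.getMonthValue() - date2.getMonthValue());
        if (date1.getDayOfMonth() == date2.getDayOfMonth()) {
            return wholeMonths;
        }
        double fraction = (date1.getDayOfMonth() - date2.getDayOfMonth()) / 31.0;
        return Math.round((wholeMonths + fraction) * 1e8) / 1e8;
    }
}
```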
[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_fromarray function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21258#discussion_r186464404 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala --- @@ -235,6 +235,69 @@ case class CreateMap(children: Seq[Expression]) extends Expression { override def prettyName: String = "map" } +/** + * Returns a catalyst Map containing the two arrays in children expressions as keys and values. + */ +@ExpressionDescription( + usage = """ +_FUNC_(keys, values) - Creates a map with a pair of the given key/value arrays. All elements + in keys should not be null""", + examples = """ +Examples: + > SELECT _FUNC_([1.0, 3.0], ['2', '4']); + {1.0:"2",3.0:"4"} + """, since = "2.4.0") +case class CreateMapFromArray(left: Expression, right: Expression) +extends BinaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (ArrayType(_, cn), ArrayType(_, _)) => +if (!cn) { + TypeCheckResult.TypeCheckSuccess +} else { + TypeCheckResult.TypeCheckFailure("All of the given keys should be non-null") +} + case _ => +TypeCheckResult.TypeCheckFailure("The given two arguments should be an array") +} + } + + override def dataType: DataType = { +MapType( + keyType = left.dataType.asInstanceOf[ArrayType].elementType, + valueType = right.dataType.asInstanceOf[ArrayType].elementType, + valueContainsNull = left.dataType.asInstanceOf[ArrayType].containsNull) + } + + override def nullable: Boolean = false + + override def nullSafeEval(keyArray: Any, valueArray: Any): Any = { +val keyArrayData = keyArray.asInstanceOf[ArrayData] --- End diff -- Could you please let us know where this specification is described or is derived from? It is not written [here](https://prestodb.io/docs/current/functions/map.html). 
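The semantics the diff above checks for — parallel key/value arrays zipped into a map, with null keys rejected — can be sketched independently of Catalyst's `ArrayData` machinery. `MapFromArrays` is an illustrative stand-in, not Spark's implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of map_fromarray semantics: zip two equal-length arrays into a map,
// rejecting null keys (mirroring the "all elements in keys should not be null"
// check in checkInputDataTypes above).
class MapFromArrays {
    static <K, V> Map<K, V> mapFromArrays(K[] keys, V[] values) {
        if (keys.length != values.length) {
            throw new IllegalArgumentException("keys and values must have the same length");
        }
        Map<K, V> result = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            if (keys[i] == null) {
                throw new IllegalArgumentException("map keys must not be null");
            }
            result.put(keys[i], values[i]);
        }
        return result;
    }
}
```

For the example from the expression description, `mapFromArrays(new Double[]{1.0, 3.0}, new String[]{"2", "4"})` yields the map `{1.0="2", 3.0="4"}`.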
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186463919 --- Diff: hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathCommitterConstants.scala --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +/** + * Constants related to Hadoop committer setup and configuration. + * Most of these are scattered around the hadoop-mapreduce classes. + */ +object PathCommitterConstants { --- End diff -- Now used in unit tests of binding, specifically `org.apache.spark.internal.io.cloud.CommitterBindingSuite`'s "cloud binding to SparkConf", though that's not actually doing that much.
[GitHub] spark issue #21249: [SPARK-23291][R][FOLLOWUP] Update SparkR migration note ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21249 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90326/ Test PASSed.
[GitHub] spark issue #21249: [SPARK-23291][R][FOLLOWUP] Update SparkR migration note ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21249 Merged build finished. Test PASSed.
[GitHub] spark issue #21249: [SPARK-23291][R][FOLLOWUP] Update SparkR migration note ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21249 **[Test build #90326 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90326/testReport)** for PR 21249 at commit [`04e042a`](https://github.com/apache/spark/commit/04e042a2dc772abae13618b22539f10b5356e6cd). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #21066: [SPARK-23977][CLOUD][WIP] Add commit protocol bin...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r186463018 --- Diff: hadoop-cloud/src/main/scala/org/apache/spark/internal/io/cloud/PathCommitterConstants.scala --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +/** + * Constants related to Hadoop committer setup and configuration. + * Most of these are scattered around the hadoop-mapreduce classes. + */ +object PathCommitterConstants { + + /** + * Scheme prefix for per-filesystem scheme committers. + */ + val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme" --- End diff -- I'm using those options for testing now. Even so, I'm not sure the setting needs to say "experimental/unstable". What do you suggest?
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_fromarray function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21258 Merged build finished. Test FAILed.
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_fromarray function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21258 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90314/ Test FAILed.
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_fromarray function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21258 **[Test build #90314 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90314/testReport)** for PR 21258 at commit [`b36abf7`](https://github.com/apache/spark/commit/b36abf742ff00448304055b2c41d25724719d8e9). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21118 > Actually the `SupportsScanUnsafeRow` is only there to avoid perf regression for migrating file sources. If you think that's not a good public API, we can move it to internal package and only use it for file sources. I don't think it is a good idea to introduce additions for file sources. Part of the motivation for the v2 API is to get rid of those. Besides, I don't think we need it if we handle conversion in Spark instead of in the data sources. I think we should update the physical plan and push both filters and projections into the v2 scan node. Then data sources won't need to produce `UnsafeRow` but we can guarantee that the scan node produces `UnsafeRow`, which it would already do in most cases because it includes a projection. I'll open a PR for this, or I can include the change here if you prefer.
[GitHub] spark issue #21249: [SPARK-23291][R][FOLLOWUP] Update SparkR migration note ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21249 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3006/ Test PASSed.
[GitHub] spark issue #21249: [SPARK-23291][R][FOLLOWUP] Update SparkR migration note ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21249 Merged build finished. Test PASSed.
[GitHub] spark issue #21250: [SPARK-23291][SQL][R][BRANCH-2.3] R's substr should not ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21250 Merged build finished. Test PASSed.
[GitHub] spark issue #21250: [SPARK-23291][SQL][R][BRANCH-2.3] R's substr should not ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21250 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3005/ Test PASSed.
[GitHub] spark issue #20787: [MINOR][DOCS] Documenting months_between direction
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20787 LGTM
[GitHub] spark pull request #21028: [SPARK-23922][SQL] Add arrays_overlap function
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21028#discussion_r186458781 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -28,6 +30,34 @@ import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.unsafe.types.{ByteArray, UTF8String} +/** + * Base trait for [[BinaryExpression]]s with two arrays of the same element type and implicit + * casting. + */ +trait BinaryArrayExpressionWithImplicitCast extends BinaryExpression + with ImplicitCastInputTypes { + + protected lazy val elementType: DataType = inputTypes.head.asInstanceOf[ArrayType].elementType + + override def inputTypes: Seq[AbstractDataType] = { +TypeCoercion.findWiderTypeForTwo(left.dataType, right.dataType) match { --- End diff -- what about `findWiderTypeWithoutStringPromotionForTwo`?
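The distinction the review comment raises — widening two input types with or without promoting mismatches to string — can be sketched with a simplified precedence chain. The type names and precedence list below are illustrative stand-ins for Catalyst's coercion rules, not the actual `TypeCoercion` implementation:

```java
import java.util.Arrays;
import java.util.List;

// Toy model of "find wider type for two": numeric types widen along a precedence
// chain; the allowStringPromotion flag mimics the difference between
// findWiderTypeForTwo and findWiderTypeWithoutStringPromotionForTwo.
class TypeWidening {
    static final List<String> NUMERIC_PRECEDENCE =
        Arrays.asList("byte", "short", "int", "long", "float", "double");

    // Returns the wider of the two types, or null if no common wider type exists.
    static String findWiderTypeForTwo(String t1, String t2, boolean allowStringPromotion) {
        if (t1.equals(t2)) {
            return t1;
        }
        int i1 = NUMERIC_PRECEDENCE.indexOf(t1);
        int i2 = NUMERIC_PRECEDENCE.indexOf(t2);
        if (i1 >= 0 && i2 >= 0) {
            return NUMERIC_PRECEDENCE.get(Math.max(i1, i2));
        }
        // With string promotion enabled, a mismatch involving string widens to string.
        if (allowStringPromotion && (t1.equals("string") || t2.equals("string"))) {
            return "string";
        }
        return null; // no common wider type
    }
}
```

The review suggestion amounts to calling the no-promotion variant, so that an array of ints compared against an array of strings fails type-checking instead of silently coercing both sides to string.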