[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20201 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20201#discussion_r163010889 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql.sources.v2; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRow; +import org.apache.spark.sql.sources.v2.DataSourceV2; +import org.apache.spark.sql.sources.v2.DataSourceV2Options; +import org.apache.spark.sql.sources.v2.ReadSupport; +import org.apache.spark.sql.sources.v2.reader.*; +import org.apache.spark.sql.types.StructType; + +public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport { + + class Reader implements DataSourceV2Reader, SupportsReportPartitioning { +private final StructType schema = new StructType().add("a", "int").add("b", "int"); + +@Override +public StructType readSchema() { + return schema; +} + +@Override +public List> createReadTasks() { + return java.util.Arrays.asList( +new SpecificReadTask(new int[]{1, 1, 3}, new int[]{4, 4, 6}), +new SpecificReadTask(new int[]{2, 4, 4}, new int[]{6, 2, 2})); +} + +@Override +public Partitioning outputPartitioning() { + return new MyPartitioning(); +} + } + + static class MyPartitioning implements Partitioning { + +@Override +public int numPartitions() { + return 2; +} + +@Override +public boolean satisfy(Distribution d) { --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20201#discussion_r163008839 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An interface to represent the output data partitioning for a data source, which is returned by + * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work like a + * snapshot. Once created, it should be deterministic and always report same number of partitions + * and same "satisfy" result for a certain distribution. + */ +@InterfaceStability.Evolving +public interface Partitioning { + + /** + * Returns the number of partitions/{@link ReadTask}s the data source outputs. --- End diff -- `Returns the number of partitions/(i.e., {@link ReadTask}s) that the data source outputs.` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20201#discussion_r163010318 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An interface to represent the output data partitioning for a data source, which is returned by + * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work like a + * snapshot. Once created, it should be deterministic and always report same number of partitions + * and same "satisfy" result for a certain distribution. + */ +@InterfaceStability.Evolving +public interface Partitioning { + + /** + * Returns the number of partitions/{@link ReadTask}s the data source outputs. + */ + int numPartitions(); + + /** + * Returns true if this partitioning can satisfy the given distribution, which means Spark does + * not need to shuffle the output data of this data source for some certain operations. + * + * Note that, Spark may add new concrete implementations of {@link Distribution} in new releases. + * This method should be aware of it and always return false for unrecognized distributions. It's + * recommended to check every Spark new release and support new distributions if possible, to + * avoid shuffle at Spark side for more cases. + */ + boolean satisfy(Distribution d); --- End diff -- `d` -> `distribution` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20201#discussion_r163008488 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An interface to represent the output data partitioning for a data source, which is returned by + * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work like a + * snapshot. Once created, it should be deterministic and always report same number of partitions + * and same "satisfy" result for a certain distribution. --- End diff -- `same number` -> `the same number` and `same "satisfy" result` -> `the same "satisfy" result` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20201#discussion_r162733684 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An interface to represent output data partitioning for a data source, which is returned by + * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work like a + * snapshot, once created, it should be deterministic and always report same number of partitions --- End diff -- `, once` -> `. Once` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20201#discussion_r162733629 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An interface to represent output data partitioning for a data source, which is returned by --- End diff -- `output` -> `the output` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20201#discussion_r162733463 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java --- @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * An interface to represent data distribution requirement, which specifies how the records should + * be distributed among the {@link ReadTask}s that are returned by + * {@link DataSourceV2Reader#createReadTasks()}. Note that this interface has nothing to do with + * the data ordering inside one partition(the output records of a single {@link ReadTask}). + * + * The instance of this interface is created and provided by Spark, then consumed by + * {@link Partitioning#satisfy(Distribution)}. This means users don't need to implement --- End diff -- `users ` -> `data source developers` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20201#discussion_r162733351 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.physical +import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, Partitioning} + +/** + * An adapter from public data source partitioning to catalyst internal partitioning. --- End diff -- `Partitioning ` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20201#discussion_r162733141 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java --- @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; + +/** + * A concrete implementation of {@link Distribution}. Represents a distribution where records that + * share the same values for the {@link #clusteredColumns} will be produced by the same + * {@link ReadTask}. + */ +@InterfaceStability.Evolving +public class ClusteredDistribution implements Distribution { + public String[] clusteredColumns; --- End diff -- Need to emphasize these columns are order insensitive. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20201#discussion_r162732939 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala --- @@ -95,6 +96,34 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } } + test("partitioning reporting") { +import org.apache.spark.sql.functions.{count, sum} +Seq(classOf[PartitionAwareDataSource], classOf[JavaPartitionAwareDataSource]).foreach { cls => + withClue(cls.getName) { +val df = spark.read.format(cls.getName).load() +checkAnswer(df, Seq(Row(1, 4), Row(1, 4), Row(3, 6), Row(2, 6), Row(4, 2), Row(4, 2))) + +val groupByColA = df.groupBy('a).agg(sum('b)) +checkAnswer(groupByColA, Seq(Row(1, 8), Row(2, 6), Row(3, 6), Row(4, 4))) +assert(groupByColA.queryExecution.executedPlan.collectFirst { + case e: ShuffleExchangeExec => e +}.isEmpty) + +val groupByColAB = df.groupBy('a, 'b).agg(count("*")) --- End diff -- Try `df.groupBy('a + 'b).agg(count("*")).show()` At least, it should not fail, even if we do not support complex `ClusteredDistribution` expressions --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/20201 [SPARK-22389][SQL] data source v2 partitioning reporting interface ## What changes were proposed in this pull request? a new interface which allows data source to report partitioning and avoid shuffle at Spark side. The design is pretty like the internal distribution/partitioing framework. Spark defines a `Distribution` interfaces and several concrete implementations, and ask the data source to report a `Partitioning`, the `Partitioning` should tell Spark if it can satisfy a `Distribution` or not. ## How was this patch tested? new test You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark partition-reporting Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20201.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20201 commit be14e3bd7598eb3ed583e18c1d9927d5c7f563b4 Author: Wenchen Fan Date: 2018-01-09T02:08:53Z data source v2 partitioning reporting interface --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org