[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...

2018-01-09 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/20201

[SPARK-22389][SQL] data source v2 partitioning reporting interface

## What changes were proposed in this pull request?

A new interface that allows a data source to report its partitioning, so Spark can 
avoid a shuffle on its side.

The design closely mirrors the internal distribution/partitioning framework: Spark 
defines a `Distribution` interface with several concrete implementations and asks the 
data source to report a `Partitioning`; the `Partitioning` tells Spark whether or not 
it can satisfy a given `Distribution`.
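
To make the shape concrete, here is a rough Scala sketch of a reader that reports its 
partitioning under the proposed interfaces (the class name, column names and task 
layout are made up for illustration; they loosely follow the `PartitionAwareDataSource` 
used in the new test):

```scala
import java.util.{List => JList}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.types.StructType

// Hypothetical reader: every ReadTask holds all records for a given value of column "a",
// so the source is already clustered by "a" and Spark can skip the shuffle when it only
// needs co-location on "a" (e.g. groupBy("a")).
class ClusteredByAReader extends DataSourceV2Reader with SupportsReportPartitioning {
  override def readSchema(): StructType = new StructType().add("a", "int").add("b", "int")

  // e.g. one task per value of "a"; elided in this sketch
  override def createReadTasks(): JList[ReadTask[Row]] = ???

  override def outputPartitioning(): Partitioning = new Partitioning {
    override def numPartitions(): Int = 2
    override def satisfy(d: Distribution): Boolean = d match {
      // same value of "a" => same ReadTask, so any clustering that includes "a" is satisfied
      case c: ClusteredDistribution => c.clusteredColumns.contains("a")
      case _ => false // unknown distribution: let Spark shuffle
    }
  }
}
```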

## How was this patch tested?

A new test, `"partitioning reporting"`, in `DataSourceV2Suite`, covering both the Scala 
and Java sample data sources.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark partition-reporting

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20201.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20201


commit be14e3bd7598eb3ed583e18c1d9927d5c7f563b4
Author: Wenchen Fan 
Date:   2018-01-09T02:08:53Z

data source v2 partitioning reporting interface




---




[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20201#discussion_r162732939
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala 
---
@@ -95,6 +96,34 @@ class DataSourceV2Suite extends QueryTest with 
SharedSQLContext {
 }
   }
 
+  test("partitioning reporting") {
+import org.apache.spark.sql.functions.{count, sum}
+Seq(classOf[PartitionAwareDataSource], 
classOf[JavaPartitionAwareDataSource]).foreach { cls =>
+  withClue(cls.getName) {
+val df = spark.read.format(cls.getName).load()
+checkAnswer(df, Seq(Row(1, 4), Row(1, 4), Row(3, 6), Row(2, 6), 
Row(4, 2), Row(4, 2)))
+
+val groupByColA = df.groupBy('a).agg(sum('b))
+checkAnswer(groupByColA, Seq(Row(1, 8), Row(2, 6), Row(3, 6), 
Row(4, 4)))
+assert(groupByColA.queryExecution.executedPlan.collectFirst {
+  case e: ShuffleExchangeExec => e
+}.isEmpty)
+
+val groupByColAB = df.groupBy('a, 'b).agg(count("*"))
--- End diff --

Try `df.groupBy('a + 'b).agg(count("*")).show()` 

At least it should not fail, even if we do not support complex 
`ClusteredDistribution` expressions.
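
Something like the following could be added to the test for that case (a sketch only; 
the expected rows are derived from the data the test source already produces, and 
grouping by `'a + 'b` is expected to still need a shuffle since the reported clustering 
is on `'a`):

```scala
// grouping by an expression over the clustered column should still return correct results
val groupByExpr = df.groupBy('a + 'b).agg(count("*"))
checkAnswer(groupByExpr, Seq(Row(5, 2), Row(6, 2), Row(8, 1), Row(9, 1)))
```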


---




[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20201#discussion_r162733141
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java
 ---
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A concrete implementation of {@link Distribution}. Represents a 
distribution where records that
+ * share the same values for the {@link #clusteredColumns} will be 
produced by the same
+ * {@link ReadTask}.
+ */
+@InterfaceStability.Evolving
+public class ClusteredDistribution implements Distribution {
+  public String[] clusteredColumns;
--- End diff --

We need to emphasize that these columns are order-insensitive.
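
For example (a sketch, not part of this patch): a source clustered by the pair 
(`a`, `b`) should satisfy a `ClusteredDistribution` whose columns are `["b", "a"]` just 
as well as `["a", "b"]`, so an implementation should compare the columns as a set rather 
than positionally:

```scala
import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, Distribution}

// hypothetical helper: `reported` is the set of columns the source is clustered by
def satisfiesClusteredBy(reported: Set[String], d: Distribution): Boolean = d match {
  // order-insensitive: ["a", "b"] and ["b", "a"] describe the same requirement
  case c: ClusteredDistribution => reported.subsetOf(c.clusteredColumns.toSet)
  case _ => false
}
```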


---




[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20201#discussion_r162733351
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.physical
+import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, 
Partitioning}
+
+/**
+ * An adapter from public data source partitioning to catalyst internal 
partitioning.
--- End diff --

`Partitioning`


---




[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20201#discussion_r162733463
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java 
---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface to represent data distribution requirement, which 
specifies how the records should
+ * be distributed among the {@link ReadTask}s that are returned by
+ * {@link DataSourceV2Reader#createReadTasks()}. Note that this interface 
has nothing to do with
+ * the data ordering inside one partition(the output records of a single 
{@link ReadTask}).
+ *
+ * The instance of this interface is created and provided by Spark, then 
consumed by
+ * {@link Partitioning#satisfy(Distribution)}. This means users don't need 
to implement
--- End diff --

`users ` -> `data source developers`


---




[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20201#discussion_r162733629
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java 
---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface to represent output data partitioning for a data source, 
which is returned by
--- End diff --

`output` -> `the output`


---




[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20201#discussion_r162733684
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java 
---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface to represent output data partitioning for a data source, 
which is returned by
+ * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this 
should work like a
+ * snapshot, once created, it should be deterministic and always report 
same number of partitions
--- End diff --

`, once` -> `. Once`


---




[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...

2018-01-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20201#discussion_r163008488
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java 
---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface to represent the output data partitioning for a data 
source, which is returned by
+ * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this 
should work like a
+ * snapshot. Once created, it should be deterministic and always report 
same number of partitions
+ * and same "satisfy" result for a certain distribution.
--- End diff --

`same number` -> `the same number` and `same "satisfy" result` -> `the same 
"satisfy" result` 


---




[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...

2018-01-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20201#discussion_r163010318
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java 
---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface to represent the output data partitioning for a data 
source, which is returned by
+ * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this 
should work like a
+ * snapshot. Once created, it should be deterministic and always report 
same number of partitions
+ * and same "satisfy" result for a certain distribution.
+ */
+@InterfaceStability.Evolving
+public interface Partitioning {
+
+  /**
+   * Returns the number of partitions/{@link ReadTask}s the data source 
outputs.
+   */
+  int numPartitions();
+
+  /**
+   * Returns true if this partitioning can satisfy the given distribution, 
which means Spark does
+   * not need to shuffle the output data of this data source for some 
certain operations.
+   *
+   * Note that, Spark may add new concrete implementations of {@link 
Distribution} in new releases.
+   * This method should be aware of it and always return false for 
unrecognized distributions. It's
+   * recommended to check every Spark new release and support new 
distributions if possible, to
+   * avoid shuffle at Spark side for more cases.
+   */
+  boolean satisfy(Distribution d);
--- End diff --

`d` -> `distribution`
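
To illustrate the note in that Javadoc about unrecognized distributions (a sketch with 
hypothetical names, using the renamed parameter): a typical implementation matches the 
distributions it understands and falls through to `false` for anything else:

```scala
import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, Distribution, Partitioning}

// hypothetical Partitioning for a source clustered by the columns in `clusteredBy`
class ClusteredPartitioning(clusteredBy: Set[String], parts: Int) extends Partitioning {
  override def numPartitions(): Int = parts

  override def satisfy(distribution: Distribution): Boolean = distribution match {
    case c: ClusteredDistribution => clusteredBy.subsetOf(c.clusteredColumns.toSet)
    // distributions added in future Spark releases are unknown here: never claim to
    // satisfy them, so Spark falls back to its own shuffle
    case _ => false
  }
}
```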


---




[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...

2018-01-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20201#discussion_r163008839
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java 
---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An interface to represent the output data partitioning for a data 
source, which is returned by
+ * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this 
should work like a
+ * snapshot. Once created, it should be deterministic and always report 
same number of partitions
+ * and same "satisfy" result for a certain distribution.
+ */
+@InterfaceStability.Evolving
+public interface Partitioning {
+
+  /**
+   * Returns the number of partitions/{@link ReadTask}s the data source 
outputs.
--- End diff --

`Returns the number of partitions/(i.e., {@link ReadTask}s) that the data 
source outputs.`


---




[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...

2018-01-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20201#discussion_r163010889
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.reader.*;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaPartitionAwareDataSource implements DataSourceV2, 
ReadSupport {
+
+  class Reader implements DataSourceV2Reader, SupportsReportPartitioning {
+private final StructType schema = new StructType().add("a", 
"int").add("b", "int");
+
+@Override
+public StructType readSchema() {
+  return schema;
+}
+
+@Override
+public List<ReadTask<Row>> createReadTasks() {
+  return java.util.Arrays.asList(
+new SpecificReadTask(new int[]{1, 1, 3}, new int[]{4, 4, 6}),
+new SpecificReadTask(new int[]{2, 4, 4}, new int[]{6, 2, 2}));
+}
+
+@Override
+public Partitioning outputPartitioning() {
+  return new MyPartitioning();
+}
+  }
+
+  static class MyPartitioning implements Partitioning {
+
+@Override
+public int numPartitions() {
+  return 2;
+}
+
+@Override
+public boolean satisfy(Distribution d) {
--- End diff --

ditto 


---




[GitHub] spark pull request #20201: [SPARK-22389][SQL] data source v2 partitioning re...

2018-01-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20201


---
