[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208342404
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.io.Serializable;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+/**
+ * A factory of {@link PartitionReader}s. Implementations can do either 
row-based scan or columnar
+ * scan, by switching the {@link #supportColumnarReads()} flag.
+ */
+@InterfaceStability.Evolving
+public interface PartitionReaderFactory extends Serializable {
+
+  /**
+   * Returns a row-based partition reader to read data from the given 
{@link InputPartition}.
+   *
+   * Implementations probably need to cast the input partition to the 
concrete
+   * {@link InputPartition} class defined for the data source.
+   *
+   * If this method fails (by throwing an exception), the corresponding 
Spark task would fail and
+   * get retried until hitting the maximum retry times.
+   */
+  PartitionReader<InternalRow> createReader(InputPartition partition);
+
+  /**
+   * Returns a columnar partition reader to read data from the given 
{@link InputPartition}.
+   *
+   * Implementations probably need to cast the input partition to the 
concrete
+   * {@link InputPartition} class defined for the data source.
+   *
+   * If this method fails (by throwing an exception), the corresponding 
Spark task would fail and
+   * get retried until hitting the maximum retry times.
+   */
+  default PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
+    throw new UnsupportedOperationException("Cannot create columnar reader.");
+  }
+
+  /**
+   * If this method returns true, Spark will call {@link 
#createColumnarReader(InputPartition)} to
+   * create the {@link PartitionReader} and scan the data in a columnar 
way. This means,
+   * implementations must also implement {@link 
#createColumnarReader(InputPartition)} when true
+   * is returned here.
+   */
+  default boolean supportColumnarReads() {
--- End diff --

Can we update this to accept `InputPartition`? That would make it possible 
to use columnar scans for some input splits and row-based scans for others. 
That's helpful when a table has mixed formats, like Hive tables that are 
converted from Sequence to Parquet and have both formats.
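
A minimal sketch of the signature change being suggested here, reusing the types from the diff above; the default bodies and the mixed-format comment are illustrative assumptions, not part of the PR:

```
import java.io.Serializable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.PartitionReader;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Sketch only: the columnar flag takes the InputPartition, so a factory for a
// mixed-format table can answer per split instead of per table.
interface PerPartitionColumnarFactorySketch extends Serializable {

  PartitionReader<InternalRow> createReader(InputPartition partition);

  default PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
    throw new UnsupportedOperationException("Cannot create columnar reader.");
  }

  // e.g. return true only for the Parquet splits of a converted Hive table.
  default boolean supportColumnarReads(InputPartition partition) {
    return false;
  }
}
```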


---




[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22031
  
Build finished. Test PASSed.


---




[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22031
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1924/
Test PASSed.


---




[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22031
  
**[Test build #94389 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94389/testReport)**
 for PR 22031 at commit 
[`6f91777`](https://github.com/apache/spark/commit/6f91777de93121d668ff11e7701f449bb4c96337).


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208343665
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java 
---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for all the batch and streaming read supports. Data 
sources should implement
+ * concrete read support interfaces like {@link BatchReadSupport}.
+ */
+@InterfaceStability.Evolving
+public interface ReadSupport {
+
+  /**
+   * Returns the full schema of this data source, which is usually the 
physical schema of the
+   * underlying storage. This full schema should not be affected by column 
pruning or other
+   * optimizations.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  StructType fullSchema();
+
+  /**
+   * Returns a list of {@link InputPartition}s. Each {@link 
InputPartition} represents a data split
+   * that can be processed by one Spark task. The number of input 
partitions returned here is the
+   * same as the number of RDD partitions this scan outputs.
+   *
+   * Note that, this may not be a full scan if the data source supports 
optimization like filter
+   * push-down. Implementations should check the input {@link ScanConfig} 
and adjust the resulting
+   * {@link InputPartition}s.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  InputPartition[] planInputPartitions(ScanConfig config);
+
+  /**
+   * Returns a factory to produce {@link PartitionReader}s for {@link 
InputPartition}s.
--- End diff --

Minor: Adding 's' after a link isn't good Javadoc style.

For cases like this, it is better to use the singular for both classes to 
communicate the expectation that each `InputPartition` produces a single 
`PartitionReader`.

For cases where you don't need to communicate a one-to-one relationship, 
you can use `{@link InputPartition partitions}` to change the link text.
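
For illustration, a sketch of the two styles mentioned above; the interface and method names mirror the diff, and the Javadoc wording is only an example:

```
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.PartitionReader;
import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
import org.apache.spark.sql.sources.v2.reader.ScanConfig;

interface JavadocLinkStyleSketch {

  /**
   * Returns a factory to produce a {@link PartitionReader} for each {@link InputPartition}.
   * Singular on both sides communicates the one-to-one expectation.
   */
  PartitionReaderFactory createReaderFactory(ScanConfig config);

  /**
   * Plans the {@link InputPartition input partitions} of this scan.
   * Custom link text avoids writing "{@link InputPartition}s".
   */
  InputPartition[] planInputPartitions(ScanConfig config);
}
```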


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208344510
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java 
---
@@ -18,22 +18,16 @@
 package org.apache.spark.sql.sources.v2.reader;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
-
-import java.util.List;
 
 /**
- * A mix-in interface for {@link DataSourceReader}. Data source readers 
can implement this
- * interface to output {@link Row} instead of {@link InternalRow}.
- * This is an experimental and unstable interface.
+ * An interface that carries query specific information for the data scan. 
Currently it's used to
+ * hold operator pushdown result and streaming offsets. This is defined as 
an empty interface, and
+ * data sources should define their own {@link ScanConfig} classes.
+ *
+ * For APIs that take a {@link ScanConfig} as input, like
+ * {@link ReadSupport#planInputPartitions(ScanConfig)} and
+ * {@link ReadSupport#createReaderFactory(ScanConfig)}, implementations 
mostly need to cast the
+ * input {@link ScanConfig} to the concrete {@link ScanConfig} class of 
the data source.
  */
-@InterfaceStability.Unstable
-public interface SupportsDeprecatedScanRow extends DataSourceReader {
-  default List<InputPartition<InternalRow>> planInputPartitions() {
-    throw new IllegalStateException(
-      "planInputPartitions not supported by default within SupportsDeprecatedScanRow");
-  }
-
-  List<InputPartition<Row>> planRowInputPartitions();
-}
+@InterfaceStability.Evolving
+public interface ScanConfig {}
--- End diff --

I think this should return the scan's output schema. Otherwise the only way 
to get it is during pushdown.
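
A sketch of what that could look like, assuming the empty `ScanConfig` from the diff gains one accessor (the method name is illustrative):

```
import org.apache.spark.sql.types.StructType;

// Sketch: the config produced by the builder reports the schema the scan will
// actually output, so it stays available after pushdown is finished.
interface ScanConfigWithSchemaSketch {
  StructType readSchema();
}
```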


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208344984
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java 
---
@@ -18,22 +18,16 @@
 package org.apache.spark.sql.sources.v2.reader;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
-
-import java.util.List;
 
 /**
- * A mix-in interface for {@link DataSourceReader}. Data source readers 
can implement this
- * interface to output {@link Row} instead of {@link InternalRow}.
- * This is an experimental and unstable interface.
+ * An interface that carries query specific information for the data scan. 
Currently it's used to
+ * hold operator pushdown result and streaming offsets. This is defined as 
an empty interface, and
+ * data sources should define their own {@link ScanConfig} classes.
+ *
+ * For APIs that take a {@link ScanConfig} as input, like
+ * {@link ReadSupport#planInputPartitions(ScanConfig)} and
+ * {@link ReadSupport#createReaderFactory(ScanConfig)}, implementations 
mostly need to cast the
+ * input {@link ScanConfig} to the concrete {@link ScanConfig} class of 
the data source.
  */
-@InterfaceStability.Unstable
-public interface SupportsDeprecatedScanRow extends DataSourceReader {
--- End diff --

Can you rebase this on #21921? `SupportsDeprecatedScanRow` should no longer 
be defined.


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208345467
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java 
---
@@ -18,22 +18,16 @@
 package org.apache.spark.sql.sources.v2.reader;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
-
-import java.util.List;
 
 /**
- * A mix-in interface for {@link DataSourceReader}. Data source readers 
can implement this
- * interface to output {@link Row} instead of {@link InternalRow}.
- * This is an experimental and unstable interface.
+ * An interface that carries query specific information for the data scan. 
Currently it's used to
+ * hold operator pushdown result and streaming offsets. This is defined as 
an empty interface, and
+ * data sources should define their own {@link ScanConfig} classes.
+ *
+ * For APIs that take a {@link ScanConfig} as input, like
+ * {@link ReadSupport#planInputPartitions(ScanConfig)} and
+ * {@link ReadSupport#createReaderFactory(ScanConfig)}, implementations 
mostly need to cast the
+ * input {@link ScanConfig} to the concrete {@link ScanConfig} class of 
the data source.
  */
-@InterfaceStability.Unstable
-public interface SupportsDeprecatedScanRow extends DataSourceReader {
-  default List<InputPartition<InternalRow>> planInputPartitions() {
-    throw new IllegalStateException(
-      "planInputPartitions not supported by default within SupportsDeprecatedScanRow");
-  }
-
-  List<InputPartition<Row>> planRowInputPartitions();
-}
+@InterfaceStability.Evolving
+public interface ScanConfig {}
--- End diff --

I think this should also report pushed predicates, even if the methods 
default to `new Expression[0]`. Then plan outputs can be based on the scan 
config, not on tracking the results of pushdown in some other object.
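
A sketch of the suggested additions, with the defaults mentioned above; the method names are assumptions used only for illustration:

```
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.types.StructType;

// Sketch: plan output and explain strings can be derived from the ScanConfig
// itself instead of from side state kept while pushdown runs.
interface ScanConfigWithPushdownSketch {
  StructType readSchema();

  // Predicates the source accepted during pushdown; empty when nothing was pushed.
  default Expression[] pushedFilters() {
    return new Expression[0];
  }
}
```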


---




[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22027
  
**[Test build #94381 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94381/testReport)**
 for PR 22027 at commit 
[`ddbcc04`](https://github.com/apache/spark/commit/ddbcc04bd6850b388f25faceb2cc4e1943a0f660).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22027
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22027
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94381/
Test FAILed.


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208347697
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
 ---
@@ -21,22 +21,25 @@
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link DataSourceReader}. Data source readers 
can implement this
+ * A mix-in interface for {@link ScanConfigBuilder}. Data sources can 
implement this
  * interface to push down required columns to the data source and only 
read these columns during
  * scan to reduce the size of the data to be read.
  */
 @InterfaceStability.Evolving
-public interface SupportsPushDownRequiredColumns extends DataSourceReader {
+public interface SupportsPushDownRequiredColumns extends ScanConfigBuilder {
 
   /**
* Applies column pruning w.r.t. the given requiredSchema.
*
* Implementation should try its best to prune the unnecessary columns 
or nested fields, but it's
* also OK to do the pruning partially, e.g., a data source may not be 
able to prune nested
* fields, and only prune top-level columns.
-   *
-   * Note that, data source readers should update {@link 
DataSourceReader#readSchema()} after
-   * applying column pruning.
*/
   void pruneColumns(StructType requiredSchema);
+
+  /**
+   * Returns the schema after the column pruning is applied, so that Spark 
can know if some
+   * columns/nested fields are not pruned.
+   */
+  StructType prunedSchema();
--- End diff --

I don't see a reason to add this. Why not get the final schema from the 
`ScanConfig`? Getting the schema from the `ScanConfig` is better because it is 
clear when the pruned schema will be accessed: after all pushdown methods are 
called.

That matters because filters may cause the source to require more columns 
and the source may choose to return those columns to Spark instead of adding a 
projection. Deferring the projection to Spark is more efficient if Spark was 
going to add one anyway.
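
A rough sketch of the ordering argued for here, under the assumption that `ScanConfigBuilder` exposes a `build()` method and that the final schema lives on the resulting `ScanConfig` (both are assumptions where not shown in the diff):

```
import org.apache.spark.sql.sources.v2.reader.ScanConfig;
import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.StructType;

final class PushdownOrderingSketch {
  // Spark drives all pushdown first, then reads the final schema once from the
  // ScanConfig. The source may keep extra columns needed by pushed filters and
  // let Spark add the projection on top.
  static ScanConfig configure(ScanConfigBuilder builder, StructType requiredSchema) {
    if (builder instanceof SupportsPushDownRequiredColumns) {
      ((SupportsPushDownRequiredColumns) builder).pruneColumns(requiredSchema);
    }
    return builder.build();  // the final schema would be read from the returned config
  }
}
```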


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208348226
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
 ---
@@ -20,18 +20,18 @@
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A mix in interface for {@link DataSourceReader}. Data source readers 
can implement this
- * interface to report statistics to Spark.
+ * A mix in interface for {@link BatchReadSupport}. Data sources can 
implement this interface to
+ * report statistics to Spark.
  *
- * Statistics are reported to the optimizer before any operator is pushed 
to the DataSourceReader.
- * Implementations that return more accurate statistics based on pushed 
operators will not improve
- * query performance until the planner can push operators before getting 
stats.
+ * Currently statistics are reported to the optimizer before any operator 
is pushed to the data
--- End diff --

Nit: don't use "currently" in docs because it can become out of date and 
cause confusion. Instead, use "as of <version>" to be clear what "currently" 
means.


---




[GitHub] spark issue #21669: [SPARK-23257][K8S][WIP] Kerberos Support for Spark on K8...

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21669
  
**[Test build #94379 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94379/testReport)**
 for PR 21669 at commit 
[`c30ad8c`](https://github.com/apache/spark/commit/c30ad8c4be1d42e7da4992570a656099c073d745).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21669: [SPARK-23257][K8S][WIP] Kerberos Support for Spark on K8...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21669
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94379/
Test PASSed.


---




[GitHub] spark issue #21669: [SPARK-23257][K8S][WIP] Kerberos Support for Spark on K8...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21669
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #22008: [SPARK-24928][SQL] Optimize cross join according ...

2018-08-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22008#discussion_r208352309
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +153,45 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Swaps right and left logical plans of a join when left is bigger than 
right. This is useful
+ * because underlying cartesian product performs a nested loop, thus if 
the outer table is
+ * smaller there are less iterator initialization.
--- End diff --

I think this only makes sense when building the left iterator and building the 
right iterator cost the same. When building the right iterator is less costly 
than building the left one, swapping them might cause a performance regression.


---




[GitHub] spark pull request #22008: [SPARK-24928][SQL] Optimize cross join according ...

2018-08-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22008#discussion_r208353576
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -158,8 +158,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
   ConvertToLocalRelation,
   PropagateEmptyRelation) :+
 // The following batch should be executed after batch "Join Reorder" 
and "LocalRelation".
-Batch("Check Cartesian Products", Once,
-  CheckCartesianProducts) :+
+Batch("Check and Optimize Cartesian Products", Once,
+  CheckCartesianProducts,
+  ReorderCrossJoinOperands) :+
--- End diff --

Will reordering here break the join order optimized by `CostBasedJoinReorder`?


---




[GitHub] spark issue #22026: [SPARK-25045][CORE] Make `RDDBarrier.mapParititions` sim...

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22026
  
**[Test build #94380 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94380/testReport)**
 for PR 22026 at commit 
[`46be7c4`](https://github.com/apache/spark/commit/46be7c420960a3375d2187ccee1a9fc3d5ef83e6).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22026: [SPARK-25045][CORE] Make `RDDBarrier.mapParititions` sim...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22026
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22026: [SPARK-25045][CORE] Make `RDDBarrier.mapParititions` sim...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22026
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94380/
Test PASSed.


---




[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-07 Thread tgravescs
Github user tgravescs commented on the issue:

https://github.com/apache/spark/pull/21698
  
Sorry for coming in late on this; I first saw it the other day.

Could someone perhaps summarize the discussion here, and exactly when this 
happens and why? Checkpointing was mentioned as a way to work around the issue; 
why? It would be good to add those details to the jira anyway.

My initial reaction is that this is very bad. Any correctness issue we cause by 
handling failures is not something we should write off and expect the user to 
handle. repartition seems to be the most obvious case, and I know lots of people 
use it (although hopefully many are using the dataframe api), and we see fetch 
failures on large jobs all the time, so it seems really serious.

Trying an example similar to the one listed in jira SPARK-23207, but with an 
RDD, doesn't reproduce this:

```
import scala.sys.process._

import org.apache.spark.TaskContext
val res = sc.parallelize(0 to (100-1), 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
```


---




[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22014
  
**[Test build #94388 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94388/testReport)**
 for PR 22014 at commit 
[`3cfbcfc`](https://github.com/apache/spark/commit/3cfbcfc5fd0099ded2a5bd2b5ff1ef9278135285).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22014
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94388/
Test FAILed.


---




[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22014
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #22006: [SPARK-25031][SQL] Fix MapType schema print

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22006
  
**[Test build #94383 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94383/testReport)**
 for PR 22006 at commit 
[`4328199`](https://github.com/apache/spark/commit/4328199fe3738ceec0a2e87b934a20f56e08dc28).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22006: [SPARK-25031][SQL] Fix MapType schema print

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22006
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94383/
Test PASSed.


---




[GitHub] spark issue #22006: [SPARK-25031][SQL] Fix MapType schema print

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22006
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21305
  
**[Test build #94384 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94384/testReport)**
 for PR 21305 at commit 
[`42d86e1`](https://github.com/apache/spark/commit/42d86e1553f345c9879b40b1c20a2addbaf69781).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  s\"its class is $`
  * `case class Uuid(randomSeed: Option[Long] = None) extends 
LeafExpression with Stateful`
  * `case class InSubquery(values: Seq[Expression], query: ListQuery)`
  * `trait ExpressionWithRandomSeed `
  * `case class Rand(child: Expression) extends RDG with 
ExpressionWithRandomSeed `
  * `case class Randn(child: Expression) extends RDG with 
ExpressionWithRandomSeed `
  * `case class AliasIdentifier(identifier: String, database: 
Option[String])`


---




[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21305
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21305
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94384/
Test PASSed.


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208368798
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for 
streaming processing with
+ * continuous mode.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends StreamingReadSupport, 
BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some 
query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep 
these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link 
ContinuousReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start);
+
+  /**
+   * Returns a factory to produce {@link ContinuousPartitionReader}s for 
{@link InputPartition}s.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  @Override
+  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
+
+  /**
+   * Merge partitioned offsets coming from {@link 
ContinuousPartitionReader} instances
+   * for each partition to a single global offset.
+   */
+  Offset mergeOffsets(PartitionOffset[] offsets);
+
+  /**
+   * The execution engine will call this method in every epoch to 
determine if new input
+   * partitions need to be generated, which may be required if for example 
the underlying
+   * source system has had partitions added or removed.
+   *
+   * If true, the query will be shut down and restarted with a new {@link 
ContinuousReadSupport}
+   * instance.
+   */
+  default boolean needsReconfiguration() {
--- End diff --

Why doesn't this accept a `ScanConfig`? Aren't changes to the source only 
relevant if they affect a scan?

cc @jose-torres.
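
A sketch of the signature being asked about, assuming the default stays `false`:

```
import org.apache.spark.sql.sources.v2.reader.ScanConfig;

interface ReconfigurationCheckSketch {
  // Scoped to one scan: only changes that affect this scan (for example, new
  // Kafka partitions matching its subscription) would force a reconfiguration.
  default boolean needsReconfiguration(ScanConfig config) {
    return false;
  }
}
```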


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208370391
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
+   * streaming data source should not assume that it will start reading 
from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing 
query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the 
implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+   */
+  Offset deserializeOffset(String json);
--- End diff --

Why must this be JSON and why must it be a String? Why not byte[] and let 
the implementation choose the representation it prefers?


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208370532
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
--- End diff --

Should this be `oldestOffset`?


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208370493
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for 
streaming processing with
+ * continuous mode.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends StreamingReadSupport, 
BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some 
query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep 
these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link 
ContinuousReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start);
+
+  /**
+   * Returns a factory to produce {@link ContinuousPartitionReader}s for 
{@link InputPartition}s.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  @Override
+  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
+
+  /**
+   * Merge partitioned offsets coming from {@link 
ContinuousPartitionReader} instances
+   * for each partition to a single global offset.
+   */
+  Offset mergeOffsets(PartitionOffset[] offsets);
+
+  /**
+   * The execution engine will call this method in every epoch to 
determine if new input
+   * partitions need to be generated, which may be required if for example 
the underlying
+   * source system has had partitions added or removed.
+   *
+   * If true, the query will be shut down and restarted with a new {@link 
ContinuousReadSupport}
+   * instance.
+   */
+  default boolean needsReconfiguration() {
--- End diff --

The motivation for this method is things like Kafka source repartitioning. 
If a topic gets partitions added to it (or a subscription pattern gets topics 
added to it), Spark needs to schedule a new job which will scan the new 
partitions/topics, even though the Spark-side scan hasn't changed.


---




[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...

2018-08-07 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/22014
  
Retest this please.


---




[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22014
  
**[Test build #94390 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94390/testReport)**
 for PR 22014 at commit 
[`3cfbcfc`](https://github.com/apache/spark/commit/3cfbcfc5fd0099ded2a5bd2b5ff1ef9278135285).


---




[GitHub] spark issue #21180: [SPARK-22674][PYTHON] Disabled _hack_namedtuple for pick...

2018-08-07 Thread superbobry
Github user superbobry commented on the issue:

https://github.com/apache/spark/pull/21180
  
Sorry to bug you @HyukjinKwon, but I would really like for this patch to 
make it into the next PySpark release. Would you have time in the following 
weeks to have another look at this? 


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208371927
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for 
streaming processing with
+ * continuous mode.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends StreamingReadSupport, 
BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some 
query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep 
these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link 
ContinuousReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start);
+
+  /**
+   * Returns a factory to produce {@link ContinuousPartitionReader}s for 
{@link InputPartition}s.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  @Override
+  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
--- End diff --

Shouldn't this be `createContinuousReaderFactory`? If the method is the 
same across `BatchReadSupport`, `MicroBatchReadSupport`, and 
`ContinuousReadSupport`, then implementing both batch and continuous would 
require a factory that always returns both continuous and batch readers. 
Separate methods would allow each implementation to use a base class and add 
continuous or micro-batch support to different classes.
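
A sketch of the split being proposed, reusing the factory types from this PR; the method name `createContinuousReaderFactory` is the suggestion, not the current diff:

```
import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
import org.apache.spark.sql.sources.v2.reader.ScanConfig;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousPartitionReaderFactory;

// Sketch: distinct factory methods per mode, so one implementation class can
// support batch and continuous reads without one factory serving both.
interface BatchSideSketch {
  PartitionReaderFactory createReaderFactory(ScanConfig config);
}

interface ContinuousSideSketch {
  ContinuousPartitionReaderFactory createContinuousReaderFactory(ScanConfig config);
}
```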


---




[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22014
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208372512
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for 
streaming processing with
+ * micro-batch mode.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReadSupport extends StreamingReadSupport, 
BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some 
query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep 
these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link 
MicroBatchReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end);
+
+  /**
+   * Returns the most recent offset available.
+   */
+  Offset latestOffset(Offset start);
--- End diff --

Why does this accept a starting offset?


---




[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22014
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1925/
Test PASSed.


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208372469
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
+   * streaming data source should not assume that it will start reading 
from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing 
query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the 
implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+   */
+  Offset deserializeOffset(String json);
--- End diff --

The offsets are ultimately exposed as JSON inside the JSON representation 
of StreamingQueryProgress. It's important for visibility and debuggability that 
progress events contain human-readable representations of offsets.
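
A minimal sketch along those lines, assuming the `Offset` base class keeps a `json()` method as in the existing streaming API; the offset layout is purely illustrative:

```
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;

// Sketch: a human-readable JSON form ends up verbatim in StreamingQueryProgress
// and in checkpoint files, which makes debugging much easier than opaque bytes.
class EpochOffsetSketch extends Offset {
  private final long epoch;

  EpochOffsetSketch(long epoch) {
    this.epoch = epoch;
  }

  @Override
  public String json() {
    return "{\"epoch\":" + epoch + "}";  // e.g. {"epoch":42}
  }
}
```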


---




[GitHub] spark pull request #22008: [SPARK-24928][SQL] Optimize cross join according ...

2018-08-07 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22008#discussion_r208372709
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -158,8 +158,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
   ConvertToLocalRelation,
   PropagateEmptyRelation) :+
 // The following batch should be executed after batch "Join Reorder" 
and "LocalRelation".
-Batch("Check Cartesian Products", Once,
-  CheckCartesianProducts) :+
+Batch("Check and Optimize Cartesian Products", Once,
+  CheckCartesianProducts,
+  ReorderCrossJoinOperands) :+
--- End diff --

This doesn't reorder the joins; it just swaps the sides of a cartesian join, so 
there is no difference in the join order, nor in any other aspect of the plan, 
before or after this rule runs.


---




[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208373089
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
+   * streaming data source should not assume that it will start reading 
from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing 
query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the 
implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+   */
+  Offset deserializeOffset(String json);
+
+  /**
+   * Informs the source that Spark has completed processing all data for 
offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in 
the future.
+   */
+  void commit(Offset end);
--- End diff --

I think this should accept a `ScanConfig`. The read support is general and 
can create multiple scans. It should not keep state about any one scan. That's 
something the `ScanConfig` should do.
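
A minimal sketch of the suggested shape, using the `ScanConfig` and `Offset` types from this PR (names illustrative, not the final API):

```
import org.apache.spark.sql.sources.v2.reader.ScanConfig
import org.apache.spark.sql.sources.v2.reader.streaming.Offset

// Per-scan state (e.g. which offsets have been committed) travels with the ScanConfig,
// so the read support itself stays stateless across scans.
trait StreamingReadSupportSketch {
  def commit(config: ScanConfig, end: Offset): Unit
}
```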


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208373424
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
--- End diff --

Streaming-centric sources won't always use the oldest available offset as the 
initial offset. In the Kafka source, for instance, the default is actually to 
start from the newest offset.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208373784
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for 
streaming processing with
+ * continuous mode.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends StreamingReadSupport, 
BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some 
query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep 
these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link 
ContinuousReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start);
+
+  /**
+   * Returns a factory to produce {@link ContinuousPartitionReader}s for 
{@link InputPartition}s.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  @Override
+  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
--- End diff --

This would also match the write side: this commit adds 
`createBatchWriterFactory`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22030: [SPARK-25048][SQL] Pivoting by multiple columns in Scala...

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22030
  
**[Test build #94385 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94385/testReport)**
 for PR 22030 at commit 
[`90fc82b`](https://github.com/apache/spark/commit/90fc82b59ae50e6a2a1548a0756a40c6325354ec).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22030: [SPARK-25048][SQL] Pivoting by multiple columns in Scala...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22030
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94385/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22031
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22030: [SPARK-25048][SQL] Pivoting by multiple columns in Scala...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22030
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22031
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1926/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22031
  
**[Test build #94391 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94391/testReport)**
 for PR 22031 at commit 
[`14ef371`](https://github.com/apache/spark/commit/14ef371bdef36df170ec3e487598514ef4967e0e).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.

2018-08-07 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21977
  
test cases?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22008: [SPARK-24928][SQL] Optimize cross join according ...

2018-08-07 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22008#discussion_r208379788
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +153,45 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Swaps right and left logical plans of a join when left is bigger than 
right. This is useful
+ * because underlying cartesian product performs a nested loop, thus if 
the outer table is
+ * smaller there are less iterator initialization.
--- End diff --

This is indeed an interesting point. I am not sure how (or if) we can measure 
the cost of creating the iterators involved.

Anyway, this actually optimizes not only the iterator initialization cost, but 
also the overall number of records read/processed. Let's take an example. 
Imagine we have a table A with 10M records and a table B with 100 records. The 
total number of records retrieved is:

 - if A is the left table, we process: 10M (all the records from A) + 100 * 10M 
(all the records from B for every record of A) = 101 * 10M
 - if B is the left table, we process: 100 (all the records from B) + 100 * 10M 
(all the records from A for each record of B) = ~ 100 * 10M

So in the second case we process roughly size of A - size of B fewer records 
(the same applies to the number of bytes read).

And there is another good point for the second option: Spark is much better at 
reading 10M records 100 times than reading 100 records 10M times, as it can 
exploit its parallelism.

That said, your comment still applies: there may be cases in which one side is 
very onerous to compute even though it is the one with fewer records. Do you 
have any suggestion about how to estimate this? Thanks.
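
As a back-of-the-envelope check of those counts, a toy Scala calculation (not Spark code; it uses the same hypothetical 10M/100 figures as above):

```
// Rows touched by a nested-loop cartesian product: the outer (left) side is read once,
// the inner (right) side is re-read once for every outer row.
def rowsTouched(outerRows: Long, innerRows: Long): Long =
  outerRows + outerRows * innerRows

val a = 10000000L // table A: 10M rows
val b = 100L      // table B: 100 rows

rowsTouched(a, b) // A as outer: 1,010,000,000 rows (= 101 * 10M)
rowsTouched(b, a) // B as outer: 1,000,000,100 rows (~ 100 * 10M)
```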


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208380139
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
 ---
@@ -51,18 +58,19 @@ class DataSourceRDD[T: ClassTag](
 valuePrepared
   }
 
-  override def next(): T = {
+  override def next(): Any = {
 if (!hasNext) {
   throw new java.util.NoSuchElementException("End of stream")
 }
 valuePrepared = false
 reader.get()
   }
 }
-new InterruptibleIterator(context, iter)
+// TODO: get rid of this type hack.
+new InterruptibleIterator(context, 
iter.asInstanceOf[Iterator[InternalRow]])
--- End diff --

Why is this necessary? I think the TODO should be handled in this commit 
and that Spark shouldn't cast RDD[ColumnarBatch] to RDD[InternalRow].

What about having the RDD iterate over the rows in the batch to actually 
implement the interface? It can provide the underlying batches through a 
different API.
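
A minimal sketch of that alternative, assuming the reader factory is in columnar mode; `ColumnarBatch.rowIterator()` already exposes the per-row view:

```
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// Satisfy RDD[InternalRow] by iterating the rows inside each batch instead of casting
// the whole RDD; the raw batches could still be exposed through a separate method.
def rowsOf(batches: Iterator[ColumnarBatch]): Iterator[InternalRow] =
  batches.flatMap(_.rowIterator().asScala)
```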


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208380370
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
 ---
@@ -51,18 +58,19 @@ class DataSourceRDD[T: ClassTag](
 valuePrepared
   }
 
-  override def next(): T = {
+  override def next(): Any = {
 if (!hasNext) {
   throw new java.util.NoSuchElementException("End of stream")
 }
 valuePrepared = false
 reader.get()
   }
 }
-new InterruptibleIterator(context, iter)
+// TODO: get rid of this type hack.
+new InterruptibleIterator(context, 
iter.asInstanceOf[Iterator[InternalRow]])
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-
split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition.preferredLocations()
+
split.asInstanceOf[DataSourceRDDPartition].inputPartition.preferredLocations()
--- End diff --

Why doesn't this use `match` to check locality and default to no locality?
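
Presumably something along these lines, sketched here as the body of `getPreferredLocations`:

```
// Default to no locality instead of failing when the partition has an unexpected type.
override def getPreferredLocations(split: Partition): Seq[String] = split match {
  case p: DataSourceRDDPartition => p.inputPartition.preferredLocations()
  case _ => Nil
}
```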


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208383098
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -39,52 +36,43 @@ case class DataSourceV2ScanExec(
 @transient source: DataSourceV2,
 @transient options: Map[String, String],
 @transient pushedFilters: Seq[Expression],
-@transient reader: DataSourceReader)
+@transient readSupport: ReadSupport,
+@transient scanConfig: ScanConfig)
   extends LeafExecNode with DataSourceV2StringFormat with 
ColumnarBatchScan {
 
   override def simpleString: String = "ScanV2 " + metadataString
 
   // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
   override def equals(other: Any): Boolean = other match {
 case other: DataSourceV2ScanExec =>
-  output == other.output && reader.getClass == other.reader.getClass 
&& options == other.options
+  output == other.output && readSupport.getClass == 
other.readSupport.getClass &&
+options == other.options
 case _ => false
   }
 
   override def hashCode(): Int = {
 Seq(output, source, options).hashCode()
   }
 
-  override def outputPartitioning: physical.Partitioning = reader match {
-case r: SupportsScanColumnarBatch if r.enableBatchRead() && 
batchPartitions.size == 1 =>
-  SinglePartition
-
-case r: SupportsScanColumnarBatch if !r.enableBatchRead() && 
partitions.size == 1 =>
-  SinglePartition
-
-case r if !r.isInstanceOf[SupportsScanColumnarBatch] && 
partitions.size == 1 =>
+  override def outputPartitioning: physical.Partitioning = readSupport 
match {
+case _ if partitions.length == 1 =>
   SinglePartition
 
 case s: SupportsReportPartitioning =>
   new DataSourcePartitioning(
-s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
+s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a 
-> a.name)))
 
 case _ => super.outputPartitioning
   }
 
-  private lazy val partitions: Seq[InputPartition[InternalRow]] = {
-reader.planInputPartitions().asScala
-  }
+  private lazy val partitions: Seq[InputPartition] = 
readSupport.planInputPartitions(scanConfig)
 
-  private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = 
reader match {
-case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-  assert(!reader.isInstanceOf[ContinuousReader],
-"continuous stream reader does not support columnar read yet.")
-  r.planBatchInputPartitions().asScala
-  }
+  private lazy val partitionReaderFactory = 
readSupport.createReaderFactory(scanConfig)
 
-  private lazy val inputRDD: RDD[InternalRow] = reader match {
-case _: ContinuousReader =>
+  private lazy val inputRDD: RDD[InternalRow] = readSupport match {
+case _: ContinuousReadSupport =>
+  assert(!partitionReaderFactory.supportColumnarReads(),
--- End diff --

Can't Spark choose to use InternalRow reads instead? Why can't the source 
support both?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208383579
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
@@ -93,21 +81,17 @@ case class DataSourceV2ScanExec(
 sparkContext,
 sqlContext.conf.continuousStreamingExecutorQueueSize,
 sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
-partitions).asInstanceOf[RDD[InternalRow]]
-
-case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-  new DataSourceRDD(sparkContext, 
batchPartitions).asInstanceOf[RDD[InternalRow]]
+partitions,
+schema,
+
partitionReaderFactory.asInstanceOf[ContinuousPartitionReaderFactory])
--- End diff --

This should not cast. Just call 
`readSupport.createContinuousReaderFactory(...)` here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208384141
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
@@ -80,17 +80,17 @@ object DataSourceV2Strategy extends Strategy {
*/
   // TODO: nested column pruning.
   private def pruneColumns(
-  reader: DataSourceReader,
+  configBuilder: ScanConfigBuilder,
   relation: DataSourceV2Relation,
   exprs: Seq[Expression]): Seq[AttributeReference] = {
-reader match {
+configBuilder match {
   case r: SupportsPushDownRequiredColumns =>
 val requiredColumns = AttributeSet(exprs.flatMap(_.references))
 val neededOutput = relation.output.filter(requiredColumns.contains)
 if (neededOutput != relation.output) {
   r.pruneColumns(neededOutput.toStructType)
   val nameToAttr = 
relation.output.map(_.name).zip(relation.output).toMap
-  r.readSchema().toAttributes.map {
+  r.prunedSchema().toAttributes.map {
--- End diff --

As I noted earlier, this shouldn't get the scan's schema until the scan is 
fully configured.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22008: [SPARK-24928][SQL] Optimize cross join according ...

2018-08-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22008#discussion_r208385422
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +153,45 @@ object EliminateOuterJoin extends Rule[LogicalPlan] 
with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, 
j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Swaps right and left logical plans of a join when left is bigger than 
right. This is useful
+ * because underlying cartesian product performs a nested loop, thus if 
the outer table is
+ * smaller there are less iterator initialization.
--- End diff --

I don't think we have a good way so far to estimate the effort of materializing 
the elements in one RDD, especially before we materialize it. That is why I 
think this optimization of swapping the cross join sides doesn't always bring 
an improvement and can sometimes cause a regression. Let us see if others have 
more ideas.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.

2018-08-07 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21977
  
@gatorsmile, I started 
[YarnPySparkSuite](https://gist.github.com/rdblue/9848a00f49eaad6126fbbcfa1b039e19)
 but the YARN tests don't create python worker processes so the tests don't 
work. I need to find out how to force YARN to create workers in order to write 
tests. If you have any input that would help, please let me know.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21977
  
**[Test build #94386 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94386/testReport)**
 for PR 21977 at commit 
[`ee750ef`](https://github.com/apache/spark/commit/ee750efae806ea958a6a5a327799dafe6a0b3e64).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21977
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...

2018-08-07 Thread crafty-coder
Github user crafty-coder commented on a diff in the pull request:

https://github.com/apache/spark/pull/22031#discussion_r208387111
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,93 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform elements in an array using the transform function. This is 
similar to
+ * a `map` in functional programming.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Merges the two given arrays, element-wise, 
into a single array using function. If one array is shorter, nulls are appended 
at the end to match the length of the longer array, before applying function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), x -> x + 1);
--- End diff --

The examples are not accurate.

You could do something like:

```
 > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x));
  array(('a', 1), ('b', 2), ('c', 3))
 > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y);
  array(4, 6)
 > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y));
  array('ad', 'be', 'cf')
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21977
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94386/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...

2018-08-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21733
  
@tdas 
Done running perf. test with 4 more tests:

> BenchmarkMovingAggregationsListenerKeyMuchBigger

rate: 16

| version | input rows per second | processed rows per second | total state rows | used bytes of current state version |
| --- | --- | --- | --- | --- |
| latest master (c9914cf) | 159877.232 | 149537.817 | 65000 | 133511303 |
| patch (on top of c9914cf) | 160049.118 | 152497.945 | 65000 | 73236351 |

state size: 54.854 % (reduces 45.15%)

> BenchmarkMovingAggregationsListenerManyKeys

rate: 12

| version | input rows per second | processed rows per second | total state rows | used bytes of current state version |
| --- | --- | --- | --- | --- |
| latest master (c9914cf) | 120266.810 | 107482.042 | 65000 | 38433719 |
| patch (on top of c9914cf) | 119865.855 | 109268.772 | 65000 | 24900343 |

state size: 64.787% (reduces 35.21%)

> BenchmarkMovingAggregationsListenerManyValues

rate: 25000

| version | input rows per second | processed rows per second | total state rows | used bytes of current state version |
| --- | --- | --- | --- | --- |
| latest master (c9914cf) | 25009.236 | 21216.126 | 9 | 77161711 (857.352 per row) |
| patch (on top of c9914cf) | 25060.635 | 20774.500 | 99495 | 78230335 (786.274 per row) |

state size: 91.709 % (reduces 8.29 %)

> BenchmarkMovingAggregationsListenerValueMuchBigger

rate: 85000

| version | input rows per second | processed rows per second | total state rows | used bytes of current state version |
| --- | --- | --- | --- | --- |
| latest master (c9914cf) | 85310.774 | 79091.271 | 1000 | 1324255 |
| patch (on top of c9914cf) | 84791.761 | 79755.905 | 1000 | 1282687 |

state size: 96.861 % (reduces 3.14 %)

I don't find any outstanding perf. hit, and expected state size reduction 
is shown from all over the cases.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208389947
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for 
streaming processing with
+ * continuous mode.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends StreamingReadSupport, 
BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some 
query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep 
these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link 
ContinuousReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start);
+
+  /**
+   * Returns a factory to produce {@link ContinuousPartitionReader}s for 
{@link InputPartition}s.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  @Override
+  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
+
+  /**
+   * Merge partitioned offsets coming from {@link 
ContinuousPartitionReader} instances
+   * for each partition to a single global offset.
+   */
+  Offset mergeOffsets(PartitionOffset[] offsets);
+
+  /**
+   * The execution engine will call this method in every epoch to 
determine if new input
+   * partitions need to be generated, which may be required if for example 
the underlying
+   * source system has had partitions added or removed.
+   *
+   * If true, the query will be shut down and restarted with a new {@link 
ContinuousReadSupport}
+   * instance.
+   */
+  default boolean needsReconfiguration() {
--- End diff --

I think of a ReadSupport as something that can be read or scanned and 
ContinuousReadSupport as a stream that can be read. In that abstraction, the 
"something that can be read" probably isn't the right place to track whether a 
particular scan requires reconfiguration: a *scan* requires reconfiguration if 
that scan is based on partitions that are out of date.

To me, that indicates that a Kafka `ScanConfig` should keep track of kafka 
partitions and then `needsReconfiguration` should return true if the Kafka 
topic now has a different set of partitions than the ones in the `ScanConfig`. 
Does that make sense?

I think it would also be more consistent in the API to add `ScanConfig` 
here.
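
For illustration, a hypothetical shape of that idea (not the Kafka connector's actual code):

```
// The ScanConfig remembers the Kafka partitions the scan was planned against;
// reconfiguration is needed once the topic reports a different set of partitions.
case class KafkaScanConfigSketch(knownPartitions: Set[Int])

class KafkaContinuousReadSupportSketch(fetchCurrentPartitions: () => Set[Int]) {
  def needsReconfiguration(config: KafkaScanConfigSketch): Boolean =
    fetchCurrentPartitions() != config.knownPartitions
}
```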


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208390264
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
+   * streaming data source should not assume that it will start reading 
from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing 
query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the 
implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+   */
+  Offset deserializeOffset(String json);
--- End diff --

If Spark uses JSON to serialize, why can't Spark handle deserialization 
itself? 

Why not require `Offset` to have a human-readable `toString` and a 
`toBytes` for serialization? We don't have to conflate serialization with human 
readability.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208390359
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
--- End diff --

I was thinking oldest available, but it's a minor point.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22028: [SPARK-25046][SQL] Fix Alter View can excute sql like "A...

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22028
  
**[Test build #94387 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94387/testReport)**
 for PR 22028 at commit 
[`de44d0d`](https://github.com/apache/spark/commit/de44d0df8053c5caececee6b0f625d69b884b8d9).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208391449
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.ScanConfig;
+import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+
+/**
+ * An interface which defines how to scan the data from data source for 
streaming processing with
+ * continuous mode.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends StreamingReadSupport, 
BaseStreamingSource {
+
+  /**
+   * Returns a builder of {@link ScanConfig}. The builder can take some 
query specific information
+   * like which operators to pushdown, streaming offsets, etc., and keep 
these information in the
+   * created {@link ScanConfig}.
+   *
+   * This is the first step of the data scan. All other methods in {@link 
ContinuousReadSupport}
+   * needs to take {@link ScanConfig} as an input.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  ScanConfigBuilder newScanConfigBuilder(Offset start);
+
+  /**
+   * Returns a factory to produce {@link ContinuousPartitionReader}s for 
{@link InputPartition}s.
+   *
+   * If this method fails (by throwing an exception), the action will fail 
and no Spark job will be
+   * submitted.
+   */
+  @Override
+  ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config);
+
+  /**
+   * Merge partitioned offsets coming from {@link 
ContinuousPartitionReader} instances
+   * for each partition to a single global offset.
+   */
+  Offset mergeOffsets(PartitionOffset[] offsets);
+
+  /**
+   * The execution engine will call this method in every epoch to 
determine if new input
+   * partitions need to be generated, which may be required if for example 
the underlying
+   * source system has had partitions added or removed.
+   *
+   * If true, the query will be shut down and restarted with a new {@link 
ContinuousReadSupport}
+   * instance.
+   */
+  default boolean needsReconfiguration() {
--- End diff --

Makes sense to me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22028: [SPARK-25046][SQL] Fix Alter View can excute sql like "A...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22028
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22028: [SPARK-25046][SQL] Fix Alter View can excute sql like "A...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22028
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94387/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API

2018-08-07 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/22009#discussion_r208392865
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.sql.sources.v2.reader.ReadSupport;
+
+/**
+ * A base interface for streaming read support. This is package private 
and is invisible to data
+ * sources. Data sources should implement concrete streaming read support 
interfaces:
+ * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
+ */
+interface StreamingReadSupport extends ReadSupport {
+
+  /**
+   * Returns the initial offset for a streaming query to start reading 
from. Note that the
+   * streaming data source should not assume that it will start reading 
from its
+   * {@link #initialOffset()} value: if Spark is restarting an existing 
query, it will restart from
+   * the check-pointed offset rather than the initial one.
+   */
+  Offset initialOffset();
+
+  /**
+   * Deserialize a JSON string into an Offset of the 
implementation-defined offset type.
+   *
+   * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+   */
+  Offset deserializeOffset(String json);
--- End diff --

Currently, there are two representations of any given offset: a 
connector-defined JVM object and a serialized JSON string.

Spark can't build the JVM object itself because it doesn't know what the 
right type is. If you know of some clean way for a connector to declare "here 
is the type of my offsets", we should do that instead, but I only know how to 
do it through reflection magic more confusing than the status quo.

I'd hesitate to introduce a third representation unless there's some 
concrete use case where JSON serialization won't work well.
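
A small sketch of that status quo with a hypothetical connector offset; `json()` is the abstract method the v2 `Offset` class already defines, while the JSON format here is made up for the example:

```
import org.apache.spark.sql.sources.v2.reader.streaming.Offset

// The connector owns both directions of the JSON round trip, because only it knows
// the concrete offset type.
case class PositionOffset(position: Long) extends Offset {
  override def json(): String = s"""{"position":$position}"""
}

def deserializePositionOffset(json: String): PositionOffset = {
  val Pattern = """\{"position":(\d+)\}""".r
  json.trim match {
    case Pattern(pos) => PositionOffset(pos.toLong)
    case _ => throw new IllegalArgumentException(s"Invalid offset JSON: $json")
  }
}
```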


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.

2018-08-07 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21977
  
@rdblue Is this for YARN only?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22031
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.

2018-08-07 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21977
  
cc @jiangxb1987 @cloud-fan @jerryshao @vanzin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22031
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1927/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22031
  
**[Test build #94392 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94392/testReport)**
 for PR 22031 at commit 
[`c7e2dee`](https://github.com/apache/spark/commit/c7e2dee7cf48efd28d764fdd543bf366d65fcfa5).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22017
  
**[Test build #94393 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94393/testReport)**
 for PR 22017 at commit 
[`89a3da4`](https://github.com/apache/spark/commit/89a3da4e292690b78fbb41deef4104be3f843c1b).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22017
  
**[Test build #94394 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94394/testReport)**
 for PR 22017 at commit 
[`12ad8b2`](https://github.com/apache/spark/commit/12ad8b2248b7acb4a04289ca8da439ecb63206a9).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22028: [SPARK-25046][SQL] Fix Alter View can excute sql like "A...

2018-08-07 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/22028
  
Thanks! Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22028: [SPARK-25046][SQL] Fix Alter View can excute sql ...

2018-08-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22028


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22031#discussion_r208399620
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,91 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(left, right, func) - Merges the two given arrays, 
element-wise, into a single array using function. If one array is shorter, 
nulls are appended at the end to match the length of the longer array, before 
applying function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, 
x));
+   array(('a', 1), ('b', 3), ('c', 5))
+  > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y));
+   array(4, 6)
+  > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) 
-> concat(x, y));
+   array('ad', 'be', 'cf')
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ArraysZipWith(
+left: Expression,
+right: Expression,
+function: Expression)
+  extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes {
+
+  override def inputs: Seq[Expression] = List(left, right)
+
+  override def functions: Seq[Expression] = List(function)
+
+  def expectingFunctionType: AbstractDataType = AnyDataType
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType, expectingFunctionType)
+
+  override def nullable: Boolean = inputs.exists(_.nullable)
+
+  override def dataType: ArrayType = ArrayType(function.dataType, 
function.nullable)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArraysZipWith = {
+val (leftElementType, leftContainsNull) = left.dataType match {
+  case ArrayType(elementType, containsNull) => (elementType, 
containsNull)
+  case _ =>
+val ArrayType(elementType, containsNull) = 
ArrayType.defaultConcreteType
+(elementType, containsNull)
+}
+val (rightElementType, rightContainsNull) = right.dataType match {
+  case ArrayType(elementType, containsNull) => (elementType, 
containsNull)
+  case _ =>
+val ArrayType(elementType, containsNull) = 
ArrayType.defaultConcreteType
+(elementType, containsNull)
+}
+copy(function = f(function,
+  (leftElementType, leftContainsNull) :: (rightElementType, 
rightContainsNull) :: Nil))
+  }
+
+  @transient lazy val (arr1Var, arr2Var) = {
+val LambdaFunction(_,
+  (arr1Var: NamedLambdaVariable):: (arr2Var: NamedLambdaVariable) :: 
Nil, _) = function
+(arr1Var, arr2Var)
+  }
+
+  override def eval(input: InternalRow): Any = {
+val leftArr = left.eval(input).asInstanceOf[ArrayData]
+val rightArr = right.eval(input).asInstanceOf[ArrayData]
+
+if (leftArr == null || rightArr == null) {
--- End diff --

If ```leftArr``` is ```null```, ```right``` doesn't have to be evaluated.
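
That is, roughly the following shape, keeping the rest of `eval` as in the diff above (a sketch, not a full implementation):

```
override def eval(input: InternalRow): Any = {
  val leftArr = left.eval(input).asInstanceOf[ArrayData]
  if (leftArr == null) {
    null // short-circuit: right is never evaluated
  } else {
    val rightArr = right.eval(input).asInstanceOf[ArrayData]
    if (rightArr == null) {
      null
    } else {
      // ... the existing element-wise zipping with functionForEval goes here ...
    }
  }
}
```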


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22031#discussion_r208403145
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,91 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(left, right, func) - Merges the two given arrays, 
element-wise, into a single array using function. If one array is shorter, 
nulls are appended at the end to match the length of the longer array, before 
applying function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, 
x));
+   array(('a', 1), ('b', 3), ('c', 5))
+  > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y));
+   array(4, 6)
+  > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) 
-> concat(x, y));
+   array('ad', 'be', 'cf')
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ArraysZipWith(
+left: Expression,
+right: Expression,
+function: Expression)
+  extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes {
+
+  override def inputs: Seq[Expression] = List(left, right)
+
+  override def functions: Seq[Expression] = List(function)
+
+  def expectingFunctionType: AbstractDataType = AnyDataType
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType, expectingFunctionType)
+
+  override def nullable: Boolean = inputs.exists(_.nullable)
+
+  override def dataType: ArrayType = ArrayType(function.dataType, 
function.nullable)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArraysZipWith = {
+val (leftElementType, leftContainsNull) = left.dataType match {
+  case ArrayType(elementType, containsNull) => (elementType, 
containsNull)
+  case _ =>
+val ArrayType(elementType, containsNull) = 
ArrayType.defaultConcreteType
+(elementType, containsNull)
+}
+val (rightElementType, rightContainsNull) = right.dataType match {
+  case ArrayType(elementType, containsNull) => (elementType, 
containsNull)
+  case _ =>
+val ArrayType(elementType, containsNull) = 
ArrayType.defaultConcreteType
+(elementType, containsNull)
+}
+copy(function = f(function,
+  (leftElementType, leftContainsNull) :: (rightElementType, 
rightContainsNull) :: Nil))
--- End diff --

If you want to support input arrays of different sizes (the jira ticket 
says: _"Both arrays must be the same length."_), what about the scenario when 
one array is empty and the second has elements? Shouldn't we use ```true``` 
instead of ```leftContainsNull``` and ```rightContainsNull```?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22031#discussion_r208398313
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,91 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(left, right, func) - Merges the two given arrays, 
element-wise, into a single array using function. If one array is shorter, 
nulls are appended at the end to match the length of the longer array, before 
applying function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, 
x));
+   array(('a', 1), ('b', 3), ('c', 5))
+  > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y));
+   array(4, 6)
+  > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) 
-> concat(x, y));
+   array('ad', 'be', 'cf')
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ArraysZipWith(
+left: Expression,
+right: Expression,
+function: Expression)
+  extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes {
+
+  override def inputs: Seq[Expression] = List(left, right)
+
+  override def functions: Seq[Expression] = List(function)
+
+  def expectingFunctionType: AbstractDataType = AnyDataType
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType, expectingFunctionType)
+
+  override def nullable: Boolean = inputs.exists(_.nullable)
+
+  override def dataType: ArrayType = ArrayType(function.dataType, 
function.nullable)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArraysZipWith = {
+val (leftElementType, leftContainsNull) = left.dataType match {
--- End diff --

You can utilize ```HigherOrderFunction.arrayArgumentType```.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22004: [SPARK-25029][TESTS] Scala 2.12 issues: TaskNotSerializa...

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22004
  
**[Test build #4235 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4235/testReport)**
 for PR 22004 at commit 
[`422c4ab`](https://github.com/apache/spark/commit/422c4ab259b5e27ef12c2d5093a4ae93f2b7f522).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...

2018-08-07 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/22027
  
retest this please.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22027
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...

2018-08-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22027
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1928/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...

2018-08-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22027
  
**[Test build #94395 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94395/testReport)**
 for PR 22027 at commit 
[`ddbcc04`](https://github.com/apache/spark/commit/ddbcc04bd6850b388f25faceb2cc4e1943a0f660).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22004: [SPARK-25029][TESTS] Scala 2.12 issues: TaskNotSerializa...

2018-08-07 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/22004
  
Merged to master, but the Janino issue is still outstanding.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22014: [SPARK-25036][SQL] avoid match may not be exhaust...

2018-08-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22014#discussion_r208406147
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 ---
@@ -87,7 +87,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
   // For top level row writer, it always writes to the beginning of the global buffer holder,
   // which means its fixed-size region always in the same position, so we don't need to call
   // `reset` to set up its fixed-size region every time.
-  if (inputs.map(_.isNull).forall(_ == "false")) {
--- End diff --

@kiszk was this intentional?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22004: [SPARK-25029][TESTS] Scala 2.12 issues: TaskNotSe...

2018-08-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22004


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.

2018-08-07 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21977
  
Yes, this is for YARN only. I've also opened follow-up issues for Mesos and 
Kubernetes integration.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r208408858
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
 ---
@@ -49,4 +51,11 @@ object DataSourceUtils {
   }
 }
   }
+
+  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+  // counted as data files, so that they don't participate in partition discovery.
+  private[sql] def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
--- End diff --

Not sure what your earlier implementation was. I would prefer to keep the 
original code in `PartitioningAwareFileIndex.scala` unchanged and just add a 
utility function `isDataPath` in `CommandUtils.scala`. Does this sound good to you?
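
For illustration, a rough sketch of that suggestion, assuming the helper is 
added to the existing `CommandUtils` object (the filter body is copied from 
the diff above):
```
import org.apache.hadoop.fs.Path

object CommandUtils {
  // Sketch only: same filter as in the diff, hosted here so the original code
  // in PartitioningAwareFileIndex.scala can stay unchanged.
  private[sql] def isDataPath(path: Path): Boolean = {
    val name = path.getName
    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
  }
}
```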


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...

2018-08-07 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22030#discussion_r208410422
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -384,6 +392,10 @@ class RelationalGroupedDataset protected[sql](
   .sort(pivotColumn)  // ensure that the output columns are in a 
consistent logical order
   .collect()
   .map(_.get(0))
+  .collect {
+case row: GenericRow => struct(row.values.map(lit): _*)
--- End diff --

I suspect this will not work for nested struct types, or, say, multiple 
pivot columns with nested types. Could you please add a test like:
```
  test("pivoting column list") {
    val expected = ...
    val df = trainingSales
      .groupBy($"sales.year")
      .pivot(struct($"sales", $"training"))
      .agg(sum($"sales.earnings"))
    checkAnswer(df, expected)
  }
```
And can we also check if it works for other complex nested types, like 
Array(Struct(...))?




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...

2018-08-07 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/22030#discussion_r208411022
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -384,6 +392,10 @@ class RelationalGroupedDataset protected[sql](
   .sort(pivotColumn)  // ensure that the output columns are in a 
consistent logical order
   .collect()
   .map(_.get(0))
+  .collect {
--- End diff --

Use "map"?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21596: [SPARK-24601] Bump Jackson version

2018-08-07 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21596
  
@jerryshao This is for 3.0


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


