[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208342404 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionReaderFactory.java --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import java.io.Serializable; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +/** + * A factory of {@link PartitionReader}s. Implementations can do either row-based scan or columnar + * scan, by switching the {@link #supportColumnarReads()} flag. + */ +@InterfaceStability.Evolving +public interface PartitionReaderFactory extends Serializable { + + /** + * Returns a row-based partition reader to read data from the given {@link InputPartition}. + * + * Implementations probably need to cast the input partition to the concrete + * {@link InputPartition} class defined for the data source. + * + * If this method fails (by throwing an exception), the corresponding Spark task would fail and + * get retried until hitting the maximum retry times. + */ + PartitionReader createReader(InputPartition partition); + + /** + * Returns a columnar partition reader to read data from the given {@link InputPartition}. + * + * Implementations probably need to cast the input partition to the concrete + * {@link InputPartition} class defined for the data source. + * + * If this method fails (by throwing an exception), the corresponding Spark task would fail and + * get retried until hitting the maximum retry times. + */ + default PartitionReader createColumnarReader(InputPartition partition) { +throw new UnsupportedOperationException("Cannot create columnar reader."); + } + + /** + * If this method returns true, Spark will call {@link #createColumnarReader(InputPartition)} to + * create the {@link PartitionReader} and scan the data in a columnar way. This means, + * implementations must also implement {@link #createColumnarReader(InputPartition)} when true + * is returned here. + */ + default boolean supportColumnarReads() { --- End diff -- Can we update this to accept `InputPartition`? That would make it possible to use columnar scans for some input splits and row-based scans for others. That's helpful when a table has mixed formats, like Hive tables that are converted from Sequence to Parquet and have both formats. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
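For illustration, a minimal sketch of the signature change suggested in this comment, keeping the rest of the interface as in the diff above (hypothetical, not the committed API):

```java
package org.apache.spark.sql.sources.v2.reader;

import java.io.Serializable;

// Sketch of the suggestion: pass the partition into the flag method so the factory
// can decide per input split whether a columnar read is possible, e.g. for a Hive
// table that mixes Sequence and Parquet files.
public interface PartitionReaderFactory extends Serializable {

  PartitionReader createReader(InputPartition partition);

  default PartitionReader createColumnarReader(InputPartition partition) {
    throw new UnsupportedOperationException("Cannot create columnar reader.");
  }

  // Answered per partition instead of globally, so only the Parquet splits return true.
  default boolean supportColumnarReads(InputPartition partition) {
    return false;
  }
}
```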
[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22031 Build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22031 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1924/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22031 **[Test build #94389 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94389/testReport)** for PR 22031 at commit [`6f91777`](https://github.com/apache/spark/commit/6f91777de93121d668ff11e7701f449bb4c96337). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208343665 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadSupport.java --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.types.StructType; + +/** + * The base interface for all the batch and streaming read supports. Data sources should implement + * concrete read support interfaces like {@link BatchReadSupport}. + */ +@InterfaceStability.Evolving +public interface ReadSupport { + + /** + * Returns the full schema of this data source, which is usually the physical schema of the + * underlying storage. This full schema should not be affected by column pruning or other + * optimizations. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + StructType fullSchema(); + + /** + * Returns a list of {@link InputPartition}s. Each {@link InputPartition} represents a data split + * that can be processed by one Spark task. The number of input partitions returned here is the + * same as the number of RDD partitions this scan outputs. + * + * Note that, this may not be a full scan if the data source supports optimization like filter + * push-down. Implementations should check the input {@link ScanConfig} and adjust the resulting + * {@link InputPartition}s. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + InputPartition[] planInputPartitions(ScanConfig config); + + /** + * Returns a factory to produce {@link PartitionReader}s for {@link InputPartition}s. --- End diff -- Minor: Adding 's' after a link isn't good Javadoc style. For cases like this, it is better to use the singular for both classes to communicate the expectation that each `InputPartition` produces a single `PartitionReader`. For cases where you don't need to communicate a one-to-one relationship, you can use `{@link InputPartition partitions}` to change the link text. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
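As an illustration of the Javadoc style described here (wording illustrative only, not the final comment text):

```java
package org.apache.spark.sql.sources.v2.reader;

interface ReadSupportJavadocExample {
  /**
   * Returns a factory to produce a {@link PartitionReader} for each {@link InputPartition}.
   * (Singular on both sides communicates the one-to-one relationship.)
   *
   * When no one-to-one relationship is implied, custom link text avoids the trailing 's',
   * for example: the {@link InputPartition partitions} of this scan.
   */
  PartitionReaderFactory createReaderFactory(ScanConfig config);
}
```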
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208344510 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java --- @@ -18,22 +18,16 @@ package org.apache.spark.sql.sources.v2.reader; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.InternalRow; - -import java.util.List; /** - * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this - * interface to output {@link Row} instead of {@link InternalRow}. - * This is an experimental and unstable interface. + * An interface that carries query specific information for the data scan. Currently it's used to + * hold operator pushdown result and streaming offsets. This is defined as an empty interface, and + * data sources should define their own {@link ScanConfig} classes. + * + * For APIs that take a {@link ScanConfig} as input, like + * {@link ReadSupport#planInputPartitions(ScanConfig)} and + * {@link ReadSupport#createReaderFactory(ScanConfig)}, implementations mostly need to cast the + * input {@link ScanConfig} to the concrete {@link ScanConfig} class of the data source. */ -@InterfaceStability.Unstable -public interface SupportsDeprecatedScanRow extends DataSourceReader { - default List> planInputPartitions() { -throw new IllegalStateException( -"planInputPartitions not supported by default within SupportsDeprecatedScanRow"); - } - - List> planRowInputPartitions(); -} +@InterfaceStability.Evolving +public interface ScanConfig {} --- End diff -- I think this should return the scan's output schema. Otherwise the only way to get it is during pushdown. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
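A minimal sketch of what this suggestion could look like (hypothetical, not the merged interface):

```java
package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.sql.types.StructType;

// Sketch: let ScanConfig report the scan's output schema so it can be retrieved
// after pushdown has finished, not only during pushdown.
public interface ScanConfig {
  StructType readSchema();
}
```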
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208344984 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java --- @@ -18,22 +18,16 @@ package org.apache.spark.sql.sources.v2.reader; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.InternalRow; - -import java.util.List; /** - * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this - * interface to output {@link Row} instead of {@link InternalRow}. - * This is an experimental and unstable interface. + * An interface that carries query specific information for the data scan. Currently it's used to + * hold operator pushdown result and streaming offsets. This is defined as an empty interface, and + * data sources should define their own {@link ScanConfig} classes. + * + * For APIs that take a {@link ScanConfig} as input, like + * {@link ReadSupport#planInputPartitions(ScanConfig)} and + * {@link ReadSupport#createReaderFactory(ScanConfig)}, implementations mostly need to cast the + * input {@link ScanConfig} to the concrete {@link ScanConfig} class of the data source. */ -@InterfaceStability.Unstable -public interface SupportsDeprecatedScanRow extends DataSourceReader { --- End diff -- Can you rebase this on #21921? `SupportsDeprecatedScanRow` should no longer be defined. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208345467 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java --- @@ -18,22 +18,16 @@ package org.apache.spark.sql.sources.v2.reader; import org.apache.spark.annotation.InterfaceStability; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.InternalRow; - -import java.util.List; /** - * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this - * interface to output {@link Row} instead of {@link InternalRow}. - * This is an experimental and unstable interface. + * An interface that carries query specific information for the data scan. Currently it's used to + * hold operator pushdown result and streaming offsets. This is defined as an empty interface, and + * data sources should define their own {@link ScanConfig} classes. + * + * For APIs that take a {@link ScanConfig} as input, like + * {@link ReadSupport#planInputPartitions(ScanConfig)} and + * {@link ReadSupport#createReaderFactory(ScanConfig)}, implementations mostly need to cast the + * input {@link ScanConfig} to the concrete {@link ScanConfig} class of the data source. */ -@InterfaceStability.Unstable -public interface SupportsDeprecatedScanRow extends DataSourceReader { - default List> planInputPartitions() { -throw new IllegalStateException( -"planInputPartitions not supported by default within SupportsDeprecatedScanRow"); - } - - List> planRowInputPartitions(); -} +@InterfaceStability.Evolving +public interface ScanConfig {} --- End diff -- I think this should also report pushed predicates, even if the methods default to `new Expression[0]`. Then plan outputs can be based on the scan config, not on tracking the results of pushdown in some other object. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
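A short sketch of the pushed-predicate idea, with defaults as described in the comment (interface name made up for illustration):

```java
package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.sql.catalyst.expressions.Expression;

// Sketch: the ScanConfig itself reports what was pushed, defaulting to "nothing
// pushed", so plan output can be derived from the config rather than from state
// tracked elsewhere during pushdown.
interface ScanConfigWithPushedFilters extends ScanConfig {
  default Expression[] pushedFilters() {
    return new Expression[0];
  }
}
```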
[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22027 **[Test build #94381 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94381/testReport)** for PR 22027 at commit [`ddbcc04`](https://github.com/apache/spark/commit/ddbcc04bd6850b388f25faceb2cc4e1943a0f660). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22027 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22027 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94381/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208347697 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java --- @@ -21,22 +21,25 @@ import org.apache.spark.sql.types.StructType; /** - * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this + * A mix-in interface for {@link ScanConfigBuilder}. Data sources can implement this * interface to push down required columns to the data source and only read these columns during * scan to reduce the size of the data to be read. */ @InterfaceStability.Evolving -public interface SupportsPushDownRequiredColumns extends DataSourceReader { +public interface SupportsPushDownRequiredColumns extends ScanConfigBuilder { /** * Applies column pruning w.r.t. the given requiredSchema. * * Implementation should try its best to prune the unnecessary columns or nested fields, but it's * also OK to do the pruning partially, e.g., a data source may not be able to prune nested * fields, and only prune top-level columns. - * - * Note that, data source readers should update {@link DataSourceReader#readSchema()} after - * applying column pruning. */ void pruneColumns(StructType requiredSchema); + + /** + * Returns the schema after the column pruning is applied, so that Spark can know if some + * columns/nested fields are not pruned. + */ + StructType prunedSchema(); --- End diff -- I don't see a reason to add this. Why not get the final schema from the `ScanConfig`? Getting the schema from the `ScanConfig` is better because it is clear when the pruned schema will be accessed: after all pushdown methods are called. That matters because filters may cause the source to require more columns and the source may choose to return those columns to Spark instead of adding a projection. Deferring the projection to Spark is more efficient if Spark was going to add one anyway. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
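A hypothetical illustration of the point about filters requiring extra columns (column names made up): the query selects only `id`, but a pushed filter references `ts`, so the source reports a schema that still contains `ts` and lets Spark add the final projection.

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

class PrunedSchemaExample {
  static void example() {
    // What Spark asked for via pruneColumns(...).
    StructType required = new StructType().add("id", DataTypes.LongType);

    // What the source reports after filter pushdown: it keeps "ts" because the
    // pushed filter needs it, and defers the projection back to Spark.
    StructType reported = new StructType()
        .add("id", DataTypes.LongType)
        .add("ts", DataTypes.TimestampType);
  }
}
```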
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208348226 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java --- @@ -20,18 +20,18 @@ import org.apache.spark.annotation.InterfaceStability; /** - * A mix in interface for {@link DataSourceReader}. Data source readers can implement this - * interface to report statistics to Spark. + * A mix in interface for {@link BatchReadSupport}. Data sources can implement this interface to + * report statistics to Spark. * - * Statistics are reported to the optimizer before any operator is pushed to the DataSourceReader. - * Implementations that return more accurate statistics based on pushed operators will not improve - * query performance until the planner can push operators before getting stats. + * Currently statistics are reported to the optimizer before any operator is pushed to the data --- End diff -- Nit: don't use "currently" in docs because it can become out of date and cause confusion. Instead, use "as of <version>" to be clear what "currently" means. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21669: [SPARK-23257][K8S][WIP] Kerberos Support for Spark on K8...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21669 **[Test build #94379 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94379/testReport)** for PR 21669 at commit [`c30ad8c`](https://github.com/apache/spark/commit/c30ad8c4be1d42e7da4992570a656099c073d745). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21669: [SPARK-23257][K8S][WIP] Kerberos Support for Spark on K8...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21669 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94379/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21669: [SPARK-23257][K8S][WIP] Kerberos Support for Spark on K8...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21669 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22008: [SPARK-24928][SQL] Optimize cross join according ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22008#discussion_r208352309 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -152,3 +153,45 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } } + +/** + * Swaps right and left logical plans of a join when left is bigger than right. This is useful + * because underlying cartesian product performs a nested loop, thus if the outer table is + * smaller there are less iterator initialization. --- End diff -- I think this only makes sense when building the left iterator and building the right iterator cost the same. When the right iterator is cheaper to build than the left one, swapping them might cause a performance regression. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22008: [SPARK-24928][SQL] Optimize cross join according ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22008#discussion_r208353576 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -158,8 +158,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) ConvertToLocalRelation, PropagateEmptyRelation) :+ // The following batch should be executed after batch "Join Reorder" and "LocalRelation". -Batch("Check Cartesian Products", Once, - CheckCartesianProducts) :+ +Batch("Check and Optimize Cartesian Products", Once, + CheckCartesianProducts, + ReorderCrossJoinOperands) :+ --- End diff -- Will reordering here break join order optimized by `CostBasedJoinReorder`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22026: [SPARK-25045][CORE] Make `RDDBarrier.mapParititions` sim...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22026 **[Test build #94380 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94380/testReport)** for PR 22026 at commit [`46be7c4`](https://github.com/apache/spark/commit/46be7c420960a3375d2187ccee1a9fc3d5ef83e6). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22026: [SPARK-25045][CORE] Make `RDDBarrier.mapParititions` sim...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22026 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22026: [SPARK-25045][CORE] Make `RDDBarrier.mapParititions` sim...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22026 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94380/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user tgravescs commented on the issue: https://github.com/apache/spark/pull/21698 Sorry for coming in late on this; I first saw it the other day. Could someone perhaps summarize the discussion here, and exactly when this happens and why? Checkpointing was mentioned as a way to work around the issue; why does it help? It would be good to add those details to the JIRA anyway. My initial reaction is that this is very bad. Any correctness issue we cause when handling failures is not something we should write off and expect the user to handle. repartition seems to be the most obvious case and I know lots of people use it (although hopefully many are using the DataFrame API), and we see fetch failures on large jobs all the time, so it seems really serious. Trying a similar example to the one listed in JIRA SPARK-23207 with an RDD doesn't reproduce this:

```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = sc.parallelize(0 to (100-1), 1).repartition(200).map { x => x }.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}

res.distinct().count()
```

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22014 **[Test build #94388 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94388/testReport)** for PR 22014 at commit [`3cfbcfc`](https://github.com/apache/spark/commit/3cfbcfc5fd0099ded2a5bd2b5ff1ef9278135285). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22014 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94388/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22014 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22006: [SPARK-25031][SQL] Fix MapType schema print
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22006 **[Test build #94383 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94383/testReport)** for PR 22006 at commit [`4328199`](https://github.com/apache/spark/commit/4328199fe3738ceec0a2e87b934a20f56e08dc28). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22006: [SPARK-25031][SQL] Fix MapType schema print
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22006 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94383/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22006: [SPARK-25031][SQL] Fix MapType schema print
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22006 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21305 **[Test build #94384 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94384/testReport)** for PR 21305 at commit [`42d86e1`](https://github.com/apache/spark/commit/42d86e1553f345c9879b40b1c20a2addbaf69781). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * ` s\"its class is $` * `case class Uuid(randomSeed: Option[Long] = None) extends LeafExpression with Stateful` * `case class InSubquery(values: Seq[Expression], query: ListQuery)` * `trait ExpressionWithRandomSeed ` * `case class Rand(child: Expression) extends RDG with ExpressionWithRandomSeed ` * `case class Randn(child: Expression) extends RDG with ExpressionWithRandomSeed ` * `case class AliasIdentifier(identifier: String, database: Option[String])` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21305 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21305 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94384/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208368798 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * An interface which defines how to scan the data from data source for streaming processing with + * continuous mode. + */ +@InterfaceStability.Evolving +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource { + + /** + * Returns a builder of {@link ScanConfig}. The builder can take some query specific information + * like which operators to pushdown, streaming offsets, etc., and keep these information in the + * created {@link ScanConfig}. + * + * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport} + * needs to take {@link ScanConfig} as an input. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ScanConfigBuilder newScanConfigBuilder(Offset start); + + /** + * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + @Override + ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config); + + /** + * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances + * for each partition to a single global offset. + */ + Offset mergeOffsets(PartitionOffset[] offsets); + + /** + * The execution engine will call this method in every epoch to determine if new input + * partitions need to be generated, which may be required if for example the underlying + * source system has had partitions added or removed. + * + * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport} + * instance. + */ + default boolean needsReconfiguration() { --- End diff -- Why doesn't this accept a `ScanConfig`? Aren't changes to the source only relevant if they affect a scan? cc @jose-torres. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
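A minimal sketch of the signature change being asked about here (interface name made up; only the relevant method is shown):

```java
package org.apache.spark.sql.sources.v2.reader.streaming;

import org.apache.spark.sql.sources.v2.reader.ScanConfig;

// Sketch: needsReconfiguration takes the active ScanConfig so the source can check
// whether a change in the underlying system (e.g. newly added Kafka partitions)
// actually affects this particular scan.
interface ContinuousReadSupportSketch {
  default boolean needsReconfiguration(ScanConfig config) {
    return false;
  }
}
```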
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208370391 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. + * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); --- End diff -- Why must this be JSON and why must it be a String? Why not byte[] and let the implementation choose the representation it prefers? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208370532 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the --- End diff -- Should this be `oldestOffset`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208370493 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * An interface which defines how to scan the data from data source for streaming processing with + * continuous mode. + */ +@InterfaceStability.Evolving +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource { + + /** + * Returns a builder of {@link ScanConfig}. The builder can take some query specific information + * like which operators to pushdown, streaming offsets, etc., and keep these information in the + * created {@link ScanConfig}. + * + * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport} + * needs to take {@link ScanConfig} as an input. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ScanConfigBuilder newScanConfigBuilder(Offset start); + + /** + * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + @Override + ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config); + + /** + * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances + * for each partition to a single global offset. + */ + Offset mergeOffsets(PartitionOffset[] offsets); + + /** + * The execution engine will call this method in every epoch to determine if new input + * partitions need to be generated, which may be required if for example the underlying + * source system has had partitions added or removed. + * + * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport} + * instance. + */ + default boolean needsReconfiguration() { --- End diff -- The motivation for this method is things like Kafka source repartitioning. If a topic gets partitions added to it (or a subscription pattern gets topics added to it), Spark needs to schedule a new job which will scan the new partitions/topics, even though the Spark-side scan hasn't changed. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/22014 Retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22014 **[Test build #94390 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94390/testReport)** for PR 22014 at commit [`3cfbcfc`](https://github.com/apache/spark/commit/3cfbcfc5fd0099ded2a5bd2b5ff1ef9278135285). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21180: [SPARK-22674][PYTHON] Disabled _hack_namedtuple for pick...
Github user superbobry commented on the issue: https://github.com/apache/spark/pull/21180 Sorry to bug you @HyukjinKwon, but I would really like for this patch to make it into the next PySpark release. Would you have time in the following weeks to have another look at this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208371927 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * An interface which defines how to scan the data from data source for streaming processing with + * continuous mode. + */ +@InterfaceStability.Evolving +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource { + + /** + * Returns a builder of {@link ScanConfig}. The builder can take some query specific information + * like which operators to pushdown, streaming offsets, etc., and keep these information in the + * created {@link ScanConfig}. + * + * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport} + * needs to take {@link ScanConfig} as an input. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ScanConfigBuilder newScanConfigBuilder(Offset start); + + /** + * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + @Override + ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config); --- End diff -- Shouldn't this be `createContinuousReaderFactory`? If the method is the same across `BatchReadSupport`, `MicroBatchReadSupport`, and `ContinuousReadSupport`, then implementing both batch and continuous would require a factory that always returns both continuous and batch readers. Separate methods would allow each implementation to use a base class and add continuous or micro-batch support to different classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
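For illustration, a sketch of the naming proposed in this comment (interface names made up): per-mode factory methods let one source class share a base implementation and add batch and continuous support through separately named methods instead of a single overridden signature.

```java
package org.apache.spark.sql.sources.v2.reader.streaming;

import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
import org.apache.spark.sql.sources.v2.reader.ScanConfig;

interface BatchReadSupportSketch {
  // Batch readers come from the generic factory method.
  PartitionReaderFactory createReaderFactory(ScanConfig config);
}

interface ContinuousReadSupportFactorySketch {
  // Continuous readers come from a distinctly named method, so one class can
  // implement both interfaces without a factory that must serve both modes.
  ContinuousPartitionReaderFactory createContinuousReaderFactory(ScanConfig config);
}
```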
[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22014 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208372512 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * An interface which defines how to scan the data from data source for streaming processing with + * micro-batch mode. + */ +@InterfaceStability.Evolving +public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource { + + /** + * Returns a builder of {@link ScanConfig}. The builder can take some query specific information + * like which operators to pushdown, streaming offsets, etc., and keep these information in the + * created {@link ScanConfig}. + * + * This is the first step of the data scan. All other methods in {@link MicroBatchReadSupport} + * needs to take {@link ScanConfig} as an input. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end); + + /** + * Returns the most recent offset available. + */ + Offset latestOffset(Offset start); --- End diff -- Why does this accept a starting offset? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22014: [SPARK-25036][SQL] avoid match may not be exhaustive in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22014 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1925/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208372469 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. + * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); --- End diff -- The offsets are ultimately exposed as JSON inside the JSON representation of StreamingQueryProgress. It's important for visibility and debuggability that progress events contain human-readable representations of offsets. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22008: [SPARK-24928][SQL] Optimize cross join according ...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22008#discussion_r208372709 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -158,8 +158,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) ConvertToLocalRelation, PropagateEmptyRelation) :+ // The following batch should be executed after batch "Join Reorder" and "LocalRelation". -Batch("Check Cartesian Products", Once, - CheckCartesianProducts) :+ +Batch("Check and Optimize Cartesian Products", Once, + CheckCartesianProducts, + ReorderCrossJoinOperands) :+ --- End diff -- This doesn't reorder the joins; it just swaps the sides of a cartesian join, so there is no difference in the join order, or in any other aspect of the plan, before or after it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208373089 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. + * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); + + /** + * Informs the source that Spark has completed processing all data for offsets less than or + * equal to `end` and will only request offsets greater than `end` in the future. + */ + void commit(Offset end); --- End diff -- I think this should accept a `ScanConfig`. The read support is general and can create multiple scans. It should not keep state about any one scan. That's something the `ScanConfig` should do. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
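A short sketch of the scoped-commit signature suggested here (interface name made up; only the relevant method is shown):

```java
package org.apache.spark.sql.sources.v2.reader.streaming;

import org.apache.spark.sql.sources.v2.reader.ScanConfig;

// Sketch: commit is scoped to a particular scan, so any per-scan state lives in the
// ScanConfig rather than in the read support instance, which may serve many scans.
interface StreamingReadSupportSketch {
  void commit(ScanConfig config, Offset end);
}
```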
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208373424 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the --- End diff -- Streaming-centric sources won't always have the initial offset be the oldest offset. In the Kafka source, for instance, the default is actually to start from the newest offset. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208373784 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * An interface which defines how to scan the data from data source for streaming processing with + * continuous mode. + */ +@InterfaceStability.Evolving +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource { + + /** + * Returns a builder of {@link ScanConfig}. The builder can take some query specific information + * like which operators to pushdown, streaming offsets, etc., and keep these information in the + * created {@link ScanConfig}. + * + * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport} + * needs to take {@link ScanConfig} as an input. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ScanConfigBuilder newScanConfigBuilder(Offset start); + + /** + * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + @Override + ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config); --- End diff -- This would also match the write side: this commit adds `createBatchWriterFactory`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22030: [SPARK-25048][SQL] Pivoting by multiple columns in Scala...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22030 **[Test build #94385 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94385/testReport)** for PR 22030 at commit [`90fc82b`](https://github.com/apache/spark/commit/90fc82b59ae50e6a2a1548a0756a40c6325354ec). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22030: [SPARK-25048][SQL] Pivoting by multiple columns in Scala...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22030 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94385/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22031 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22030: [SPARK-25048][SQL] Pivoting by multiple columns in Scala...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22030 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22031 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1926/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22031 **[Test build #94391 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94391/testReport)** for PR 22031 at commit [`14ef371`](https://github.com/apache/spark/commit/14ef371bdef36df170ec3e487598514ef4967e0e). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21977 test cases? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22008: [SPARK-24928][SQL] Optimize cross join according ...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22008#discussion_r208379788 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -152,3 +153,45 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } } + +/** + * Swaps right and left logical plans of a join when left is bigger than right. This is useful + * because underlying cartesian product performs a nested loop, thus if the outer table is + * smaller there are less iterator initialization. --- End diff -- This is indeed an interesting point. I am not sure how/if we can measure the cost of creating the involved iterators. Anyway, this actually optimizes not only the iterator initialization cost, but also the overall number of records read/processed. Let's take an example. Imagine that we have a table A with 10M records and a table B with 100 records. The total number of records retrieved is:
- if A is the left table, we process: 10M (all the records from A) + 100 * 10M (all the records from B for every record from A) = 101 * 10M
- if B is the left table, we process: 100 (all the records from B) + 100 * 10M (all the records from A for each record from B) = ~ 100 * 10M

So in the second case we process (size of A - size of B) fewer records (the same applies to the number of bytes read). And there is another good point for the second option: Spark is much better at computing/reading 10 times 10M records than 10M times 2 records, as it can exploit its parallelism. That said, your comment still applies: there may be cases in which one side is very onerous to compute despite being the one with fewer records involved. Do you have any suggestion about how to estimate this? Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
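For concreteness, the arithmetic above can be checked with a trivial calculation; this is just a sketch using the row counts assumed in the comment (10M and 100), not code from the PR:
```scala
// Rows touched by a nested-loop cartesian product, per the example above.
val sizeA = 10000000L // 10M records in table A
val sizeB = 100L      // 100 records in table B

val aAsOuter = sizeA + sizeA * sizeB // A as the left/outer side: 101 * 10M rows
val bAsOuter = sizeB + sizeB * sizeA // B as the left/outer side: ~ 100 * 10M rows

// The gap is exactly sizeA - sizeB, matching the argument in the comment.
println(aAsOuter - bAsOuter) // 9999900
```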
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208380139 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala --- @@ -51,18 +58,19 @@ class DataSourceRDD[T: ClassTag]( valuePrepared } - override def next(): T = { + override def next(): Any = { if (!hasNext) { throw new java.util.NoSuchElementException("End of stream") } valuePrepared = false reader.get() } } -new InterruptibleIterator(context, iter) +// TODO: get rid of this type hack. +new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]]) --- End diff -- Why is this necessary? I think the TODO should be handled in this commit and that Spark shouldn't cast RDD[ColumnarBatch] to RDD[InternalRow]. What about having the RDD iterate over the rows in the batch to actually implement the interface? It can provide the underlying batches through a different API. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
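One way to do what is being suggested — a sketch only, not code from this PR — is to flatten each `ColumnarBatch` through its `rowIterator()` so the row-facing RDD genuinely produces `InternalRow`s, while batches stay available through a separate, batch-typed code path:
```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch: expose rows by iterating over the rows of each batch, so no
// RDD[ColumnarBatch] -> RDD[InternalRow] cast is needed.
def rowsFrom(batches: Iterator[ColumnarBatch]): Iterator[InternalRow] =
  batches.flatMap(_.rowIterator().asScala)
```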
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208380370 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala --- @@ -51,18 +58,19 @@ class DataSourceRDD[T: ClassTag]( valuePrepared } - override def next(): T = { + override def next(): Any = { if (!hasNext) { throw new java.util.NoSuchElementException("End of stream") } valuePrepared = false reader.get() } } -new InterruptibleIterator(context, iter) +// TODO: get rid of this type hack. +new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]]) } override def getPreferredLocations(split: Partition): Seq[String] = { - split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition.preferredLocations() + split.asInstanceOf[DataSourceRDDPartition].inputPartition.preferredLocations() --- End diff -- Why doesn't this use `match` to check locality and default to no locality? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
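Something like the following is presumably what is meant — a sketch of the suggestion, not the PR's code:
```scala
// Match on the partition type and fall back to no locality instead of relying on a cast.
override def getPreferredLocations(split: Partition): Seq[String] = split match {
  case p: DataSourceRDDPartition => p.inputPartition.preferredLocations()
  case _ => Nil
}
```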
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208383098 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -39,52 +36,43 @@ case class DataSourceV2ScanExec( @transient source: DataSourceV2, @transient options: Map[String, String], @transient pushedFilters: Seq[Expression], -@transient reader: DataSourceReader) +@transient readSupport: ReadSupport, +@transient scanConfig: ScanConfig) extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan { override def simpleString: String = "ScanV2 " + metadataString // TODO: unify the equal/hashCode implementation for all data source v2 query plans. override def equals(other: Any): Boolean = other match { case other: DataSourceV2ScanExec => - output == other.output && reader.getClass == other.reader.getClass && options == other.options + output == other.output && readSupport.getClass == other.readSupport.getClass && +options == other.options case _ => false } override def hashCode(): Int = { Seq(output, source, options).hashCode() } - override def outputPartitioning: physical.Partitioning = reader match { -case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchPartitions.size == 1 => - SinglePartition - -case r: SupportsScanColumnarBatch if !r.enableBatchRead() && partitions.size == 1 => - SinglePartition - -case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 1 => + override def outputPartitioning: physical.Partitioning = readSupport match { +case _ if partitions.length == 1 => SinglePartition case s: SupportsReportPartitioning => new DataSourcePartitioning( -s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name))) +s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name))) case _ => super.outputPartitioning } - private lazy val partitions: Seq[InputPartition[InternalRow]] = { -reader.planInputPartitions().asScala - } + private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig) - private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match { -case r: SupportsScanColumnarBatch if r.enableBatchRead() => - assert(!reader.isInstanceOf[ContinuousReader], -"continuous stream reader does not support columnar read yet.") - r.planBatchInputPartitions().asScala - } + private lazy val partitionReaderFactory = readSupport.createReaderFactory(scanConfig) - private lazy val inputRDD: RDD[InternalRow] = reader match { -case _: ContinuousReader => + private lazy val inputRDD: RDD[InternalRow] = readSupport match { +case _: ContinuousReadSupport => + assert(!partitionReaderFactory.supportColumnarReads(), --- End diff -- Can't Spark choose to use InternalRow reads instead? Why can't the source support both? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208383579 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala --- @@ -93,21 +81,17 @@ case class DataSourceV2ScanExec( sparkContext, sqlContext.conf.continuousStreamingExecutorQueueSize, sqlContext.conf.continuousStreamingExecutorPollIntervalMs, -partitions).asInstanceOf[RDD[InternalRow]] - -case r: SupportsScanColumnarBatch if r.enableBatchRead() => - new DataSourceRDD(sparkContext, batchPartitions).asInstanceOf[RDD[InternalRow]] +partitions, +schema, + partitionReaderFactory.asInstanceOf[ContinuousPartitionReaderFactory]) --- End diff -- This should not cast. Just call `readSupport.createContinuousReaderFactory(...)` here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
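Since the `ContinuousReadSupport` override shown earlier in this diff already narrows the return type of `createReaderFactory` to `ContinuousPartitionReaderFactory`, the cast could presumably be dropped once the match has narrowed the read support; a rough sketch (only the factory handling is shown, the rest of the branch would stay as in the PR):
```scala
case r: ContinuousReadSupport =>
  // The covariant override on ContinuousReadSupport already returns the continuous
  // factory type, so no asInstanceOf is needed here.
  val readerFactory: ContinuousPartitionReaderFactory = r.createReaderFactory(scanConfig)
  // ... then pass readerFactory where the cast expression was used.
```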
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208384141 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala --- @@ -80,17 +80,17 @@ object DataSourceV2Strategy extends Strategy { */ // TODO: nested column pruning. private def pruneColumns( - reader: DataSourceReader, + configBuilder: ScanConfigBuilder, relation: DataSourceV2Relation, exprs: Seq[Expression]): Seq[AttributeReference] = { -reader match { +configBuilder match { case r: SupportsPushDownRequiredColumns => val requiredColumns = AttributeSet(exprs.flatMap(_.references)) val neededOutput = relation.output.filter(requiredColumns.contains) if (neededOutput != relation.output) { r.pruneColumns(neededOutput.toStructType) val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap - r.readSchema().toAttributes.map { + r.prunedSchema().toAttributes.map { --- End diff -- As I noted earlier, this shouldn't get the scan's schema until the scan is fully configured. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22008: [SPARK-24928][SQL] Optimize cross join according ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22008#discussion_r208385422 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala --- @@ -152,3 +153,45 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } } + +/** + * Swaps right and left logical plans of a join when left is bigger than right. This is useful + * because underlying cartesian product performs a nested loop, thus if the outer table is + * smaller there are less iterator initialization. --- End diff -- I don't think we have a good way so far to estimate the effort of materializing the elements of one RDD, especially before we materialize it. That is why I think this optimization of swapping the cross join sides doesn't always bring an improvement and can sometimes cause a regression. Let us see if others have more ideas. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21977 @gatorsmile, I started [YarnPySparkSuite](https://gist.github.com/rdblue/9848a00f49eaad6126fbbcfa1b039e19) but the YARN tests don't create python worker processes so the tests don't work. I need to find out how to force YARN to create workers in order to write tests. If you have any input that would help, please let me know. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21977 **[Test build #94386 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94386/testReport)** for PR 21977 at commit [`ee750ef`](https://github.com/apache/spark/commit/ee750efae806ea958a6a5a327799dafe6a0b3e64). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21977 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user crafty-coder commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208387111 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,93 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform elements in an array using the transform function. This is similar to + * a `map` in functional programming. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), x -> x + 1); --- End diff -- The examples are not accurate. You could do something like:
```
> SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x));
 array(('a', 1), ('b', 2), ('c', 3))
> SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y);
 array(4, 6)
> SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y));
 array('ad', 'be', 'cf')
```
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21977 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94386/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21733: [SPARK-24763][SS] Remove redundant key data from value i...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21733 @tdas Done running perf. test with 4 more tests:

> BenchmarkMovingAggregationsListenerKeyMuchBigger rate: 16

version | input rows per second | processed rows per second | total state rows | used bytes of current state version
--- | --- | --- | --- | ---
latest master (c9914cf) | 159877.232 | 149537.817 | 65000 | 133511303
patch (on top of c9914cf) | 160049.118 | 152497.945 | 65000 | 73236351

state size: 54.854 % (reduces 45.15%)

> BenchmarkMovingAggregationsListenerManyKeys rate: 12

version | input rows per second | processed rows per second | total state rows | used bytes of current state version
--- | --- | --- | --- | ---
latest master (c9914cf) | 120266.810 | 107482.042 | 65000 | 38433719
patch (on top of c9914cf) | 119865.855 | 109268.772 | 65000 | 24900343

state size: 64.787% (reduces 35.21%)

> BenchmarkMovingAggregationsListenerManyValues rate: 25000

version | input rows per second | processed rows per second | total state rows | used bytes of current state version
--- | --- | --- | --- | ---
latest master (c9914cf) | 25009.236 | 21216.126 | 9 | 77161711 (857.352 per row)
patch (on top of c9914cf) | 25060.635 | 20774.500 | 99495 | 78230335 (786.274 per row)

state size: 91.709 % (reduces 8.29 %)

> BenchmarkMovingAggregationsListenerValueMuchBigger rate: 85000

version | input rows per second | processed rows per second | total state rows | used bytes of current state version
--- | --- | --- | --- | ---
latest master (c9914cf) | 85310.774 | 79091.271 | 1000 | 1324255
patch (on top of c9914cf) | 84791.761 | 79755.905 | 1000 | 1282687

state size: 96.861 % (reduces 3.14 %)

I don't find any outstanding perf. hit, and the expected state size reduction is shown across all the cases. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208389947 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * An interface which defines how to scan the data from data source for streaming processing with + * continuous mode. + */ +@InterfaceStability.Evolving +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource { + + /** + * Returns a builder of {@link ScanConfig}. The builder can take some query specific information + * like which operators to pushdown, streaming offsets, etc., and keep these information in the + * created {@link ScanConfig}. + * + * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport} + * needs to take {@link ScanConfig} as an input. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ScanConfigBuilder newScanConfigBuilder(Offset start); + + /** + * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + @Override + ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config); + + /** + * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances + * for each partition to a single global offset. + */ + Offset mergeOffsets(PartitionOffset[] offsets); + + /** + * The execution engine will call this method in every epoch to determine if new input + * partitions need to be generated, which may be required if for example the underlying + * source system has had partitions added or removed. + * + * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport} + * instance. + */ + default boolean needsReconfiguration() { --- End diff -- I think of a ReadSupport as something that can be read or scanned and ContinuousReadSupport as a stream that can be read. In that abstraction, the "something that can be read" probably isn't the right place to track whether a particular scan requires reconfiguration: a *scan* requires reconfiguration if that scan is based on partitions that are out of date. 
To me, that indicates that a Kafka `ScanConfig` should keep track of kafka partitions and then `needsReconfiguration` should return true if the Kafka topic now has a different set of partitions than the ones in the `ScanConfig`. Does that make sense? I think it would also be more consistent in the API to add `ScanConfig` here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
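A rough sketch of that shape, in Scala for brevity (the real interface is Java); the `ScanConfig` parameter is the suggestion rather than what the PR currently has:
```scala
// Hypothetical: whether reconfiguration is needed becomes a question about one scan.
// A Kafka implementation could compare the topic's current partitions against the
// partition set captured in this ScanConfig when the scan was planned.
def needsReconfiguration(config: ScanConfig): Boolean = false
```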
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208390264 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. + * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); --- End diff -- If Spark uses JSON to serialize, why can't Spark handle deserialization itself? Why not require `Offset` to have a human-readable `toString` and a `toBytes` for serialization? We don't have to conflate serialization with human readability. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208390359 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the --- End diff -- I was thinking oldest available, but it's a minor point. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22028: [SPARK-25046][SQL] Fix Alter View can excute sql like "A...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22028 **[Test build #94387 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94387/testReport)** for PR 22028 at commit [`de44d0d`](https://github.com/apache/spark/commit/de44d0df8053c5caececee6b0f625d69b884b8d9). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208391449 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java --- @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.annotation.InterfaceStability; +import org.apache.spark.sql.execution.streaming.BaseStreamingSource; +import org.apache.spark.sql.sources.v2.reader.InputPartition; +import org.apache.spark.sql.sources.v2.reader.ScanConfig; +import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder; + +/** + * An interface which defines how to scan the data from data source for streaming processing with + * continuous mode. + */ +@InterfaceStability.Evolving +public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource { + + /** + * Returns a builder of {@link ScanConfig}. The builder can take some query specific information + * like which operators to pushdown, streaming offsets, etc., and keep these information in the + * created {@link ScanConfig}. + * + * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport} + * needs to take {@link ScanConfig} as an input. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + ScanConfigBuilder newScanConfigBuilder(Offset start); + + /** + * Returns a factory to produce {@link ContinuousPartitionReader}s for {@link InputPartition}s. + * + * If this method fails (by throwing an exception), the action will fail and no Spark job will be + * submitted. + */ + @Override + ContinuousPartitionReaderFactory createReaderFactory(ScanConfig config); + + /** + * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances + * for each partition to a single global offset. + */ + Offset mergeOffsets(PartitionOffset[] offsets); + + /** + * The execution engine will call this method in every epoch to determine if new input + * partitions need to be generated, which may be required if for example the underlying + * source system has had partitions added or removed. + * + * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport} + * instance. + */ + default boolean needsReconfiguration() { --- End diff -- Makes sense to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22028: [SPARK-25046][SQL] Fix Alter View can excute sql like "A...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22028 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22028: [SPARK-25046][SQL] Fix Alter View can excute sql like "A...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22028 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94387/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22009: [SPARK-24882][SQL] improve data source v2 API
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/22009#discussion_r208392865 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.reader.streaming; + +import org.apache.spark.sql.sources.v2.reader.ReadSupport; + +/** + * A base interface for streaming read support. This is package private and is invisible to data + * sources. Data sources should implement concrete streaming read support interfaces: + * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}. + */ +interface StreamingReadSupport extends ReadSupport { + + /** + * Returns the initial offset for a streaming query to start reading from. Note that the + * streaming data source should not assume that it will start reading from its + * {@link #initialOffset()} value: if Spark is restarting an existing query, it will restart from + * the check-pointed offset rather than the initial one. + */ + Offset initialOffset(); + + /** + * Deserialize a JSON string into an Offset of the implementation-defined offset type. + * + * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader + */ + Offset deserializeOffset(String json); --- End diff -- Currently, there are two representations of any given offset: a connector-defined JVM object and a serialized JSON string. Spark can't build the JVM object itself because it doesn't know what the right type is. If you know of some clean way for a connector to declare "here is the type of my offsets", we should do that instead, but I only know how to do it through reflection magic more confusing than the status quo. I'd hesitate to introduce a third representation unless there's some concrete use case where JSON serialization won't work well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
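To make the two representations concrete, here is a minimal sketch of a connector-defined offset; the class, field, and parsing logic are invented for illustration, and it assumes the `Offset` base class exposes its serialized form through `json()` as in the current API:
```scala
// Connector-defined JVM offset type. Spark checkpoints the JSON string as an opaque value.
class MyPartitionedOffset(val positions: Array[Long]) extends Offset {
  override def json(): String = positions.mkString("[", ",", "]")
}

// Only the connector knows which concrete class to rebuild from the checkpointed string,
// which is why deserializeOffset lives on the connector side rather than in Spark.
def deserializeOffset(json: String): Offset = {
  val body = json.stripPrefix("[").stripSuffix("]").trim
  val positions = if (body.isEmpty) Array.empty[Long] else body.split(",").map(_.trim.toLong)
  new MyPartitionedOffset(positions)
}
```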
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21977 @rdblue Is this for YARN only? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22031 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21977 cc @jiangxb1987 @cloud-fan @jerryshao @vanzin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22031 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1927/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22031: [TODO][SPARK-23932][SQL] Higher order function zip_with
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22031 **[Test build #94392 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94392/testReport)** for PR 22031 at commit [`c7e2dee`](https://github.com/apache/spark/commit/c7e2dee7cf48efd28d764fdd543bf366d65fcfa5). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22017 **[Test build #94393 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94393/testReport)** for PR 22017 at commit [`89a3da4`](https://github.com/apache/spark/commit/89a3da4e292690b78fbb41deef4104be3f843c1b). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22017 **[Test build #94394 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94394/testReport)** for PR 22017 at commit [`12ad8b2`](https://github.com/apache/spark/commit/12ad8b2248b7acb4a04289ca8da439ecb63206a9). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22028: [SPARK-25046][SQL] Fix Alter View can excute sql like "A...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/22028 Thanks! Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22028: [SPARK-25046][SQL] Fix Alter View can excute sql ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22028 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208399620 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,91 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)); + array(('a', 1), ('b', 3), ('c', 5)) + > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y)); + array(4, 6) + > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y)); + array('ad', 'be', 'cf') + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ArraysZipWith( +left: Expression, +right: Expression, +function: Expression) + extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes { + + override def inputs: Seq[Expression] = List(left, right) + + override def functions: Seq[Expression] = List(function) + + def expectingFunctionType: AbstractDataType = AnyDataType + @transient lazy val functionForEval: Expression = functionsForEval.head + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType, expectingFunctionType) + + override def nullable: Boolean = inputs.exists(_.nullable) + + override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArraysZipWith = { +val (leftElementType, leftContainsNull) = left.dataType match { + case ArrayType(elementType, containsNull) => (elementType, containsNull) + case _ => +val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType +(elementType, containsNull) +} +val (rightElementType, rightContainsNull) = right.dataType match { + case ArrayType(elementType, containsNull) => (elementType, containsNull) + case _ => +val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType +(elementType, containsNull) +} +copy(function = f(function, + (leftElementType, leftContainsNull) :: (rightElementType, rightContainsNull) :: Nil)) + } + + @transient lazy val (arr1Var, arr2Var) = { +val LambdaFunction(_, + (arr1Var: NamedLambdaVariable):: (arr2Var: NamedLambdaVariable) :: Nil, _) = function +(arr1Var, arr2Var) + } + + override def eval(input: InternalRow): Any = { +val leftArr = left.eval(input).asInstanceOf[ArrayData] +val rightArr = right.eval(input).asInstanceOf[ArrayData] + +if (leftArr == null || rightArr == null) { --- End diff -- If ```leftArr``` is ```null```, ```right``` doesn't have to be evaluated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
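A sketch of the suggested short-circuit; `nullSafeZip` below is just a placeholder for whatever the rest of the existing eval body does once both sides are known to be non-null:
```scala
override def eval(input: InternalRow): Any = {
  val leftArr = left.eval(input).asInstanceOf[ArrayData]
  if (leftArr == null) {
    null // short-circuit: right never needs to be evaluated
  } else {
    val rightArr = right.eval(input).asInstanceOf[ArrayData]
    if (rightArr == null) null else nullSafeZip(input, leftArr, rightArr)
  }
}
```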
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208403145 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,91 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)); + array(('a', 1), ('b', 3), ('c', 5)) + > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y)); + array(4, 6) + > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y)); + array('ad', 'be', 'cf') + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ArraysZipWith( +left: Expression, +right: Expression, +function: Expression) + extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes { + + override def inputs: Seq[Expression] = List(left, right) + + override def functions: Seq[Expression] = List(function) + + def expectingFunctionType: AbstractDataType = AnyDataType + @transient lazy val functionForEval: Expression = functionsForEval.head + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType, expectingFunctionType) + + override def nullable: Boolean = inputs.exists(_.nullable) + + override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArraysZipWith = { +val (leftElementType, leftContainsNull) = left.dataType match { + case ArrayType(elementType, containsNull) => (elementType, containsNull) + case _ => +val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType +(elementType, containsNull) +} +val (rightElementType, rightContainsNull) = right.dataType match { + case ArrayType(elementType, containsNull) => (elementType, containsNull) + case _ => +val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType +(elementType, containsNull) +} +copy(function = f(function, + (leftElementType, leftContainsNull) :: (rightElementType, rightContainsNull) :: Nil)) --- End diff -- If you want to support different size of input arrays (The jira ticket says: _"Both arrays must be the same length."_), what about the scenario when one array is empty and the second has elements? Shouldn't we use ```true``` instead of ```leftContainsNull``` and ```rightContainsNull```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
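If null-padding of the shorter array is kept, the bound lambda arguments would presumably have to be declared nullable unconditionally, roughly:
```scala
// Elements can be null after padding, so nullability is forced to true for both lambda
// variables instead of inheriting leftContainsNull / rightContainsNull.
copy(function = f(function,
  (leftElementType, true) :: (rightElementType, true) :: Nil))
```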
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208398313 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,91 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)); + array(('a', 1), ('b', 3), ('c', 5)) + > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y)); + array(4, 6) + > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y)); + array('ad', 'be', 'cf') + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ArraysZipWith( +left: Expression, +right: Expression, +function: Expression) + extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes { + + override def inputs: Seq[Expression] = List(left, right) + + override def functions: Seq[Expression] = List(function) + + def expectingFunctionType: AbstractDataType = AnyDataType + @transient lazy val functionForEval: Expression = functionsForEval.head + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType, expectingFunctionType) + + override def nullable: Boolean = inputs.exists(_.nullable) + + override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArraysZipWith = { +val (leftElementType, leftContainsNull) = left.dataType match { --- End diff -- You can utilize ```HigherOrderFunction.arrayArgumentType```. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22004: [SPARK-25029][TESTS] Scala 2.12 issues: TaskNotSerializa...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22004 **[Test build #4235 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4235/testReport)** for PR 22004 at commit [`422c4ab`](https://github.com/apache/spark/commit/422c4ab259b5e27ef12c2d5093a4ae93f2b7f522). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/22027 retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22027 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22027 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1928/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22027: [SPARK-25010][SQL][FOLLOWUP] Shuffle should also produce...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22027 **[Test build #94395 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94395/testReport)** for PR 22027 at commit [`ddbcc04`](https://github.com/apache/spark/commit/ddbcc04bd6850b388f25faceb2cc4e1943a0f660). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22004: [SPARK-25029][TESTS] Scala 2.12 issues: TaskNotSerializa...
Github user srowen commented on the issue: https://github.com/apache/spark/pull/22004 Merged to master, but the janino issue is still outstanding --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22014: [SPARK-25036][SQL] avoid match may not be exhaust...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22014#discussion_r208406147 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala --- @@ -87,7 +87,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro // For top level row writer, it always writes to the beginning of the global buffer holder, // which means its fixed-size region always in the same position, so we don't need to call // `reset` to set up its fixed-size region every time. - if (inputs.map(_.isNull).forall(_ == "false")) { --- End diff -- @kiszk was this intentional? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22004: [SPARK-25029][TESTS] Scala 2.12 issues: TaskNotSe...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22004 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21977 Yes, this is for YARN only. I've also opened follow-up issues for Mesos and Kubernetes integration. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r208408858 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala --- @@ -49,4 +51,11 @@ object DataSourceUtils { } } } + + // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be + // counted as data files, so that they shouldn't participate partition discovery. + private[sql] def isDataPath(path: Path): Boolean = { +val name = path.getName +!((name.startsWith("_") && !name.contains("=")) || name.startsWith(".")) --- End diff -- Not sure what your earlier impl was. I would prefer to keep the original code in `PartitioningAwareFileIndex.scala` unchanged and just add a utility function `isDataPath` in CommandUtils.scala. Does this sound good to you? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208410422 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -384,6 +392,10 @@ class RelationalGroupedDataset protected[sql]( .sort(pivotColumn) // ensure that the output columns are in a consistent logical order .collect() .map(_.get(0)) + .collect { +case row: GenericRow => struct(row.values.map(lit): _*) --- End diff -- I suspect this will not work for nested struct types, or say, multiple pivot columns with nested types. Could you please add a test like:
```
test("pivoting column list") {
  val expected = ...
  val df = trainingSales
    .groupBy($"sales.year")
    .pivot(struct($"sales", $"training"))
    .agg(sum($"sales.earnings"))
  checkAnswer(df, expected)
}
```
And can we also check if it works for other complex nested types, like Array(Struct(...))? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22030: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/22030#discussion_r208411022 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -384,6 +392,10 @@ class RelationalGroupedDataset protected[sql]( .sort(pivotColumn) // ensure that the output columns are in a consistent logical order .collect() .map(_.get(0)) + .collect { --- End diff -- Use "map"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21596: [SPARK-24601] Bump Jackson version
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21596 @jerryshao This is for 3.0 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org