[spark] branch master updated: [SPARK-28052][SQL] Make `ArrayExists` follow the three-valued boolean logic.
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5ae1a6b  [SPARK-28052][SQL] Make `ArrayExists` follow the three-valued boolean logic.
5ae1a6b is described below

commit 5ae1a6bf0dea9e74ced85686ef33a87cfa3e90c2
Author: Takuya UESHIN
AuthorDate: Sat Jun 15 10:48:06 2019 -0700

[SPARK-28052][SQL] Make `ArrayExists` follow the three-valued boolean logic.

## What changes were proposed in this pull request?

Currently `ArrayExists` always returns a boolean value (as long as the arguments are not `null`), but it should follow three-valued boolean logic:

- `true` if the predicate returns `true` for at least one element
- otherwise, `null` if the predicate returns `null` for any element
- otherwise, `false`

This change matches the behavior of Postgres' equivalent construct `ANY/SOME (array)`: https://www.postgresql.org/docs/9.6/functions-comparisons.html#AEN21174

## How was this patch tested?

Modified and added tests.

Closes #24873 from ueshin/issues/SPARK-28052/fix_exists.

Authored-by: Takuya UESHIN
Signed-off-by: Dongjoon Hyun
---
 docs/sql-migration-guide-upgrade.md                |  2 +
 .../expressions/higherOrderFunctions.scala         | 29 +-
 .../ReplaceNullWithFalseInPredicate.scala          |  4 +-
 .../org/apache/spark/sql/internal/SQLConf.scala    |  6 +++
 .../expressions/HigherOrderFunctionsSuite.scala    | 46 --
 .../ReplaceNullWithFalseInPredicateSuite.scala     | 14 ++-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala |  2 +
 ...laceNullWithFalseInPredicateEndToEndSuite.scala |  9 +++--
 8 files changed, 94 insertions(+), 18 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index 44772cc..37be86f 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -139,6 +139,8 @@ license: |
 
   - Since Spark 3.0, we use a new protocol for fetching shuffle blocks, for external shuffle service users, we need to upgrade the server correspondingly. Otherwise, we'll get the error message `UnsupportedOperationException: Unexpected message: FetchShuffleBlocks`. If it is hard to upgrade the shuffle service right now, you can still use the old protocol by setting `spark.shuffle.useOldFetchProtocol` to `true`.
 
+  - Since Spark 3.0, a higher-order function `exists` follows the three-valued boolean logic, i.e., if the `predicate` returns any `null`s and no `true` is obtained, then `exists` will return `null` instead of `false`. For example, `exists(array(1, null, 3), x -> x % 2 == 0)` will be `null`. The previous behaviour can be restored by setting `spark.sql.legacy.arrayExistsFollowsThreeValuedLogic` to `false`.
+
 ## Upgrading from Spark SQL 2.4 to 2.4.1
 
   - The value of `spark.executor.heartbeatInterval`, when specified without units like "30" rather than "30s", was
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
index e6cc11d..b326e1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion, UnresolvedAttribute, UnresolvedException}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.array.ByteArrayMethods
@@ -388,6 +389,10 @@ case class ArrayFilter(
     Examples:
       > SELECT _FUNC_(array(1, 2, 3), x -> x % 2 == 0);
        true
+      > SELECT _FUNC_(array(1, 2, 3), x -> x % 2 == 10);
+       false
+      > SELECT _FUNC_(array(1, null, 3), x -> x % 2 == 0);
+       NULL
   """,
   since = "2.4.0")
 case class ArrayExists(
@@ -395,6 +400,16 @@ case class ArrayExists(
     function: Expression)
   extends ArrayBasedSimpleHigherOrderFunction with CodegenFallback {
 
+  private val followThreeValuedLogic =
+    SQLConf.get.getConf(SQLConf.LEGACY_ARRAY_EXISTS_FOLLOWS_THREE_VALUED_LOGIC)
+
+  override def nullable: Boolean =
+    if (followThreeValuedLogic) {
+      super.nullable || function.nullable
+    } else {
+      super.nullable
+    }
+
   override def dataType: DataType = BooleanType
 
   override def functionType: AbstractDataType = BooleanType
@@ -410,15 +425,25 @@ case class ArrayExists(
     val arr = argumentValue.asInstanceOf[ArrayData]
     val f =
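To make the new semantics concrete, here is a minimal PySpark sketch (assuming a Spark 3.0 build that contains this commit; the session setup is illustrative, and the legacy flag is the one named in the migration-guide entry above):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# New default (three-valued logic): no element matches, but the null
# element makes the predicate return null, so the overall result is null.
spark.sql("SELECT exists(array(1, null, 3), x -> x % 2 == 0)").show()

# A true match dominates: nulls are irrelevant once any element matches.
spark.sql("SELECT exists(array(1, null, 2), x -> x % 2 == 0)").show()

# Restore the pre-3.0 two-valued behavior via the legacy flag.
spark.conf.set("spark.sql.legacy.arrayExistsFollowsThreeValuedLogic", "false")
spark.sql("SELECT exists(array(1, null, 3), x -> x % 2 == 0)").show()
```

The first query should show `null`, the second `true`, and the last `false` once the legacy flag is flipped.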
[spark] branch master updated: [SPARK-26412][PYSPARK][SQL] Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuple of pd.Series
This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 6d441dc  [SPARK-26412][PYSPARK][SQL] Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuple of pd.Series
6d441dc is described below

commit 6d441dcdc68dae886e375794a55658f70cd18d9d
Author: WeichenXu
AuthorDate: Sat Jun 15 08:29:20 2019 -0700

[SPARK-26412][PYSPARK][SQL] Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuple of pd.Series

## What changes were proposed in this pull request?

Allow Pandas UDF to take an iterator of pd.Series or an iterator of tuples of pd.Series. Note that the UDF input is always a single iterator:

* if the UDF takes only one column as input, the iterator's elements are `pd.Series` objects, each corresponding to a batch of that column's values
* if the UDF takes multiple columns as input, the iterator's elements are tuples of `pd.Series`, one per input column, in the same order as the inputs

For example:

```
@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def the_udf(iterator):
    for col1_batch, col2_batch in iterator:
        yield col1_batch + col2_batch

df.select(the_udf("col1", "col2"))
```

The UDF above adds col1 and col2.

I haven't added unit tests yet, but manual tests show it works fine, so it is ready for a first-pass review. We can test several typical cases:

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import udf
from pyspark.taskcontext import TaskContext

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 20), (3, 40)], ["a", "b"])

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi1(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi1: do init stuff, partitionId=" + str(pid))
    for batch in it:
        yield batch + 100
    print("DBG: fi1: do close stuff, partitionId=" + str(pid))

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi2(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi2: do init stuff, partitionId=" + str(pid))
    for batch in it:
        yield batch + 1
    print("DBG: fi2: do close stuff, partitionId=" + str(pid))

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def fi3(it):
    pid = TaskContext.get().partitionId()
    print("DBG: fi3: do init stuff, partitionId=" + str(pid))
    for x, y in it:
        yield x + y * 10 + 10
    print("DBG: fi3: do close stuff, partitionId=" + str(pid))

@pandas_udf("int", PandasUDFType.SCALAR)
def fp1(x):
    return x + 1000

@udf("int")
def fu1(x):
    return x + 10

# Test selecting "pandas iter udf / pandas udf / sql udf" expressions at the same time.
# In this case `fi1("a"), fi2("b"), fi3("a", "b")` will generate only one plan,
# while `fu1("a")` and `fp1("a")` will generate another two separate plans.
df.select(fi1("a"), fi2("b"), fi3("a", "b"), fu1("a"), fp1("a")).show()

# Test chaining two pandas iter udfs together.
# In this case `fi2(fi1("a"))` will generate only one plan.
# Also note the init stuff/close stuff call order:
#   DBG: fi2: do init stuff, partitionId=0
#   DBG: fi1: do init stuff, partitionId=0
#   DBG: fi1: do close stuff, partitionId=0
#   DBG: fi2: do close stuff, partitionId=0
df.select(fi2(fi1("a"))).show()

# Test a more complex chain.
# In this case `fi1("a"), fi2("a")` will generate one plan,
# and `fi3(fi1_output, fi2_output)` will generate another plan.
df.select(fi3(fi1("a"), fi2("a"))).show()
```

## How was this patch tested?

To be added.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #24643 from WeichenXu123/pandas_udf_iter.

Lead-authored-by: WeichenXu
Co-authored-by: Xiangrui Meng
Signed-off-by: Xiangrui Meng
---
 .../org/apache/spark/api/python/PythonRunner.scala |   2 +
 python/pyspark/rdd.py                              |   1 +
 python/pyspark/sql/functions.py                    |   3 +
 python/pyspark/sql/tests/test_pandas_udf_scalar.py | 882 ++---
 python/pyspark/sql/udf.py                          |  13 +-
 python/pyspark/worker.py                           |  93 ++-
 .../spark/sql/catalyst/expressions/PythonUDF.scala |   3 +-
 .../plans/logical/pythonLogicalOperators.scala     |   3 +-
 .../spark/sql/execution/SparkStrategies.scala      |   4 +-
 .../sql/execution/python/ArrowEvalPythonExec.scala |   5 +-
 .../sql/execution/python/ExtractPythonUDFs.scala   |  44 +-
 11 files
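For reference, here is a self-contained sketch of the iterator UDF pattern this patch enables. It follows the single-iterator protocol described above; the column names and the "expensive setup" comment are an illustrative use case, not part of the patch:

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 20), (3, 40)], ["a", "b"])

@pandas_udf("int", PandasUDFType.SCALAR_ITER)
def add_columns(batches):
    # Expensive one-time setup (e.g. loading a model) can go here:
    # with the iterator form it runs once per partition, not once per batch.
    for a_batch, b_batch in batches:  # one tuple of pd.Series per batch
        yield a_batch + b_batch       # yield one pd.Series per input batch

df.select(add_columns("a", "b")).show()  # rows: 21 and 43
```

The per-partition setup opportunity is the key design point of the iterator form; a plain `SCALAR` pandas UDF offers no hook that spans all batches of a partition.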
[spark] branch master updated (26998b8 -> a950570)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 26998b8  [SPARK-27870][SQL][PYTHON] Add a runtime buffer size configuration for Pandas UDFs
  add a950570  [MINOR][CORE] Remove unused variables, unused imports, etc.

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/ExecutorAllocationManager.scala   |  2 --
 .../scala/org/apache/spark/MapOutputTracker.scala  |  2 +-
 .../scala/org/apache/spark/SecurityManager.scala   |  1 -
 .../org/apache/spark/api/java/JavaPairRDD.scala    |  2 +-
 .../spark/api/python/PythonWorkerFactory.scala     |  2 +-
 .../apache/spark/api/r/RBackendAuthHandler.scala   |  1 -
 .../main/scala/org/apache/spark/api/r/SerDe.scala  |  2 +-
 .../org/apache/spark/deploy/SparkSubmit.scala      |  5 ++-
 .../apache/spark/deploy/worker/DriverWrapper.scala |  2 +-
 .../executor/CoarseGrainedExecutorBackend.scala    |  5 ---
 .../apache/spark/memory/ExecutionMemoryPool.scala  |  2 +-
 .../apache/spark/metrics/ExecutorMetricType.scala  |  1 -
 .../apache/spark/metrics/sink/GraphiteSink.scala   |  1 -
 .../scala/org/apache/spark/rdd/CoalescedRDD.scala  |  2 +-
 .../main/scala/org/apache/spark/rdd/PipedRDD.scala |  3 +-
 .../org/apache/spark/rpc/RpcEndpointRef.scala      |  2 +-
 .../spark/scheduler/EventLoggingListener.scala     |  1 -
 .../org/apache/spark/scheduler/SparkListener.scala |  3 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  1 -
 .../org/apache/spark/status/AppStatusStore.scala   |  2 +-
 .../status/api/v1/ApplicationListResource.scala    |  2 +-
 .../spark/status/api/v1/JacksonMessageWriter.scala |  1 -
 .../status/api/v1/OneApplicationResource.scala     |  1 -
 .../spark/status/api/v1/StagesResource.scala       |  6 +---
 .../storage/ShuffleBlockFetcherIterator.scala      |  3 +-
 .../main/scala/org/apache/spark/ui/SparkUI.scala   |  7 ++--
 .../scala/org/apache/spark/ui/jobs/JobPage.scala   |  1 -
 .../scala/org/apache/spark/ui/jobs/JobsTab.scala   |  2 --
 .../scala/org/apache/spark/ui/jobs/PoolPage.scala  |  1 -
 .../scala/org/apache/spark/ui/jobs/StagePage.scala | 37 --
 .../org/apache/spark/ui/jobs/StageTable.scala      |  2 --
 .../scala/org/apache/spark/ui/jobs/StagesTab.scala |  2 +-
 .../org/apache/spark/util/CommandLineUtils.scala   |  2 --
 .../util/collection/PartitionedPairBuffer.scala    |  1 -
 .../util/io/ChunkedByteBufferFileRegion.scala      |  4 ---
 35 files changed, 19 insertions(+), 95 deletions(-)
[spark] branch master updated (23ebd38 -> 26998b8)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

 from 23ebd38  [SPARK-27418][SQL] Migrate Parquet to File Data Source V2
  add 26998b8  [SPARK-27870][SQL][PYTHON] Add a runtime buffer size configuration for Pandas UDFs

No new revisions were added by this update.

Summary of changes:
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala |  3 ++-
 .../main/scala/org/apache/spark/internal/config/package.scala |  1 +
 python/pyspark/daemon.py                                      |  5 +++--
 python/pyspark/java_gateway.py                                |  2 +-
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala    | 11 +++
 .../apache/spark/sql/execution/python/ArrowPythonRunner.scala |  7 +++
 6 files changed, 25 insertions(+), 4 deletions(-)
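The diffstat shows the new option being wired from `SQLConf` through `ArrowPythonRunner` down to the Python daemon. As a usage sketch only: the conf key below is an assumption based on how this option is documented for Spark 3.0, not quoted from this change; the `SQLConf.scala` hunk listed above is the authoritative source for the exact name.

```
from pyspark.sql import SparkSession

# Assumed conf name: spark.sql.execution.pandas.udf.buffer.size,
# expected to fall back to spark.buffer.size when unset.
spark = (SparkSession.builder
         .config("spark.sql.execution.pandas.udf.buffer.size", "65536")
         .getOrCreate())
```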
[spark] branch master updated (c79f471 -> 23ebd38)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

 from c79f471  [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
  add 23ebd38  [SPARK-27418][SQL] Migrate Parquet to File Data Source V2

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/internal/SQLConf.scala    |   2 +-
 ...org.apache.spark.sql.sources.DataSourceRegister |   2 +-
 .../spark/sql/execution/command/tables.scala       |   5 +-
 .../sql/execution/datasources/SchemaPruning.scala  |   4 +-
 .../datasources/parquet/ParquetFileFormat.scala    | 100 +
 .../datasources/parquet/ParquetFilters.scala       |   2 +-
 .../datasources/parquet/ParquetOutputWriter.scala  |   2 +-
 .../datasources/parquet/ParquetReadSupport.scala   |   4 +-
 .../datasources/parquet/ParquetUtils.scala         | 130 +++
 .../datasources/parquet/ParquetWriteSupport.scala  |   4 +-
 .../datasources/v2/FilePartitionReader.scala       |  17 +
 .../ParquetDataSourceV2.scala}                     |  14 +-
 .../v2/parquet/ParquetPartitionReaderFactory.scala | 227 +++
 .../OrcScan.scala => parquet/ParquetScan.scala}    |  58 ++-
 .../ParquetScanBuilder.scala}                      |  48 ++-
 .../OrcTable.scala => parquet/ParquetTable.scala}  |  18 +-
 .../v2/parquet/ParquetWriteBuilder.scala           | 116 ++
 .../spark/sql/FileBasedDataSourceSuite.scala       |   4 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 111 +++---
 .../apache/spark/sql/execution/PlannerSuite.scala  |  20 +-
 .../spark/sql/execution/SameResultSuite.scala      |  32 +-
 .../spark/sql/execution/SparkPlanSuite.scala       |  57 ++-
 .../datasources/parquet/ParquetFilterSuite.scala   | 179 ++---
 .../parquet/ParquetPartitionDiscoverySuite.scala   | 419 ++---
 .../datasources/parquet/ParquetQuerySuite.scala    |  99 +++--
 .../parquet/ParquetSchemaPruningSuite.scala        |  40 +-
 .../datasources/parquet/ParquetSchemaSuite.scala   |   2 +-
 .../sql/execution/metric/SQLMetricsSuite.scala     |  47 +--
 .../execution/python/ExtractPythonUDFsSuite.scala  |  90 +++--
 .../sources/v2/FileDataSourceV2FallBackSuite.scala |   8 +-
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 132 +++
 .../apache/spark/sql/streaming/StreamSuite.scala   |   9 +-
 .../streaming/StreamingDeduplicationSuite.scala    |  50 +--
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  86 +++--
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala |  10 +-
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |   5 +-
 .../spark/sql/sources/HadoopFsRelationTest.scala   |   3 +-
 37 files changed, 1513 insertions(+), 643 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
 copy sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{orc/OrcDataSourceV2.scala => parquet/ParquetDataSourceV2.scala} (76%)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
 copy sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{orc/OrcScan.scala => parquet/ParquetScan.scala} (50%)
 copy sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{orc/OrcScanBuilder.scala => parquet/ParquetScanBuilder.scala} (53%)
 copy sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{orc/OrcTable.scala => parquet/ParquetTable.scala} (81%)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala
[spark] branch master updated: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new c79f471  [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
c79f471 is described below

commit c79f471d0475fd98ddeb1e6281551e42684837d2
Author: maryannxue
AuthorDate: Sat Jun 15 11:27:15 2019 +0200

[SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

## What changes were proposed in this pull request?

Implemented a new SparkPlan that executes the query adaptively. It splits the query plan into independent stages and executes them in order according to their dependencies. Each query stage materializes its output at the end; when one stage completes, the data statistics of the materialized output are used to optimize the remainder of the query.

The adaptive mode is off by default. When it is turned on, users will see "AdaptiveSparkPlan" as the top node of a query or sub-query. The inner plan of "AdaptiveSparkPlan" is subject to change during query execution but becomes final once the execution is complete. Whether the inner plan is final is included in the EXPLAIN string. Below is an example of the EXPLAIN plan before and after execution.

Query:

```
SELECT * FROM testData JOIN testData2 ON key = a WHERE value = '1'
```

Before execution:

```
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- SortMergeJoin [key#13], [a#23], Inner
   :- Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5)
   :     +- Filter (isnotnull(value#14) AND (value#14 = 1))
   :        +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
   :           +- Scan[obj#12]
   +- Sort [a#23 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#23, 5)
         +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
            +- Scan[obj#22]
```

After execution:

```
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=true)
+- *(1) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft
   :- BroadcastQueryStage 2
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   :     +- ShuffleQueryStage 0
   :        +- Exchange hashpartitioning(key#13, 5)
   :           +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
   :              +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
   :                 +- Scan[obj#12]
   +- ShuffleQueryStage 1
      +- Exchange hashpartitioning(a#23, 5)
         +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
            +- Scan[obj#22]
```

Credit also goes to carsonwang and cloud-fan.

## How was this patch tested?

Added new unit tests.

Closes #24706 from maryannxue/aqe.

Authored-by: maryannxue
Signed-off-by: herman
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  11 +-
 .../spark/sql/execution/QueryExecution.scala       |  12 +-
 .../org/apache/spark/sql/execution/SparkPlan.scala |  35 +-
 .../apache/spark/sql/execution/SparkPlanInfo.scala |   3 +
 .../apache/spark/sql/execution/SparkPlanner.scala  |   2 +
 .../spark/sql/execution/SparkStrategies.scala      |   5 +-
 .../sql/execution/WholeStageCodegenExec.scala      |  96 +++--
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 425 +
 .../adaptive/InsertAdaptiveSparkPlan.scala         | 131 +++
 .../sql/execution/adaptive/LogicalQueryStage.scala |  56 +++
 .../adaptive/LogicalQueryStageStrategy.scala       |  63 +++
 .../adaptive/PlanAdaptiveSubqueries.scala          |  36 ++
 .../sql/execution/adaptive/QueryStageExec.scala    | 210 ++
 .../execution/exchange/BroadcastExchangeExec.scala |  28 +-
 .../execution/exchange/ExchangeCoordinator.scala   |
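To try the new mode, a PySpark sketch (`spark.sql.adaptive.enabled` is the existing switch this patch builds on, and the commit message above confirms the mode is off by default; the data and join are illustrative):

```
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.adaptive.enabled", "true")  # off by default
         .getOrCreate())

df1 = spark.createDataFrame([(1, "1"), (2, "2")], ["key", "value"])
df2 = spark.createDataFrame([(1, 10), (2, 20)], ["a", "b"])
joined = df1.join(df2, df1["key"] == df2["a"]).where(df1["value"] == "1")

joined.explain()  # top node: AdaptiveSparkPlan(isFinalPlan=false)
joined.collect()  # executes stages; runtime statistics may replan the join
joined.explain()  # the inner plan is now final (isFinalPlan=true)
```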