[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/23124#discussion_r236952729 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types._ + +/** + * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes + * duplicated map keys w.r.t. the last wins policy. + */ +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { + assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") + assert(keyType != NullType, "map key cannot be null type.") + + private lazy val keyToIndex = keyType match { +case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] --- End diff -- FYI: I had a test lying around from when I worked on map_concat. With this PR: - map_concat of two small maps (20 string keys per map, no dups) for 2M rows is about 17% slower. - map_concat of two big maps (500 string keys per map, no dups) for 1M rows is about 25% slower. The baseline code is the same branch as the PR, but without the 4 commits. Some cost makes sense, as we're checking for dups, but it's odd that the overhead grows disproportionately as the size of the maps grows. I remember that at one time, mutable.HashMap had some performance issues (rumor has it, anyway). So as a test, I modified ArrayBasedMapBuilder.scala to use java.util.Hashmap instead. After that: - map_concat of two small maps (20 string keys per map, no dups) for 2M rows is about 12% slower. - map_concat of two big maps (500 string keys per map, no dups) for 1M rows is about 15% slower. It's a little more proportionate. I don't know if switching HashMap implementations would have some negative consequences. Also, my test is a dumb benchmark that uses System.currentTimeMillis concatenating simple [String,Integer] maps. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
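For illustration, a minimal sketch of the experiment described above — keeping the key-to-index lookup in a java.util.HashMap instead of a scala.collection.mutable.HashMap. The class and method names below are stand-ins, and this is not necessarily the change that was ultimately merged into ArrayBasedMapBuilder:

```scala
import java.util.{HashMap => JHashMap}

// Illustrative only: tracks the first index seen for each map key, which is the part
// of the builder the HashMap choice affects. Last-wins handling of values would be
// done by the caller.
class KeyIndex {
  private val keyToIndex = new JHashMap[Any, Integer]()

  def indexFor(key: Any): Int = {
    val existing = keyToIndex.get(key)   // null when the key has not been seen yet
    if (existing != null) {
      existing.intValue()
    } else {
      val idx = keyToIndex.size()
      keyToIndex.put(key, idx)
      idx
    }
  }
}
```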
[GitHub] spark pull request #23000: [SPARK-26002][SQL] Fix day of year calculation fo...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/23000#discussion_r234819827 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala --- @@ -410,6 +410,30 @@ class DateTimeUtilsSuite extends SparkFunSuite { assert(getDayInYear(getInUTCDays(c.getTimeInMillis)) === 78) } + test("SPARK-26002: correct day of year calculations for Julian calendar years") { +TimeZone.setDefault(TimeZoneUTC) --- End diff -- Just curious. Do you need to put back the old default when the test is over? Or does that not matter here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
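The concern above is about leaking a changed JVM default into later tests. A minimal sketch of the usual pattern — capture the old default and restore it in a finally block. The helper name is illustrative, not a quote from the test suite:

```scala
import java.util.TimeZone

// Run `body` with `tz` as the JVM default time zone, then restore the previous default.
def withDefaultTimeZone[T](tz: TimeZone)(body: => T): T = {
  val saved = TimeZone.getDefault
  TimeZone.setDefault(tz)
  try body finally TimeZone.setDefault(saved)
}

// usage:
// withDefaultTimeZone(TimeZone.getTimeZone("UTC")) { /* assertions that assume UTC */ }
```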
[GitHub] spark issue #22504: [SPARK-25118][Submit] Persist Driver Logs in Client mode...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22504 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22865: [DOC] Fix doc for spark.sql.parquet.recordLevelFi...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/22865#discussion_r228771361 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -462,7 +462,7 @@ object SQLConf { val PARQUET_RECORD_FILTER_ENABLED = buildConf("spark.sql.parquet.recordLevelFilter.enabled") .doc("If true, enables Parquet's native record-level filtering using the pushed down " + "filters. This configuration only has an effect when 'spark.sql.parquet.filterPushdown' " + - "is enabled.") + "is enabled and spark.sql.parquet.enableVectorizedReader is disabled.") --- End diff -- I see, because of this check: https://github.com/apache/spark/blob/d5573c578a1eea9ee04886d9df37c7178e67bb30/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L338 So when the data contains a Map column, for example., the vectorized reader is not used, even though spark.sql.parquet.enableVectorizedReader=true. How about something like: "If true, enables Parquet's native record-level filtering using the pushed down filters. This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled *and the vectorized reader is not used. You can ensure the vectorized reader is not used by setting 'spark.sql.parquet.enableVectorizedReader' to false*" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
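Putting the suggested wording into practice, a sketch of the settings involved (assuming a SparkSession named spark, e.g. in spark-shell; the table path is a placeholder). Record-level filtering only takes effect on the parquet-mr path, so the vectorized reader must be off or unusable for the schema:

```scala
// Push filters down and force the parquet-mr reader so record-level filtering applies.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", "true")

spark.read.parquet("/path/to/parquet_table").filter("id1 = 1").count()
```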
[GitHub] spark pull request #22865: [DOC] Fix doc for spark.sql.parquet.recordLevelFi...
GitHub user bersprockets opened a pull request: https://github.com/apache/spark/pull/22865 [DOC] Fix doc for spark.sql.parquet.recordLevelFilter.enabled ## What changes were proposed in this pull request? Updated the doc string value for spark.sql.parquet.recordLevelFilter.enabled to indicate that spark.sql.parquet.enableVectorizedReader must be disabled. The code in ParquetFileFormat uses spark.sql.parquet.recordLevelFilter.enabled only after falling back to parquet-mr (see else for this if statement): https://github.com/apache/spark/blob/d5573c578a1eea9ee04886d9df37c7178e67bb30/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L412 https://github.com/apache/spark/blob/d5573c578a1eea9ee04886d9df37c7178e67bb30/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L427-L430 Tests also bear this out. ## How was this patch tested? This is just a doc string fix: I built Spark and ran a single test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bersprockets/spark confdocfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22865.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22865 commit af8a85ae4a1e477801bf104af6d4909cd822ba01 Author: Bruce Robbins Date: 2018-10-27T21:47:50Z update doc string for spark.sql.parquet.recordLevelFilter.enabled --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22504: [SPARK-25118][Submit] Persist Driver Logs in Yarn Client...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22504 The Py4JJavaError StackOverflow happens pretty reliably. I am guessing it's related to the change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22504: [SPARK-25118][Submit] Persist Driver Logs in Yarn Client...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22504 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22504: [SPARK-25118][Submit] Persist Driver Logs in Yarn Client...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22504 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21950: [SPARK-24914][SQL][WIP] Add configuration to avoi...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21950#discussion_r218608537 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala --- @@ -1051,11 +1052,27 @@ private[hive] object HiveClientImpl { // When table is external, `totalSize` is always zero, which will influence join strategy. // So when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero, // return None. +// If a table has a deserialization factor, the table owner expects the in-memory +// representation of the table to be larger than the table's totalSize value. In that case, +// multiply totalSize by the deserialization factor and use that number instead. +// If the user has set spark.sql.statistics.ignoreRawDataSize to true (because of HIVE-20079, +// for example), don't use rawDataSize. // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always // zero after INSERT command. So they are used here only if they are larger than zero. -if (totalSize.isDefined && totalSize.get > 0L) { - Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount.filter(_ > 0))) -} else if (rawDataSize.isDefined && rawDataSize.get > 0) { +val factor = try { +properties.get("deserFactor").getOrElse("1.0").toDouble --- End diff -- I need to eliminate this duplication: There's a similar lookup and calculation done in PruneFileSourcePartitionsSuite. Also, I should check if a Long value, used as an intermediate value, is acceptable to hold file sizes (possibly, since a Long can represent 8 exabytes) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
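A standalone sketch of the lookup in the diff above, written as the kind of shared utility that would remove the duplication mentioned. The "deserFactor" property name mirrors the WIP diff, not a final API, and BigInt sidesteps the Long-overflow question raised in the comment:

```scala
import scala.util.Try

// Scale a table's totalSize by an optional per-table deserialization factor,
// falling back to the raw size when the property is missing or malformed.
def adjustedSize(properties: Map[String, String], totalSize: BigInt): BigInt = {
  val factor = Try(properties.getOrElse("deserFactor", "1.0").toDouble).getOrElse(1.0)
  if (factor == 1.0) totalSize
  else (BigDecimal(totalSize) * BigDecimal(factor)).toBigInt
}
```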
[GitHub] spark issue #21950: [SPARK-24914][SQL][WIP] Add configuration to avoid OOM d...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21950 I broke this :). Don't ask for a redo. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21950: [SPARK-24914][SQL][WIP] Add configuration to avoi...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21950#discussion_r217216975 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala --- @@ -91,4 +91,28 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te assert(size2 < tableStats.get.sizeInBytes) } } + + test("Test deserialization factor against partition") { +val factor = 10 +withTable("tbl") { + spark.range(10).selectExpr("id", "id % 3 as p").write.format("parquet") +.partitionBy("p").saveAsTable("tbl") + sql(s"ANALYZE TABLE tbl COMPUTE STATISTICS") + + val df1 = sql("SELECT * FROM tbl WHERE p = 1") + val sizes1 = df1.queryExecution.optimizedPlan.collect { +case relation: LogicalRelation => relation.catalogTable.get.stats.get.sizeInBytes + } + assert(sizes1 != 0) --- End diff -- Oops. Should be assert(sizes1(0) != 0). I will fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
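Why the original assertion is vacuous: it compares a Seq to an Int, which can never be equal, so it passes even when the collected size is zero. A tiny illustration with made-up values:

```scala
val sizes1: Seq[BigInt] = Seq(BigInt(0))

val vacuousCheck = sizes1 != 0     // true: a Seq is never equal to the Int 0
val intendedCheck = sizes1(0) != 0 // false: the element-level check catches the zero size
```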
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22192 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22382: [SPARK-23243] [SPARK-20715][CORE][2.2] Fix RDD.repartiti...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22382 Thanks! Closing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22382: [SPARK-23243] [SPARK-20715][CORE][2.2] Fix RDD.re...
Github user bersprockets closed the pull request at: https://github.com/apache/spark/pull/22382 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22192 retest this please. It's that old "java.lang.reflect.InvocationTargetException: null" error we've seen many times. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21899: [SPARK-24912][SQL] Don't obscure source of OOM during br...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21899 cc @jinxing64 @hvanhovell @MaxGekk --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22382: [SPARK-23243] [SPARK-20715][CORE][2.2] Fix RDD.repartiti...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22382 cc @cloud-fan @JoshRosen --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22382: [SPARK-23243] [SPARK-20715][CORE][2.2] Fix RDD.re...
GitHub user bersprockets opened a pull request: https://github.com/apache/spark/pull/22382 [SPARK-23243] [SPARK-20715][CORE][2.2] Fix RDD.repartition() data correctness issue ## What changes were proposed in this pull request? Back port of #22354 and #17955 to 2.2 (#22354 depends on methods introduced by #17955). --- An alternative fix for #21698 When Spark rerun tasks for an RDD, there are 3 different behaviors: 1. determinate. Always return the same result with same order when rerun. 2. unordered. Returns same data set in random order when rerun. 3. indeterminate. Returns different result when rerun. Normally Spark doesn't need to care about it. Spark runs stages one by one, when a task is failed, just rerun it. Although the rerun task may return a different result, users will not be surprised. However, Spark may rerun a finished stage when seeing fetch failures. When this happens, Spark needs to rerun all the tasks of all the succeeding stages if the RDD output is indeterminate, because the input of the succeeding stages has been changed. If the RDD output is determinate, we only need to rerun the failed tasks of the succeeding stages, because the input doesn't change. If the RDD output is unordered, it's same as determinate, because shuffle partitioner is always deterministic(round-robin partitioner is not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so the reducers will still get the same input data set. This PR fixed the failure handling for `repartition`, to avoid correctness issues. For `repartition`, it applies a stateful map function to generate a round-robin id, which is order sensitive and makes the RDD's output indeterminate. When the stage contains `repartition` reruns, we must also rerun all the tasks of all the succeeding stages. **future improvement:** 1. Currently we can't rollback and rerun a shuffle map stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341 2. Currently we can't rollback and rerun a result stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25342 3. We should provide public API to allow users to tag the random level of the RDD's computing function. ## How was this patch tested? a new test case You can merge this pull request into a Git repository by running: $ git pull https://github.com/bersprockets/spark SPARK-23243-2.2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22382.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22382 commit 97ba5a71e1903e0462bfac3201f1961e0c14f384 Author: Wenchen Fan Date: 2018-09-07T02:52:45Z [SPARK-23243][CORE][2.2] Fix RDD.repartition() data correctness issue backport https://github.com/apache/spark/pull/22112 to 2.2 --- An alternative fix for https://github.com/apache/spark/pull/21698 When Spark rerun tasks for an RDD, there are 3 different behaviors: 1. determinate. Always return the same result with same order when rerun. 2. unordered. Returns same data set in random order when rerun. 3. indeterminate. Returns different result when rerun. Normally Spark doesn't need to care about it. Spark runs stages one by one, when a task is failed, just rerun it. Although the rerun task may return a different result, users will not be surprised. However, Spark may rerun a finished stage when seeing fetch failures. 
When this happens, Spark needs to rerun all the tasks of all the succeeding stages if the RDD output is indeterminate, because the input of the succeeding stages has been changed. If the RDD output is determinate, we only need to rerun the failed tasks of the succeeding stages, because the input doesn't change. If the RDD output is unordered, it's same as determinate, because shuffle partitioner is always deterministic(round-robin partitioner is not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so the reducers will still get the same input data set. This PR fixed the failure handling for `repartition`, to avoid correctness issues. For `repartition`, it applies a stateful map function to generate a round-robin id, which is order sensitive and makes the RDD's output indeterminate. When the stage contains `repartition` reruns, we must also rerun all the tasks of all the succeeding stages. **future improvement:** 1. Currently we can't rollback and rerun a shuffle map stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341 2. Cu
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22192 retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22209: [SPARK-24415][Core] Fixed the aggregated stage metrics b...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22209 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22209: [SPARK-24415][Core] Fixed the aggregated stage metrics b...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22209 Looks like test failed due to https://issues.apache.org/jira/browse/SPARK-23622 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22209: [SPARK-24415][Core] Fixed the aggregated stage metrics b...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22209 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22188: [SPARK-25164][SQL] Avoid rebuilding column and path list...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22188 @gatorsmile Thanks much! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22188: [SPARK-25164][SQL] Avoid rebuilding column and path list...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22188 @gatorsmile >Why 2.2 only? Only that I forgot that master is already on 2.4. We should do 2.3 as well, but I haven't tested it yet. Do I need to do anything on my end to get it into 2.2, and once I test, into 2.3? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22188: [SPARK-25164][SQL] Avoid rebuilding column and path list...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22188 @cloud-fan @gatorsmile Should we merge this also onto 2.2? It was a clean cherry-pick for me (from master to branch-2.2), and I ran the top and bottom tests (6000 columns, 1 million rows, 67 32M files, and 60 columns, 100 million rows, 67 32M files) from the PR description and got the same results. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21899: [SPARK-24912][SQL] Don't obscure source of OOM du...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21899#discussion_r212756302 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala --- @@ -118,12 +119,20 @@ case class BroadcastExchangeExec( // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult // will catch this exception and re-throw the wrapped fatal throwable. case oe: OutOfMemoryError => -throw new SparkFatalException( +val sizeMessage = if (dataSize != -1) { + s"${SparkLauncher.DRIVER_MEMORY} by at least the estimated size of the " + +s"relation ($dataSize bytes)" --- End diff -- @rezasafi The dataSize appears to be inflated by 2-3 times, at least relative to the size of the actual data in the table. That may be because these relations are backed by map-like objects that have keys and (likely) other internal structures. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21950: [SPARK-24914][SQL][WIP] Add configuration to avoi...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21950#discussion_r212719073 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala --- @@ -76,4 +78,16 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { op } } + + private def calcPartSize(catalogTable: Option[CatalogTable], sizeInBytes: Long): Long = { +val conf: SQLConf = SQLConf.get +val factor = conf.sizeDeserializationFactor +if (catalogTable.isDefined && factor != 1.0 && + // TODO: The serde check should be in a utility function, since it is also checked elsewhere + catalogTable.get.storage.serde.exists(s => s.contains("Parquet") || s.contains("Orc"))) { --- End diff -- @mgaido91 Good point. Also, I notice that even when the table's files are not compressed (say, a table backed by CSV files), the LongToUnsafeRowMap or BytesToBytesMap that backs the relation is roughly 3 times larger than the total file size. So even under the best of circumstances (i.e., the table's files are not compressed), Spark will get it wrong by several multiples. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
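One way to see the number Spark actually compares against the broadcast threshold, for gauging the gap described above — a sketch assuming Spark 2.3+ (where LogicalPlan.stats takes no arguments) and an illustrative table name:

```scala
// What the planner believes the relation costs, in bytes.
val plan = spark.table("csv_backed_table").queryExecution.optimizedPlan
val estimatedBytes = plan.stats.sizeInBytes

// Compare estimatedBytes with the on-disk footprint of the table's files to see
// how far off the estimate is for uncompressed vs. encoded formats.
```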
[GitHub] spark pull request #22079: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-251...
Github user bersprockets closed the pull request at: https://github.com/apache/spark/pull/22079 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22079: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22079 @gatorsmile Weird, I don't see it on branch-2.2. Is that a sync issue? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22188: [SPARK-25164][SQL] Avoid rebuilding column and path list...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22188 OK, I reran the tests for the lower column count cases, and the runs with the patch consistently show a tiny (1-3%) improvement compared to the master branch. So even the lower column count cases benefit a little. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22188: [SPARK-25164][SQL] Avoid rebuilding column and path list...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22188 Thanks @vanzin. In my benchmark tests, the tiny degradation (0.5%) in the lower column count cases is pretty consistent, which concerns me a little. I am going to re-run those tests in a different environment and see what happens. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22188: [SPARK-25164][SQL] Avoid rebuilding column and pa...
GitHub user bersprockets opened a pull request: https://github.com/apache/spark/pull/22188 [SPARK-25164][SQL] Avoid rebuilding column and path list for each column in parquet reader ## What changes were proposed in this pull request? VectorizedParquetRecordReader::initializeInternal rebuilds the column list and path list once for each column. Therefore, it indirectly iterates 2\*colCount\*colCount times for each parquet file. This inefficiency impacts jobs that read parquet-backed tables with many columns and many files. Jobs that read tables with few columns or few files are not impacted. This PR changes initializeInternal so that it builds each list only once. I ran benchmarks on my laptop with 1 worker thread, running this query: sql("select * from parquet_backed_table where id1 = 1").collect There is roughly one matching row for every 425 rows, and the matching rows are sprinkled pretty evenly throughout the table (that is, every page for column id1 has at least one matching row).

6000 columns, 1 million rows, 67 32M files:

master | branch | improvement
---|---|---
10.87 min | 6.09 min | 44%

6000 columns, 1 million rows, 23 98M files:

master | branch | improvement
---|---|---
7.39 min | 5.80 min | 21%

600 columns, 10 million rows, 67 32M files:

master | branch | improvement
---|---|---
1.95 min | 1.96 min | -0.5%

60 columns, 100 million rows, 67 32M files:

master | branch | improvement
---|---|---
0.55 min | 0.55 min | 0%

## How was this patch tested? - sql unit tests - pyspark-sql tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/bersprockets/spark SPARK-25164 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22188.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22188 commit 697de21501acbda3dbcd8ccc13a35ad3723a652e Author: Bruce Robbins Date: 2018-08-22T02:00:28Z Initial commit --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
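The gist of the change, as a self-contained sketch (the real code is Java inside VectorizedParquetRecordReader; buildColumns and the schema argument below are stand-ins, not Spark APIs): hoist the list construction out of the per-column loop so the total work stays linear in the column count.

```scala
case class Column(name: String)

// Stand-in for the schema lookups done per column; each call costs O(colCount).
def buildColumns(schema: Seq[String]): Seq[Column] = schema.map(Column(_))

// Before the change (sketch): lists rebuilt on every iteration => O(colCount^2) overall.
def slowInit(schema: Seq[String]): Unit =
  schema.indices.foreach { i =>
    val columns = buildColumns(schema)
    require(columns(i).name == schema(i))
  }

// After the change (sketch): build each list once, then index into it.
def fastInit(schema: Seq[String]): Unit = {
  val columns = buildColumns(schema)
  schema.indices.foreach(i => require(columns(i).name == schema(i)))
}
```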
[GitHub] spark pull request #21899: [SPARK-24912][SQL] Don't obscure source of OOM du...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21899#discussion_r211833522 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala --- @@ -118,12 +119,20 @@ case class BroadcastExchangeExec( // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult // will catch this exception and re-throw the wrapped fatal throwable. case oe: OutOfMemoryError => -throw new SparkFatalException( +val sizeMessage = if (dataSize != -1) { + s"${SparkLauncher.DRIVER_MEMORY} by at least the estimated size of the " + +s"relation ($dataSize bytes)" --- End diff -- Hmmm.. good question. I will check. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exceptions...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22154 Re: Your build failure ('statefulOperators.scala:95: value asJava is not a member of scala.collection.immutable.Map[String,Long]'). I am also seeing this in my fork on my laptop (I just updated my fork about 10-15 minutes ago). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
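For context, the usual cause of a "value asJava is not a member of ..." compile error is a missing converters import; a minimal illustration (not necessarily the fix applied in that PR, and the map contents are made up):

```scala
import scala.collection.JavaConverters._  // brings .asJava into scope

val metrics: java.util.Map[String, Long] = Map("numRowsTotal" -> 1L).asJava
```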
[GitHub] spark issue #22079: [SPARK-23207][SPARK-22905][SQL][BACKPORT-2.2] Shuffle+Re...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22079 @gatorsmile So I should include all the related PRs merged to master as a single PR here? Just verifying. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21899: [SPARK-24912][SQL] Don't obscure source of OOM during br...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21899 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21899: [SPARK-24912][SQL] Don't obscure source of OOM du...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21899#discussion_r211047556 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala --- @@ -118,12 +119,20 @@ case class BroadcastExchangeExec( // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult // will catch this exception and re-throw the wrapped fatal throwable. case oe: OutOfMemoryError => -throw new SparkFatalException( +val sizeMessage = if (dataSize != -1) { + s"${SparkLauncher.DRIVER_MEMORY} by at least the estimated size of the " + --- End diff -- @hvanhovell That's what was being obscured :). In testing this, I've seen various places. In the three cases I have seen first hand: java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value. at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.grow(HashedRelation.scala:628) at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.append(HashedRelation.scala:570) at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:865) At that line is an allocation: val newPage = new Array[Long](newNumWords.toInt) 2nd case: java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase spark.driver.memory by at least the estimated size of the relation (96468992 bytes). at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:286) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:286) 3rd case: java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting \ spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value. at org.apache.spark.unsafe.memory.MemoryBlock.allocateFromObject(MemoryBlock.java:118) at org.apache.spark.sql.catalyst.expressions.UnsafeRow.getUTF8String(UnsafeRow.java:420) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.joins.UnsafeHashedRelation$.apply(HashedRelation.scala:311) At that line is also an allocation: mb = new ByteArrayMemoryBlock(array, offset, length); --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21899: [SPARK-24912][SQL] Don't obscure source of OOM during br...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21899 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21899: [SPARK-24912][SQL] Don't obscure source of OOM during br...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21899 @MaxGekk In the updated message, I left out "hash" from the term "hash relation" only because it seems the relation could also be an Array. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21950: [SPARK-24914][SQL][WIP] Add configuration to avoid OOM d...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21950 retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22079: [SPARK-23207][SPARK-22905][SQL][BACKPORT-2.2] Shuffle+Re...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22079 @jiangxb1987 gentle ping. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22079: [SPARK-23207][SPARK-22905][SQL][BACKPORT-2.2] Shuffle+Re...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22079 Once this is merged, I will also back-port: - [[SPARK-24564][TEST] Add test suite for RecordBinaryComparator](https://github.com/apache/spark/commit/5b0596648854c0c733b7c607661b78af7df18b89) - #22101 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22101: [SPARK-25114][Core] Fix RecordBinaryComparator when subt...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22101 Should there be a test, or do other sorting-related tests cover this indirectly? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
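For background on what a dedicated suite could pin down, a sketch of the classic pitfall with subtraction-based comparators — a 64-bit difference narrowed to an Int can report "equal" for unequal inputs. The exact RecordBinaryComparator code is not reproduced here; the values are illustrative:

```scala
val left  = 0L
val right = 1L << 32                               // the values clearly differ

val broken = (left - right).toInt                  // 0: the low 32 bits of -2^32 are all zero
val sound  = java.lang.Long.compare(left, right)   // -1: sign-correct comparison
```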
[GitHub] spark issue #22079: [SPARK-23207][SPARK-22905][SQL][BACKPORT-2.2] Shuffle+Re...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22079 @jiangxb1987 Here are some of the differences from the original PR - I also ported the follow up PR #20426 - I ported #20088 (for SPARK-22905) to get the tests to pass. I also ported its followup, #20113. - sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java line 112 UnsafeExternalSorter.create does not take a Supplier for the 5th argument, so I put get() on the argument to directly pass a RecordComparator object. - sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala: I access SQLConf differently (since SQLConf.get doesn't appear to work in 2.2). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartit...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/22079#discussion_r209736691 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala --- @@ -144,7 +144,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] { val dataArray = Array.tabulate(model.selectedFeatures.length) { i => Data(model.selectedFeatures(i)) } - spark.createDataFrame(dataArray).repartition(1).write.parquet(Loader.dataPath(path)) + spark.createDataFrame(sc.makeRDD(dataArray, 1)).write.parquet(Loader.dataPath(path)) --- End diff -- @dongjoon-hyun Thanks, I will include the other commit and also update the title. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22079 Hmmm... I somehow managed to break SparkR tests by fixing a comment. It seems to have auto-retried and broken the second time too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22079 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22079 @jiangxb1987 > We shall also include #20088 in this backport PR. I did that shortly after commenting, which allowed the tests to pass. I squashed it into the first commit, so it wasn't obvious I did it. Should I also include #20426 in this PR, or treat that separately? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/22079 The test "model load / save" in ChiSqSelectorSuite fails because of this line in [ChiSqSelector.scala](https://github.com/apache/spark/blob/branch-2.2/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala#L147) spark.createDataFrame(dataArray).repartition(1).write.parquet(Loader.dataPath(path)) In 2.4, the line is: spark.createDataFrame(sc.makeRDD(dataArray, 1)).write.parquet(Loader.dataPath(path)) If you change 2.4 to also have that line, and also remove the follow-up PR (#20426) to avoid sorting when there is one partition, this test also fails on 2.4 in the same way. So I am not sure which way to go: Update ChiSqSelector.scala to be like 2.4 (simply a one line change), or make the test accept this new order. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartit...
GitHub user bersprockets opened a pull request: https://github.com/apache/spark/pull/22079 [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on a DataFrame could lead to incorrect answers ## What changes were proposed in this pull request? Currently shuffle repartition uses RoundRobinPartitioning, the generated result is nondeterministic since the sequence of input rows are not determined. The bug can be triggered when there is a repartition call following a shuffle (which would lead to non-deterministic row ordering), as the pattern shows below: upstream stage -> repartition stage -> result stage (-> indicate a shuffle) When one of the executors process goes down, some tasks on the repartition stage will be retried and generate inconsistent ordering, and some tasks of the result stage will be retried generating different data. The following code returns 931532, instead of 100: ``` import scala.sys.process._ import org.apache.spark.TaskContext val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x => x }.repartition(200).map { x => if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) { throw new Exception("pkill -f java".!!) } x } res.distinct().count() ``` In this PR, we propose a most straight-forward way to fix this problem by performing a local sort before partitioning, after we make the input row ordering deterministic, the function from rows to partitions is fully deterministic too. The downside of the approach is that with extra local sort inserted, the performance of repartition() will go down, so we add a new config named `spark.sql.execution.sortBeforeRepartition` to control whether this patch is applied. The patch is default enabled to be safe-by-default, but user may choose to manually turn it off to avoid performance regression. This patch also changes the output rows ordering of repartition(), that leads to a bunch of test cases failure because they are comparing the results directly. Add unit test in ExchangeSuite. With this patch(and `spark.sql.execution.sortBeforeRepartition` set to true), the following query returns 100: ``` import scala.sys.process._ import org.apache.spark.TaskContext spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true") val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x => x }.repartition(200).map { x => if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) { throw new Exception("pkill -f java".!!) } x } res.distinct().count() res7: Long = 100 ``` Author: Xingbo Jiang ## How was this patch tested? Ran all SBT unit tests for org.apache.spark.sql.*. Ran pyspark tests for module pyspark-sql. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bersprockets/spark SPARK-23207 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22079.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22079 commit efccc028bce64bf4754ce81ee16533c19b4384b2 Author: Xingbo Jiang Date: 2018-01-26T23:01:03Z [SPARK-23207][SQL] Shuffle+Repartition on a DataFrame could lead to incorrect answers Currently shuffle repartition uses RoundRobinPartitioning, the generated result is nondeterministic since the sequence of input rows are not determined. 
The bug can be triggered when there is a repartition call following a shuffle (which would lead to non-deterministic row ordering), as the pattern shows below: upstream stage -> repartition stage -> result stage (-> indicate a shuffle) When one of the executors process goes down, some tasks on the repartition stage will be retried and generate inconsistent ordering, and some tasks of the result stage will be retried generating different data. The following code returns 931532, instead of 100: ``` import scala.sys.process._ import org.apache.spark.TaskContext val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x => x }.repartition(200).map { x => if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) { throw new Exception("pkill -f java".!!) } x } res.distinct().count() ``` In this PR, we propose a most straight-forward way to fix this problem by performing a local sort before partitioning, after we make the input row ordering deterministic, the funct
[GitHub] spark issue #21950: [SPARK-24914][SQL][WIP] Add configuration to avoid OOM d...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21950 retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21950: [SPARK-24914][SQL][WIP] Add configuration to avoid OOM d...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21950 retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21950: [SPARK-24912][SQL][WIP] Add configuration to avoi...
GitHub user bersprockets opened a pull request: https://github.com/apache/spark/pull/21950 [SPARK-24912][SQL][WIP] Add configuration to avoid OOM during broadcast join (and other negative side effects of incorrect table sizing) ## What changes were proposed in this pull request? Added configuration settings to help avoid OOM errors during broadcast joins. - deser multiplication factor: Tell Spark to multiply totalSize times a specified factor for tables with encoded files (i.e., parquet or orc files). Spark will do this when calculating a table's sizeInBytes. This is modelled after Hive's hive.stats.deserialization.factor configuration setting. - ignore rawDataSize: Due to HIVE-20079, rawDataSize is broken. This settings tells Spark to ignore rawDataSize when calculating the table's sizeInBytes. One can partially simulate the deser multiplication factor without this change by decreasing the value in spark.sql.autoBroadcastJoinThreshold. However, that will affect all tables, not just the ones that are encoded. There is some awkwardness in that the check for file type (parquet or orc) uses Hive deser names, but the checks for partitioned tables need to be made outside of the Hive submodule. Still working that out. ## How was this patch tested? Added unit tests. Also, checked that I can avoid broadcast join OOM errors when using the deser multiplication factor on both my laptop and a cluster. Also checked that I can avoid OOM errors using the ignore rawDataSize flag on my laptop. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bersprockets/spark SPARK-24914 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21950.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21950 commit aa2a957751a906fe538822cace019014e763a8c3 Author: Bruce Robbins Date: 2018-07-26T00:36:17Z WIP version --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21899: [SPARK-24912][SQL] Don't obscure source of OOM during br...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21899 >Is it possible to include the actual size of the in-memory table so far in the msg as well? Only if the relation can be built. If we run out of memory attempting to build the relation, we've essentially died in its constructor, so the relation doesn't exist. If we OOM after that (say when attempting to broadcast the relation), then we can get the size. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21899: [SPARK-24912][SQL] Don't obscure source of OOM during br...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21899 > Is it possible to include the actual size of the in-memory table so far in the msg as well? Possibly. The state of the relation might be messy when I go to query its size. >Also, does catching the OOM and throwing our own mess with HeapDumpOnOutOfMemoryError? @squito From my tests, it seems the heap dump is taken before the exception is caught. java.lang.OutOfMemoryError: Java heap space Dumping heap to java_pid70644.hprof ... Heap dump file created [842225516 bytes in 2.412 secs] java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.grow(HashedRelation.scala:628) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21899: [SPARK-24912][SQL] Don't obscure source of OOM du...
GitHub user bersprockets opened a pull request: https://github.com/apache/spark/pull/21899 [SPARK-24912][SQL] Don't obscure source of OOM during broadcast join ## What changes were proposed in this pull request? This PR shows the stack trace of the original OutOfMemoryError that occurs while building or broadcasting a HashedRelation, rather than the stack trace of the newly created OutOfMemoryError that's created during error handling. Currently, when an OOM occurs while broadcasting a table, the stack trace shows a line in the error handling: java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:122) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withExecutionId$1.apply(SQLExecution.scala:101) With this PR, it shows the original stack trace. java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.grow(HashedRelation.scala:628) at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.append(HashedRelation.scala:570) at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:865) While sometimes the line on which is the exception is thrown is just a victim, sometimes it is a participant in the problem, as was the case in the above exception. ## How was this patch tested? Manually tested case where broadcast join caused an OOM, and a case where it did not. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bersprockets/spark SPARK-24912 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21899.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21899 commit ca19596c9c09e2cc0ef5667d84f1d289b8184b91 Author: Bruce Robbins Date: 2018-07-26T01:11:07Z Initial commit --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
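One way to keep the allocation site visible when re-throwing, similar in spirit to what the description above aims for but not a verbatim copy of the change — copy the original stack trace onto the new, more descriptive error. The helper name and message are illustrative:

```scala
def rethrowWithContext(oe: OutOfMemoryError, msg: String): Nothing = {
  val wrapped = new OutOfMemoryError(msg)
  // Point the trace at the original failure site (e.g. grow()/append()), not the handler.
  wrapped.setStackTrace(oe.getStackTrace)
  throw wrapped
}
```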
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21073 @ueshin Thanks for all your help! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21073 retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r199678852 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -551,6 +551,36 @@ object TypeCoercion { case None => s } + case m @ MapConcat(children) if children.forall(c => MapType.acceptsType(c.dataType)) && +!haveSameType(children) => +val keyTypes = children.map(_.dataType.asInstanceOf[MapType].keyType) --- End diff -- I don't necessarily have a _good_ reason, but here are two reasons I did that extra junk: 1) The Concat-like code didn't find a wider type amongst types map and map. So, it just fell to case None => m 2) If the call to map_concat has the same child types but multiple valueContainsNull values, the Concat-style code added a Cast to each child (this is because haveSameType considers expressions with different valueContainsNull values to have different types). It does no harm, as far as I can tell, but it seemed wrong. About issue 1): I will debug. I might have done something wrong there. Plus, even if it's a real bug in findWiderCommonType, it affects my longer code, which may be looking for a wider common type amongst the keys or values, which could themselves be maps. About issue 2): Maybe not an issue. Or I can create an alternate haveSameType() function for Maps. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
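A sketch of the map-specific coercion described above: find a wider type for the keys and for the values separately, and OR the valueContainsNull flags so issue 2) goes away. The widerOf parameter stands in for whatever helper resolves a wider type for two DataTypes (e.g. TypeCoercion's internal findWiderTypeForTwo); it is assumed here, not quoted:

```scala
import org.apache.spark.sql.types._

// Returns the wider MapType for two map inputs, or None if the key or value types
// cannot be widened to a common type.
def widerMapType(widerOf: (DataType, DataType) => Option[DataType])
                (left: MapType, right: MapType): Option[MapType] =
  for {
    keyType <- widerOf(left.keyType, right.keyType)
    valueType <- widerOf(left.valueType, right.valueType)
  } yield MapType(keyType, valueType, left.valueContainsNull || right.valueContainsNull)
```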
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21073 Still working on type coercion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21628: [SPARK-23776][DOC] Update instructions for running PySpa...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21628 @HyukjinKwon Thanks for your help! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r197671215 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -475,6 +474,231 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp override def prettyName: String = "map_entries" } +/** + * Returns the union of all the given maps. + */ +@ExpressionDescription( + usage = "_FUNC_(map, ...) - Returns the union of all the given maps", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd')); + [[1 -> "a"], [2 -> "b"], [2 -> "c"], [3 -> "d"]] + """, since = "2.4.0") +case class MapConcat(children: Seq[Expression]) extends Expression { + + override def checkInputDataTypes(): TypeCheckResult = { +var funcName = s"function $prettyName" +if (children.exists(!_.dataType.isInstanceOf[MapType])) { + TypeCheckResult.TypeCheckFailure( +s"input to $funcName should all be of type map, but it's " + + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else { + TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), funcName) +} + } + + override def dataType: MapType = { +val dt = children.map(_.dataType.asInstanceOf[MapType]).headOption + .getOrElse(MapType(StringType, StringType)) +val valueContainsNull = children.map(_.dataType.asInstanceOf[MapType]) + .exists(_.valueContainsNull) +if (dt.valueContainsNull != valueContainsNull) { + dt.copy(valueContainsNull = valueContainsNull) +} else { + dt +} + } + + override def nullable: Boolean = children.exists(_.nullable) + + override def eval(input: InternalRow): Any = { +val maps = children.map(_.eval(input)) +if (maps.contains(null)) { + return null +} +val keyArrayDatas = maps.map(_.asInstanceOf[MapData].keyArray()) +val valueArrayDatas = maps.map(_.asInstanceOf[MapData].valueArray()) + +val numElements = keyArrayDatas.foldLeft(0L)((sum, ad) => sum + ad.numElements()) +if (numElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful attempt to concat maps with $numElements " + +s"elements due to exceeding the map size limit " + +s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} +val finalKeyArray = new Array[AnyRef](numElements.toInt) +val finalValueArray = new Array[AnyRef](numElements.toInt) +var position = 0 +for (i <- keyArrayDatas.indices) { + val keyArray = keyArrayDatas(i).toObjectArray(dataType.keyType) + val valueArray = valueArrayDatas(i).toObjectArray(dataType.valueType) + Array.copy(keyArray, 0, finalKeyArray, position, keyArray.length) + Array.copy(valueArray, 0, finalValueArray, position, valueArray.length) + position += keyArray.length +} + +new ArrayBasedMapData(new GenericArrayData(finalKeyArray), + new GenericArrayData(finalValueArray)) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val mapCodes = children.map(_.genCode(ctx)) +val keyType = dataType.keyType +val valueType = dataType.valueType +val argsName = ctx.freshName("args") +val keyArgsName = ctx.freshName("keyArgs") +val valArgsName = ctx.freshName("valArgs") + +val mapDataClass = classOf[MapData].getName +val arrayBasedMapDataClass = classOf[ArrayBasedMapData].getName +val arrayDataClass = classOf[ArrayData].getName + +val init = + s""" +|$mapDataClass[] $argsName = new $mapDataClass[${mapCodes.size}]; +|$arrayDataClass[] $keyArgsName = new $arrayDataClass[${mapCodes.size}]; +|$arrayDataClass[] $valArgsName = new 
$arrayDataClass[${mapCodes.size}]; +|boolean ${ev.isNull} = false; +|$mapDataClass ${ev.value} = null; + """.stripMargin + +val assignments = mapCodes.zipWithIndex.map { case (m, i) => + s""" + |${m.code} + |$argsName[$i] = ${m.value}; + |if (${m.isNull}) { + | ${ev.isNull} = true; + |} + """.stripMargin +} + +v
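For readers following the truncated eval/doGenCode hunk above, here is a minimal standalone sketch, using plain Scala collections rather than Spark's MapData/ArrayBasedMapData internals, of the concatenate-without-dedup behaviour that yields the duplicated key 2 in the example output; the object and method names are invented for illustration.

    // Simplified sketch: append the entries of several "maps" in order,
    // keeping duplicate keys, as the eval above does with key/value arrays.
    object MapConcatSketch {
      def concatKeepDups[K, V](maps: Seq[Seq[(K, V)]]): Seq[(K, V)] =
        maps.flatten // later entries with an existing key are appended, not merged

      def main(args: Array[String]): Unit = {
        val m1 = Seq(1 -> "a", 2 -> "b")
        val m2 = Seq(2 -> "c", 3 -> "d")
        println(concatKeepDups(Seq(m1, m2))) // List((1,a), (2,b), (2,c), (3,d))
      }
    }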
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21073 @ueshin >so I was wondering whether we need the same thing for MapConcat or not. Got it. I will research that, plus I will look at the entire pull request for Concat to see if there is anything else relevant to MapConcat. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r197669221 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -475,6 +474,231 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp override def prettyName: String = "map_entries" } +/** + * Returns the union of all the given maps. + */ +@ExpressionDescription( + usage = "_FUNC_(map, ...) - Returns the union of all the given maps", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd')); + [[1 -> "a"], [2 -> "b"], [2 -> "c"], [3 -> "d"]] + """, since = "2.4.0") +case class MapConcat(children: Seq[Expression]) extends Expression { + + override def checkInputDataTypes(): TypeCheckResult = { +var funcName = s"function $prettyName" +if (children.exists(!_.dataType.isInstanceOf[MapType])) { + TypeCheckResult.TypeCheckFailure( +s"input to $funcName should all be of type map, but it's " + + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else { + TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), funcName) +} + } + + override def dataType: MapType = { +val dt = children.map(_.dataType.asInstanceOf[MapType]).headOption + .getOrElse(MapType(StringType, StringType)) +val valueContainsNull = children.map(_.dataType.asInstanceOf[MapType]) + .exists(_.valueContainsNull) +if (dt.valueContainsNull != valueContainsNull) { + dt.copy(valueContainsNull = valueContainsNull) +} else { + dt +} + } + + override def nullable: Boolean = children.exists(_.nullable) + + override def eval(input: InternalRow): Any = { +val maps = children.map(_.eval(input)) +if (maps.contains(null)) { + return null +} +val keyArrayDatas = maps.map(_.asInstanceOf[MapData].keyArray()) +val valueArrayDatas = maps.map(_.asInstanceOf[MapData].valueArray()) + +val numElements = keyArrayDatas.foldLeft(0L)((sum, ad) => sum + ad.numElements()) +if (numElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful attempt to concat maps with $numElements " + +s"elements due to exceeding the map size limit " + +s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} +val finalKeyArray = new Array[AnyRef](numElements.toInt) +val finalValueArray = new Array[AnyRef](numElements.toInt) +var position = 0 +for (i <- keyArrayDatas.indices) { + val keyArray = keyArrayDatas(i).toObjectArray(dataType.keyType) + val valueArray = valueArrayDatas(i).toObjectArray(dataType.valueType) + Array.copy(keyArray, 0, finalKeyArray, position, keyArray.length) + Array.copy(valueArray, 0, finalValueArray, position, valueArray.length) + position += keyArray.length +} + +new ArrayBasedMapData(new GenericArrayData(finalKeyArray), + new GenericArrayData(finalValueArray)) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val mapCodes = children.map(_.genCode(ctx)) +val keyType = dataType.keyType +val valueType = dataType.valueType +val argsName = ctx.freshName("args") +val keyArgsName = ctx.freshName("keyArgs") +val valArgsName = ctx.freshName("valArgs") + +val mapDataClass = classOf[MapData].getName +val arrayBasedMapDataClass = classOf[ArrayBasedMapData].getName +val arrayDataClass = classOf[ArrayData].getName + +val init = + s""" +|$mapDataClass[] $argsName = new $mapDataClass[${mapCodes.size}]; +|$arrayDataClass[] $keyArgsName = new $arrayDataClass[${mapCodes.size}]; +|$arrayDataClass[] $valArgsName = new 
$arrayDataClass[${mapCodes.size}]; +|boolean ${ev.isNull} = false; +|$mapDataClass ${ev.value} = null; + """.stripMargin + +val assignments = mapCodes.zipWithIndex.map { case (m, i) => + s""" + |${m.code} + |$argsName[$i] = ${m.value}; + |if (${m.isNull}) { + | ${ev.isNull} = true; + |} + """.stripMargin +} + +va
[GitHub] spark pull request #21628: [SPARK-23776][DOC] Update instructions for runnin...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21628#discussion_r197667457 --- Diff: docs/building-spark.md --- @@ -215,19 +215,23 @@ If you are building Spark for use in a Python environment and you wish to pip in Alternatively, you can also run make-distribution with the --pip option. -## PySpark Tests with Maven +## PySpark Tests with Maven or SBT If you are building PySpark and wish to run the PySpark tests you will need to build Spark with Hive support. ./build/mvn -DskipTests clean package -Phive ./python/run-tests +If you are building PySpark with SBT and wish to run the PySpark tests, you will need to build Spark with Hive support and also build the test components: + +./build/sbt -Phive clean package --- End diff -- I noticed that the pyspark tests were recently changed so that -Phive is no longer strictly necessary to run pyspark tests, but I decided not to address that in this update. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21628: [SPARK-23776][DOC] Update instructions for runnin...
GitHub user bersprockets opened a pull request: https://github.com/apache/spark/pull/21628 [SPARK-23776][DOC] Update instructions for running PySpark after building with SBT ## What changes were proposed in this pull request? This update tells the reader how to build Spark with SBT such that pyspark-sql tests will succeed. If you follow the current instructions for building Spark with SBT, pyspark/sql/udf.py fails with: AnalysisException: u'Can not load class test.org.apache.spark.sql.JavaStringLength, please make sure it is on the classpath;' ## How was this patch tested? I ran the doc build command (SKIP_API=1 jekyll build) and eyeballed the result. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bersprockets/spark SPARK-23776_doc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21628.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21628 commit 9fcd05d7cb52a68bea930625605013397b4989f6 Author: Bruce Robbins Date: 2018-06-25T02:07:12Z Update build doc for running pyspark after building with sbt --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21621: [SPARK-24633][SQL] Fix codegen when split is requ...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21621#discussion_r197646450 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -556,6 +556,17 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(df8.selectExpr("arrays_zip(v1, v2)"), expectedValue8) } + test("SPARK-24633: arrays_zip splits input processing correctly") { +Seq("true", "false").foreach { wholestageCodegenEnabled => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholestageCodegenEnabled) { +val df = spark.range(1) +val exprs = (0 to 5).map(x => array($"id" + lit(x))) --- End diff -- @mgaido91 Got it, you were testing that it does not split when wholestagecodegen is enabled. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21621: [SPARK-24633][SQL] Fix codegen when split is requ...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21621#discussion_r197623576 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -556,6 +556,17 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(df8.selectExpr("arrays_zip(v1, v2)"), expectedValue8) } + test("SPARK-24633: arrays_zip splits input processing correctly") { +Seq("true", "false").foreach { wholestageCodegenEnabled => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholestageCodegenEnabled) { +val df = spark.range(1) +val exprs = (0 to 5).map(x => array($"id" + lit(x))) --- End diff -- This wasn't splitting input processing for me. Maybe splitExpressionsWithCurrentInputs has a bigger threshold than splitExpressions. Even at 90, it still had not split the input processing. At 100, it finally did. So someplace between 90 and 100, it starts splitting. I might be looking at the wrong thing. Check at your end. val exprs = (0 to 100).map(x => array($"id" + lit(x))) checkAnswer(df.select(arrays_zip(exprs: _*)), Row(Seq(Row(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
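If anyone wants to reproduce the threshold probing outside the test suite, a hedged spark-shell sketch follows; it assumes a SparkSession named spark and inspects the generated code by eye, since there is no assertion here on whether splitting actually occurred.

    // spark-shell sketch: print the generated code for a wide arrays_zip call
    // and check by eye whether the input processing was split into helper methods.
    import org.apache.spark.sql.functions.{array, arrays_zip, lit}
    import org.apache.spark.sql.execution.debug._
    import spark.implicits._

    val df = spark.range(1)
    val exprs = (0 to 100).map(x => array($"id" + lit(x)))
    df.select(arrays_zip(exprs: _*)).debugCodegen()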
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21073 Hi @ueshin. Just a question while I work on the changes for your review comments. >I'm wondering whether we need type coercion like concat for array type is doing. Which type coercion in Concat are you referring to? Are you referring to the check for primitive vs object type in code generation? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20909: [SPARK-23776][python][test] Check for needed components/...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/20909 @HyukjinKwon This PR is mostly obsolete. I will close it and re-open something smaller... maybe a one-line documentation change to handle the missing UDF case for those who build with sbt. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20909: [SPARK-23776][python][test] Check for needed comp...
Github user bersprockets closed the pull request at: https://github.com/apache/spark/pull/20909 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21231: [SPARK-24119][SQL]Add interpreted execution to SortPrefi...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21231 @hvanhovell @maropu @viirya @kiszk Thanks for all the help! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r193280073 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -308,6 +308,170 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp override def prettyName: String = "map_entries" } +/** + * Returns the union of all the given maps. + */ +@ExpressionDescription( +usage = "_FUNC_(map, ...) - Returns the union of all the given maps", --- End diff -- @ueshin >We don't need to care about key duplication like CreateMap for now. Just verifying: This means I should simply concatenate the maps, possibly creating additional duplicates. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21231: [SPARK-24119][SQL]Add interpreted execution to SortPrefi...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21231 ping @hvanhovell --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21308: SPARK-24253: Add DeleteSupport mix-in for DataSou...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21308#discussion_r190963247 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/DeleteSupport.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.catalyst.expressions.Expression; + +/** + * A mix-in interface for {@link DataSourceV2} delete support. Data sources can implement this + * interface to provide the ability to delete data from tables that matches filter expressions. + * + * Data sources must implement this interface to support logical operations that combine writing + * data with deleting data, like overwriting partitions. + */ +public interface DeleteSupport extends DataSourceV2 { + /** + * Delete data from a data source table that matches filter expressions. + * + * Rows are deleted from the data source iff all of the filter expressions match. That is, the + * expressions must be interpreted as a set of filters that are ANDed together. + * + * Implementations may reject a delete operation if the delete isn't possible without significant + * effort. For example, partitioned data sources may reject deletes that do not filter by + * partition columns because the filter may require rewriting files without deleted records. + * To reject a delete implementations should throw {@link IllegalArgumentException} with a clear + * error message that identifies which expression was rejected. + * + * Implementations may throw {@link UnsupportedOperationException} if the delete operation is not + * supported because one of the filter expressions is not supported. Implementations should throw + * this exception with a clear error message that identifies the unsupported expression. + * + * @param filters filter expressions, used to select rows to delete when all expressions match + * @throws UnsupportedOperationException If one or more filter expressions is not supported + * @throws IllegalArgumentException If the delete is rejected due to required effort + */ + void deleteWhere(Expression[] filters); --- End diff -- >Do you think it would be more clear if this were explicitly a driver-side operation? Possibly. Maybe in the big data world this is already obvious. To me, it looks like a general purpose delete. Maybe deletePartitions? (I am bad at naming things, however). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
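To make the driver-side question above concrete, here is a hypothetical Scala sketch of a source implementing the quoted interface the way its javadoc suggests, rejecting anything that is not a plain partition-column filter; the class name and partitionColumns field are invented, and this only compiles against a branch that actually contains DeleteSupport.

    // Hypothetical sketch (names invented) of the DeleteSupport contract above:
    // accept deletes that map to dropping whole partitions, reject the rest.
    import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo, Expression}
    import org.apache.spark.sql.sources.v2.{DataSourceV2, DeleteSupport}

    class PartitionedFileSource(partitionColumns: Set[String])
        extends DataSourceV2 with DeleteSupport {

      override def deleteWhere(filters: Array[Expression]): Unit = {
        filters.foreach {
          case EqualTo(a: Attribute, _) if partitionColumns.contains(a.name) =>
            () // fine: amounts to a metadata/file-system partition drop
          case other =>
            throw new IllegalArgumentException(
              s"Rejecting delete: $other would require rewriting data files")
        }
        // ... drop the matching partition directories here ...
      }
    }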
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21073 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r189161277 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala --- @@ -56,6 +58,28 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(MapValues(m2), null) } + test("Map Concat") { --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21231: [SPARK-24119][SQL]Add interpreted execution to SortPrefi...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21231 @maropu @hvanhovell @viirya Are all pending issues resolved? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21305#discussion_r188960491 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -344,6 +344,36 @@ case class Join( } } +/** + * Append data to an existing DataSourceV2 table. + */ +case class AppendData( --- End diff -- How does this logical plan node map to the 8 operations outlined in your SPIP? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21308: SPARK-24253: Add DeleteSupport mix-in for DataSou...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21308#discussion_r188392219 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/DeleteSupport.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.catalyst.expressions.Expression; + +/** + * A mix-in interface for {@link DataSourceV2} delete support. Data sources can implement this + * interface to provide the ability to delete data from tables that matches filter expressions. + * + * Data sources must implement this interface to support logical operations that combine writing + * data with deleting data, like overwriting partitions. + */ +public interface DeleteSupport extends DataSourceV2 { + /** + * Delete data from a data source table that matches filter expressions. + * + * Rows are deleted from the data source iff all of the filter expressions match. That is, the + * expressions must be interpreted as a set of filters that are ANDed together. + * + * Implementations may reject a delete operation if the delete isn't possible without significant + * effort. For example, partitioned data sources may reject deletes that do not filter by + * partition columns because the filter may require rewriting files without deleted records. + * To reject a delete implementations should throw {@link IllegalArgumentException} with a clear + * error message that identifies which expression was rejected. + * + * Implementations may throw {@link UnsupportedOperationException} if the delete operation is not + * supported because one of the filter expressions is not supported. Implementations should throw + * this exception with a clear error message that identifies the unsupported expression. + * + * @param filters filter expressions, used to select rows to delete when all expressions match + * @throws UnsupportedOperationException If one or more filter expressions is not supported + * @throws IllegalArgumentException If the delete is rejected due to required effort + */ + void deleteWhere(Expression[] filters); --- End diff -- Does putting the delete method here (as opposed to say, in DataDeleters on some other thing parallel to to the DataWriters) imply that this is a driver-side operation only? I understand the use case is deleting partitions which is usually only a file system operation, but will that always be the case? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21231: [SPARK-24119][SQL]Add interpreted execution to So...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21231#discussion_r187192485 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala --- @@ -147,7 +148,40 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { (!child.isAscending && child.nullOrdering == NullsLast) } - override def eval(input: InternalRow): Any = throw new UnsupportedOperationException + private lazy val calcPrefix: Any => Long = child.child.dataType match { +case BooleanType => (raw) => + if (raw.asInstanceOf[Boolean]) 1 else 0 +case DateType | TimestampType | _: IntegralType => (raw) => --- End diff -- I'm a little apprehensive about changing existing and properly functioning code (in doGenCode), even though it would make it slightly more readable. If you think I should, though, I will. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
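For anyone reading the truncated hunk, a hedged sketch of the general shape of the interpreted prefix computation follows; it is simplified (no decimal handling), and while the prefix helpers are the PrefixComparators classes that this PR's test file also imports, the branch bodies should be treated as an approximation rather than the merged code.

    // Simplified shape of an interpreted sort-prefix calculator; not the exact PR code.
    import org.apache.spark.sql.types._
    import org.apache.spark.unsafe.types.UTF8String
    import org.apache.spark.util.collection.unsafe.sort.PrefixComparators._

    def prefixFor(dataType: DataType): Any => Long = dataType match {
      case BooleanType =>
        raw => if (raw.asInstanceOf[Boolean]) 1L else 0L
      case DateType | TimestampType | _: IntegralType =>
        raw => raw.asInstanceOf[java.lang.Number].longValue()
      case FloatType | DoubleType =>
        raw => DoublePrefixComparator.computePrefix(raw.asInstanceOf[java.lang.Number].doubleValue())
      case StringType =>
        raw => StringPrefixComparator.computeStringPrefix(raw.asInstanceOf[UTF8String])
      case BinaryType =>
        raw => BinaryPrefixComparator.computeBinaryPrefix(raw.asInstanceOf[Array[Byte]])
      case _ =>
        _ => 0L
    }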
[GitHub] spark issue #21144: [SPARK-24043][SQL] Interpreted Predicate should initiali...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21144 @cloud-fan I don't think this is an issue in 2.3. It would be an issue only once [SPARK-23580](https://issues.apache.org/jira/browse/SPARK-23580) ("Interpreted mode fallback should be implemented") is completed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21144: [SPARK-24043][SQL] Interpreted Predicate should initiali...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21144 Thanks much! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21073 @ueshin Hopefully I have addressed all of your review comments. Also, I have a question about what it means to dedup across maps when Spark allows duplicates in maps [here.](https://issues.apache.org/jira/browse/SPARK-23936?focusedCommentId=16464245&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16464245) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r186570491 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -116,6 +117,169 @@ case class MapValues(child: Expression) override def prettyName: String = "map_values" } +/** + * Returns the union of all the given maps. + */ +@ExpressionDescription( +usage = "_FUNC_(map, ...) - Returns the union of all the given maps", +examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd')); + [[1 -> "a"], [2 -> "c"], [3 -> "d"] + """, since = "2.4.0") +case class MapConcat(children: Seq[Expression]) extends Expression { + + override def checkInputDataTypes(): TypeCheckResult = { +// check key types and value types separately to allow valueContainsNull to vary +if (children.exists(!_.dataType.isInstanceOf[MapType])) { + TypeCheckResult.TypeCheckFailure( +s"The given input of function $prettyName should all be of type map, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else if (children.map(_.dataType.asInstanceOf[MapType].keyType) + .exists(_.isInstanceOf[MapType])) { + // map_concat needs to pick a winner when multiple maps contain the same key. map_concat + // can do that only if it can detect when two keys are the same. SPARK-9415 states "map type + // should not support equality, hash". As a result, map_concat does not support a map type + // as a key + TypeCheckResult.TypeCheckFailure( +s"The given input maps of function $prettyName cannot have a map type as a key") +} else if (children.map(_.dataType.asInstanceOf[MapType].keyType).distinct.length > 1) { + TypeCheckResult.TypeCheckFailure( +s"The given input maps of function $prettyName should all be the same type, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else if (children.map(_.dataType.asInstanceOf[MapType].valueType).distinct.length > 1) { + TypeCheckResult.TypeCheckFailure( +s"The given input maps of function $prettyName should all be the same type, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else { + TypeCheckResult.TypeCheckSuccess +} + } + + override def dataType: MapType = { +MapType( + keyType = children.headOption + .map(_.dataType.asInstanceOf[MapType].keyType).getOrElse(StringType), + valueType = children.headOption + .map(_.dataType.asInstanceOf[MapType].valueType).getOrElse(StringType), + valueContainsNull = children.map { c => +c.dataType.asInstanceOf[MapType] + }.exists(_.valueContainsNull) +) + } + + override def nullable: Boolean = children.exists(_.nullable) + + override def eval(input: InternalRow): Any = { +val union = new util.LinkedHashMap[Any, Any]() +children.map(_.eval(input)).foreach { raw => + if (raw == null) { +return null + } + val map = raw.asInstanceOf[MapData] + map.foreach(dataType.keyType, dataType.valueType, (k, v) => +union.put(k, v) + ) +} +val (keyArray, valueArray) = union.entrySet().toArray().map { e => --- End diff -- I would imagine bad things would happen before you got this far (even Map's size method returns an Int). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
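As a small aside for anyone skimming the hunk, here is a minimal sketch of the last-wins behaviour that java.util.LinkedHashMap gives the union for duplicate keys, using plain JVM types rather than Spark's internal ones.

    // LinkedHashMap.put overwrites the value of an existing key but keeps its
    // original insertion position, i.e. the last value wins for duplicates.
    val union = new java.util.LinkedHashMap[Any, Any]()
    Seq(1 -> "a", 2 -> "b").foreach { case (k, v) => union.put(k, v) }
    Seq(2 -> "c", 3 -> "d").foreach { case (k, v) => union.put(k, v) }
    println(union) // {1=a, 2=c, 3=d}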
[GitHub] spark issue #21144: [SPARK-24043][SQL] Interpreted Predicate should initiali...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21144 @hvanhovell @maropu Is there anything on this PR that I should do? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21231: [SPARK-24119][SQL]Add interpreted execution to So...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21231#discussion_r186307490 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.sql.{Date, Timestamp} +import java.util.TimeZone + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.collection.unsafe.sort.PrefixComparators._ + +class SortOrderExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { + + test("SortPrefix") { +// Explicitly choose a time zone, since Date objects can create different values depending on +// local time zone of the machine on which the test is running +TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles")) --- End diff -- @kiszk Maybe not a nit. I should fix that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
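Since the comment above concedes the default should be restored, here is a minimal sketch of the save/restore pattern in a test body; it is generic JVM code, and Spark's own test utilities may already offer a helper for this.

    // Save and restore the JVM default time zone around the assertions, so a
    // failing assertion cannot leak the Los_Angeles default into later tests.
    import java.util.TimeZone

    val originalTz = TimeZone.getDefault
    try {
      TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
      // ... date-dependent assertions go here ...
    } finally {
      TimeZone.setDefault(originalTz)
    }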
[GitHub] spark issue #21231: [SPARK-24119][SQL]Add interpreted execution to SortPrefi...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21231 @maropu @kiszk Hopefully I've addressed all comments. Please take a look. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21231: [SPARK-24119][SQL]Add interpreted execution to SortPrefi...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21231 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21231: [SPARK-24119][SQL]Add interpreted execution to SortPrefi...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21231 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21231: [SPARK-24119][SQL]Add interpreted execution to So...
GitHub user bersprockets opened a pull request: https://github.com/apache/spark/pull/21231 [SPARK-24119][SQL]Add interpreted execution to SortPrefix expression ## What changes were proposed in this pull request? Implemented eval in SortPrefix expression. ## How was this patch tested? - ran existing sbt SQL tests - added unit test - ran existing Python SQL tests - manual tests: disabling codegen -- patching code to disable beyond what spark.sql.codegen.wholeStage=false can do -- and running sbt SQL tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/bersprockets/spark sortprefixeval Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21231.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21231 commit 280b941a1adc2ffcd82810a69f3d9e475607b70a Author: Bruce Robbins Date: 2018-04-28T23:26:48Z Checkpoint changes commit 4a810ae8776893ba1d12e620930473fdd61d1f49 Author: Bruce Robbins Date: 2018-04-29T18:25:26Z Checkpoint commit commit d1a7e220040ba1cfe4facb7d6eef9ae1251768aa Author: Bruce Robbins Date: 2018-04-29T19:19:37Z Checkpoint commit commit 4dbb0b7ae959a6a4f85122a887bab4e1563255f0 Author: Bruce Robbins Date: 2018-04-29T22:33:21Z Comment on testing oddity commit 227b6ac71bcfe6a54051f47dd16aec047b0a98d9 Author: Bruce Robbins Date: 2018-04-30T21:22:09Z Add boolean test for Sortprefix --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21169: [SPARK-23715][SQL] the input of to/from_utc_timestamp ca...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21169 Addresses all of my comments, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21073#discussion_r185392954 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -116,6 +117,161 @@ case class MapValues(child: Expression) override def prettyName: String = "map_values" } +/** + * Returns the union of all the given maps. + */ +@ExpressionDescription( +usage = "_FUNC_(map, ...) - Returns the union of all the given maps", +examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd')); + [[1 -> "a"], [2 -> "c"], [3 -> "d"] + """, since = "2.4.0") +case class MapConcat(children: Seq[Expression]) extends Expression { + + override def checkInputDataTypes(): TypeCheckResult = { +// check key types and value types separately to allow valueContainsNull to vary +if (children.exists(!_.dataType.isInstanceOf[MapType])) { + TypeCheckResult.TypeCheckFailure( +s"The given input of function $prettyName should all be of type map, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else if (children.map(_.dataType.asInstanceOf[MapType].keyType).distinct.length > 1) { + TypeCheckResult.TypeCheckFailure( +s"The given input maps of function $prettyName should all be the same type, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else if (children.map(_.dataType.asInstanceOf[MapType].valueType).distinct.length > 1) { + TypeCheckResult.TypeCheckFailure( +s"The given input maps of function $prettyName should all be the same type, " + + "but they are " + children.map(_.dataType.simpleString).mkString("[", ", ", "]")) +} else { + TypeCheckResult.TypeCheckSuccess +} + } + + override def dataType: MapType = { +MapType( + keyType = children.headOption + .map(_.dataType.asInstanceOf[MapType].keyType).getOrElse(StringType), + valueType = children.headOption + .map(_.dataType.asInstanceOf[MapType].valueType).getOrElse(StringType), + valueContainsNull = children.map { c => +c.dataType.asInstanceOf[MapType] + }.exists(_.valueContainsNull) +) + } + + override def nullable: Boolean = children.exists(_.nullable) + + override def eval(input: InternalRow): Any = { +val union = new util.LinkedHashMap[Any, Any]() +children.map(_.eval(input)).foreach { raw => + if (raw == null) { +return null + } + val map = raw.asInstanceOf[MapData] + map.foreach(dataType.keyType, dataType.valueType, (k, v) => +union.put(k, v) + ) --- End diff -- I found an issue. I was preparing to add some more tests when I noticed that using maps as keys doesn't work well in interpreted mode (seems to work fine in codegen mode, so far). So, something like this doesn't work in interpreted mode: scala> dfmapmap.show(truncate=false) +--+-+ |mapmap1 |mapmap2 | +--+-+ |[[1 -> 2, 3 -> 4] -> 101, [5 -> 6, 7 -> 8] -> 102]|[[11 -> 12] -> 103, [1 -> 2, 3 -> 4] -> 1001]| +--+-+ scala> dfmapmap.select(map_concat('mapmap1, 'mapmap2).as('mapmap3)).show(truncate=false) +---+ |mapmap3 | +---+ |[[1 -> 2, 3 -> 4] -> 101, [5 -> 6, 7 -> 8] -> 102, [11 -> 12] -> 103, [1 -> 2, 3 -> 4] -> 1001]| +---+ As you can see, the key `[1 -> 2, 3 -> 4]` shows up twice in the new map. This is because: val a1 = new ArrayBasedMapData
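A minimal sketch of the underlying effect being described, using a plain class with reference equality instead of Spark's actual map classes, since the point is only that hash-based dedup cannot work when the key type defines no value equality:

    // A key class with no equals/hashCode by content falls back to reference
    // equality, so two structurally identical keys occupy two map entries --
    // analogous to map-typed keys surviving as "duplicates" above.
    class RefKey(val entries: Seq[(Int, Int)]) // no equals/hashCode override

    val union = new java.util.LinkedHashMap[Any, Any]()
    union.put(new RefKey(Seq(1 -> 2, 3 -> 4)), 101)
    union.put(new RefKey(Seq(1 -> 2, 3 -> 4)), 1001)
    println(union.size()) // 2: both copies of the "same" key are kept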
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21073 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21073 A test failed with "./bin/spark-submit ... No such file or directory". Seems like there are a lot of spurious test failures right now. I will hold off on re-running for a little while. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21144: [SPARK-24043][SQL] Interpreted Predicate should initiali...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21144 @hvanhovell @maropu As it turns out, there are at least two places where an InterpretedPredicate is created but never initialized: SimpleTextSource.buildReader and ExternalCatalogUtils.prunePartitionsByFilter. It seems unlikely that nondeterministic expressions would be used for partitions, but maybe I am not imaginative enough. These seem like bugs in those two classes, not in InterpretedPredicate. Separately, I reran the mostly-interpreted SQL unit tests, this time with SortPrefix implemented, and got the error count down from 1270 to 114. None of the errors were 'requirement failed' exceptions (so no uninitialized nondeterministic expressions). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
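For readers following along, a hedged sketch of the initialize-before-eval discipline at issue; the create/initialize/eval calls mirror the catalyst predicate API being discussed, but treat the exact signatures as assumptions rather than a quotation of the code.

    // Sketch: a nondeterministic expression (e.g. rand()) must see initialize()
    // with a partition index before eval(), or it fails a "requirement failed" check.
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}

    def filterPartition(
        predicateExpr: Expression,
        partitionIndex: Int,
        rows: Iterator[InternalRow]): Iterator[InternalRow] = {
      val predicate = InterpretedPredicate.create(predicateExpr)
      predicate.initialize(partitionIndex) // the step missing in the two callers named above
      rows.filter(predicate.eval)
    }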
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21073 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21141: [SPARK-23853][PYSPARK][TEST] Run Hive-related PySpark te...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21141 My experience here is limited. Still, it also looks good to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21073 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21073 @mn-mikke @kiszk Thanks for the review. I addressed the comments. Please take a look when you have a chance. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org