[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-27 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236952729
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+  assert(keyType != NullType, "map key cannot be null type.")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
--- End diff --

FYI: I had a test lying around from when I worked on map_concat. With this 
PR:

- map_concat of two small maps (20 string keys per map, no dups) for 2M 
rows is about 17% slower.
- map_concat of two big maps (500 string keys per map, no dups) for 1M rows 
is about 25% slower.

The baseline code is the same branch as the PR, but without the 4 commits.

Some cost makes sense, as we're checking for dups, but it's odd that the 
overhead grows disproportionately as the size of the maps grows.


I remember that at one time, mutable.HashMap had some performance issues 
(rumor has it, anyway). So as a test, I modified ArrayBasedMapBuilder.scala to 
use java.util.HashMap instead. After that:

- map_concat of two small maps (20 string keys per map, no dups) for 2M 
rows is about 12% slower.
- map_concat of two big maps (500 string keys per map, no dups) for 1M rows 
is about 15% slower.

It's a little more proportionate. I don't know if switching HashMap 
implementations would have some negative consequences.

Also, my test is a dumb benchmark that uses System.currentTimeMillis to time 
the concatenation of simple [String, Integer] maps.
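
For reference, a minimal sketch of the kind of swap I tried (the class and method names here are made up for illustration; this is not the actual ArrayBasedMapBuilder patch):

```
import java.util.{HashMap => JHashMap}

// Sketch only: a key-to-index map backed by java.util.HashMap instead of
// scala.collection.mutable.HashMap.
class KeyIndex {
  private val keyToIndex = new JHashMap[Any, Integer]()

  // Returns the existing index for `key`, or assigns the next index if absent.
  def getOrInsert(key: Any): Int = {
    val existing = keyToIndex.get(key)   // null when the key is absent
    if (existing != null) {
      existing.intValue()
    } else {
      val idx = keyToIndex.size()
      keyToIndex.put(key, Integer.valueOf(idx))
      idx
    }
  }
}
```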





---




[GitHub] spark pull request #23000: [SPARK-26002][SQL] Fix day of year calculation fo...

2018-11-19 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/23000#discussion_r234819827
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
 ---
@@ -410,6 +410,30 @@ class DateTimeUtilsSuite extends SparkFunSuite {
 assert(getDayInYear(getInUTCDays(c.getTimeInMillis)) === 78)
   }
 
+  test("SPARK-26002: correct day of year calculations for Julian calendar 
years") {
+TimeZone.setDefault(TimeZoneUTC)
--- End diff --

Just curious. Do you need to put back the old default when the test is 
over? Or does that not matter here?
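
For what it's worth, the save-and-restore pattern I had in mind looks roughly like this (the helper name and usage are made up, not from this PR):

```
import java.util.TimeZone

// Sketch: run a block with a temporary default time zone, then restore the old one.
def withDefaultTimeZone[T](tz: TimeZone)(body: => T): T = {
  val saved = TimeZone.getDefault
  TimeZone.setDefault(tz)
  try body finally TimeZone.setDefault(saved)
}

// Hypothetical usage:
// withDefaultTimeZone(TimeZone.getTimeZone("UTC")) {
//   // assertions that depend on the default time zone
// }
```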


---




[GitHub] spark issue #22504: [SPARK-25118][Submit] Persist Driver Logs in Client mode...

2018-11-02 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22504
  
retest this please


---




[GitHub] spark pull request #22865: [DOC] Fix doc for spark.sql.parquet.recordLevelFi...

2018-10-28 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/22865#discussion_r228771361
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -462,7 +462,7 @@ object SQLConf {
   val PARQUET_RECORD_FILTER_ENABLED = 
buildConf("spark.sql.parquet.recordLevelFilter.enabled")
 .doc("If true, enables Parquet's native record-level filtering using 
the pushed down " +
   "filters. This configuration only has an effect when 
'spark.sql.parquet.filterPushdown' " +
-  "is enabled.")
+  "is enabled and spark.sql.parquet.enableVectorizedReader is 
disabled.")
--- End diff --

I see, because of this check:

https://github.com/apache/spark/blob/d5573c578a1eea9ee04886d9df37c7178e67bb30/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L338
So when the data contains a Map column, for example, the vectorized reader 
is not used, even though spark.sql.parquet.enableVectorizedReader=true.

How about something like:

"If true, enables Parquet's native record-level filtering using the pushed 
down filters. This configuration only has an effect when 
'spark.sql.parquet.filterPushdown' is enabled *and the vectorized reader is not 
used. You can ensure the vectorized reader is not used by setting 
'spark.sql.parquet.enableVectorizedReader' to false*"
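
To make the suggested wording concrete, the combination under which record-level filtering actually applies looks roughly like this (a sketch assuming an existing SparkSession named `spark` and a placeholder path; not taken from the PR):

```
// Record-level filtering only applies on the parquet-mr (non-vectorized) path,
// so filter pushdown must be on and the vectorized reader must be off.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", "true")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

// With these settings, rows dropped by the pushed-down predicate are
// filtered by parquet-mr itself rather than by Spark afterwards.
spark.read.parquet("/path/to/data").filter("id = 1").show()
```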






---




[GitHub] spark pull request #22865: [DOC] Fix doc for spark.sql.parquet.recordLevelFi...

2018-10-27 Thread bersprockets
GitHub user bersprockets opened a pull request:

https://github.com/apache/spark/pull/22865

[DOC] Fix doc for spark.sql.parquet.recordLevelFilter.enabled

## What changes were proposed in this pull request?

Updated the doc string value for 
spark.sql.parquet.recordLevelFilter.enabled to indicate that 
spark.sql.parquet.enableVectorizedReader must be disabled.

The code in ParquetFileFormat uses 
spark.sql.parquet.recordLevelFilter.enabled only after falling back to 
parquet-mr (see the else branch of this if statement): 
https://github.com/apache/spark/blob/d5573c578a1eea9ee04886d9df37c7178e67bb30/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L412

https://github.com/apache/spark/blob/d5573c578a1eea9ee04886d9df37c7178e67bb30/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L427-L430

Tests also bear this out.

## How was this patch tested?

This is just a doc string fix: I built Spark and ran a single test.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bersprockets/spark confdocfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22865.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22865


commit af8a85ae4a1e477801bf104af6d4909cd822ba01
Author: Bruce Robbins 
Date:   2018-10-27T21:47:50Z

update doc string for spark.sql.parquet.recordLevelFilter.enabled




---




[GitHub] spark issue #22504: [SPARK-25118][Submit] Persist Driver Logs in Yarn Client...

2018-09-28 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22504
  
The Py4JJavaError StackOverflow happens pretty reliably. I am guessing it's 
related to the change.


---




[GitHub] spark issue #22504: [SPARK-25118][Submit] Persist Driver Logs in Yarn Client...

2018-09-27 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22504
  
retest this please


---




[GitHub] spark issue #22504: [SPARK-25118][Submit] Persist Driver Logs in Yarn Client...

2018-09-27 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22504
  
retest this please


---




[GitHub] spark pull request #21950: [SPARK-24914][SQL][WIP] Add configuration to avoi...

2018-09-18 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21950#discussion_r218608537
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
---
@@ -1051,11 +1052,27 @@ private[hive] object HiveClientImpl {
 // When table is external, `totalSize` is always zero, which will 
influence join strategy.
 // So when `totalSize` is zero, use `rawDataSize` instead. When 
`rawDataSize` is also zero,
 // return None.
+// If a table has a deserialization factor, the table owner expects 
the in-memory
+// representation of the table to be larger than the table's totalSize 
value. In that case,
+// multiply totalSize by the deserialization factor and use that 
number instead.
+// If the user has set spark.sql.statistics.ignoreRawDataSize to true 
(because of HIVE-20079,
+// for example), don't use rawDataSize.
 // In Hive, when statistics gathering is disabled, `rawDataSize` and 
`numRows` is always
 // zero after INSERT command. So they are used here only if they are 
larger than zero.
-if (totalSize.isDefined && totalSize.get > 0L) {
-  Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = 
rowCount.filter(_ > 0)))
-} else if (rawDataSize.isDefined && rawDataSize.get > 0) {
+val factor = try {
+properties.get("deserFactor").getOrElse("1.0").toDouble
--- End diff --

I need to eliminate this duplication: there's a similar lookup and 
calculation done in PruneFileSourcePartitionsSuite. Also, I should check whether a 
Long, used as an intermediate value, is acceptable for holding file sizes 
(possibly, since a Long can represent 8 exabytes).
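
As a sketch of the kind of shared utility I mean (the helper name is hypothetical, and the final PR may do this differently):

```
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Hypothetical helper: read the deserialization factor from the table properties,
// defaulting to 1.0 when the property is absent or unparsable.
def deserializationFactor(table: CatalogTable): Double = {
  table.properties.get("deserFactor")
    .flatMap(v => scala.util.Try(v.toDouble).toOption)
    .getOrElse(1.0)
}
```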


---




[GitHub] spark issue #21950: [SPARK-24914][SQL][WIP] Add configuration to avoid OOM d...

2018-09-17 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21950
  
I broke this :). Don't ask for a redo.


---




[GitHub] spark pull request #21950: [SPARK-24914][SQL][WIP] Add configuration to avoi...

2018-09-12 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21950#discussion_r217216975
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
 ---
@@ -91,4 +91,28 @@ class PruneFileSourcePartitionsSuite extends QueryTest 
with SQLTestUtils with Te
   assert(size2 < tableStats.get.sizeInBytes)
 }
   }
+
+  test("Test deserialization factor against partition") {
+val factor = 10
+withTable("tbl") {
+  spark.range(10).selectExpr("id", "id % 3 as 
p").write.format("parquet")
+.partitionBy("p").saveAsTable("tbl")
+  sql(s"ANALYZE TABLE tbl COMPUTE STATISTICS")
+
+  val df1 = sql("SELECT * FROM tbl WHERE p = 1")
+  val sizes1 = df1.queryExecution.optimizedPlan.collect {
+case relation: LogicalRelation => 
relation.catalogTable.get.stats.get.sizeInBytes
+  }
+  assert(sizes1 != 0)
--- End diff --

Oops. Should be assert(sizes1(0) != 0). I will fix.
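
i.e., something along these lines (a sketch, assuming sizes1 is the collected sequence of sizes):

```
// Collected sizes should be non-empty, and every collected size should be non-zero.
assert(sizes1.nonEmpty)
assert(sizes1.forall(_ != 0))
```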


---




[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API

2018-09-11 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22192
  
retest this please


---




[GitHub] spark issue #22382: [SPARK-23243] [SPARK-20715][CORE][2.2] Fix RDD.repartiti...

2018-09-11 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22382
  
Thanks! Closing.


---




[GitHub] spark pull request #22382: [SPARK-23243] [SPARK-20715][CORE][2.2] Fix RDD.re...

2018-09-11 Thread bersprockets
Github user bersprockets closed the pull request at:

https://github.com/apache/spark/pull/22382


---




[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API

2018-09-10 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22192
  
retest this please.

It's that old "java.lang.reflect.InvocationTargetException: null" error 
we've seen many times.


---




[GitHub] spark issue #21899: [SPARK-24912][SQL] Don't obscure source of OOM during br...

2018-09-10 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21899
  
cc @jinxing64 @hvanhovell  @MaxGekk 


---




[GitHub] spark issue #22382: [SPARK-23243] [SPARK-20715][CORE][2.2] Fix RDD.repartiti...

2018-09-10 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22382
  
cc @cloud-fan @JoshRosen 


---




[GitHub] spark pull request #22382: [SPARK-23243] [SPARK-20715][CORE][2.2] Fix RDD.re...

2018-09-10 Thread bersprockets
GitHub user bersprockets opened a pull request:

https://github.com/apache/spark/pull/22382

[SPARK-23243] [SPARK-20715][CORE][2.2] Fix RDD.repartition() data 
correctness issue

## What changes were proposed in this pull request?

Backport of #22354 and #17955 to 2.2 (#22354 depends on methods introduced 
by #17955).

---

An alternative fix for #21698

When Spark reruns tasks for an RDD, there are 3 different behaviors:
1. determinate. Always returns the same result in the same order when rerun.
2. unordered. Returns the same data set in a random order when rerun.
3. indeterminate. Returns a different result when rerun.

Normally Spark doesn't need to care about this. Spark runs stages one by one; 
when a task fails, it is simply rerun. Although the rerun task may return a 
different result, users will not be surprised.

However, Spark may rerun a finished stage when seeing fetch failures. When 
this happens, Spark needs to rerun all the tasks of all the succeeding stages 
if the RDD output is indeterminate, because the input of the succeeding stages 
has been changed.

If the RDD output is determinate, we only need to rerun the failed tasks of 
the succeeding stages, because the input doesn't change.

If the RDD output is unordered, it's the same as determinate, because the shuffle 
partitioner is always deterministic (a round-robin partitioner is not a shuffle 
partitioner that extends `org.apache.spark.Partitioner`), so the reducers will 
still get the same input data set.

This PR fixes the failure handling for `repartition` to avoid correctness 
issues.

For `repartition`, Spark applies a stateful map function to generate a 
round-robin id, which is order sensitive and makes the RDD's output 
indeterminate. When a stage containing `repartition` is rerun, we must also rerun 
all the tasks of all the succeeding stages.

**future improvement:**
1. Currently we can't roll back and rerun a shuffle map stage; we just 
fail. We should fix this later. https://issues.apache.org/jira/browse/SPARK-25341
2. Currently we can't roll back and rerun a result stage; we just fail. We 
should fix this later. https://issues.apache.org/jira/browse/SPARK-25342
3. We should provide a public API to allow users to tag the randomness level of 
an RDD's computing function.

## How was this patch tested?

a new test case


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bersprockets/spark SPARK-23243-2.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22382.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22382


commit 97ba5a71e1903e0462bfac3201f1961e0c14f384
Author: Wenchen Fan 
Date:   2018-09-07T02:52:45Z

[SPARK-23243][CORE][2.2] Fix RDD.repartition() data correctness issue

backport https://github.com/apache/spark/pull/22112 to 2.2

---

An alternative fix for https://github.com/apache/spark/pull/21698

When Spark rerun tasks for an RDD, there are 3 different behaviors:
1. determinate. Always return the same result with same order when rerun.
2. unordered. Returns same data set in random order when rerun.
3. indeterminate. Returns different result when rerun.

Normally Spark doesn't need to care about it. Spark runs stages one by one, 
when a task is failed, just rerun it. Although the rerun task may return a 
different result, users will not be surprised.

However, Spark may rerun a finished stage when seeing fetch failures. When 
this happens, Spark needs to rerun all the tasks of all the succeeding stages 
if the RDD output is indeterminate, because the input of the succeeding stages 
has been changed.

If the RDD output is determinate, we only need to rerun the failed tasks of 
the succeeding stages, because the input doesn't change.

If the RDD output is unordered, it's same as determinate, because shuffle 
partitioner is always deterministic(round-robin partitioner is not a shuffle 
partitioner that extends `org.apache.spark.Partitioner`), so the reducers will 
still get the same input data set.

This PR fixed the failure handling for `repartition`, to avoid correctness 
issues.

For `repartition`, it applies a stateful map function to generate a 
round-robin id, which is order sensitive and makes the RDD's output 
indeterminate. When the stage contains `repartition` reruns, we must also rerun 
all the tasks of all the succeeding stages.

**future improvement:**
1. Currently we can't rollback and rerun a shuffle map stage, and just 
fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341
2. Cu

[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API

2018-09-04 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22192
  
retest this please.


---




[GitHub] spark issue #22209: [SPARK-24415][Core] Fixed the aggregated stage metrics b...

2018-08-29 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22209
  
retest this please


---




[GitHub] spark issue #22209: [SPARK-24415][Core] Fixed the aggregated stage metrics b...

2018-08-28 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22209
  
Looks like the test failed due to 
https://issues.apache.org/jira/browse/SPARK-23622


---




[GitHub] spark issue #22209: [SPARK-24415][Core] Fixed the aggregated stage metrics b...

2018-08-28 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22209
  
retest this please


---




[GitHub] spark issue #22188: [SPARK-25164][SQL] Avoid rebuilding column and path list...

2018-08-27 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22188
  
@gatorsmile Thanks much!


---




[GitHub] spark issue #22188: [SPARK-25164][SQL] Avoid rebuilding column and path list...

2018-08-27 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22188
  
@gatorsmile 
>Why 2.2 only?

Only that I forgot that master is already on 2.4. We should do 2.3 as well, 
but I haven't tested it yet.

Do I need to do anything on my end to get it into 2.2, and once I test, 
into 2.3?


---




[GitHub] spark issue #22188: [SPARK-25164][SQL] Avoid rebuilding column and path list...

2018-08-27 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22188
  
@cloud-fan @gatorsmile Should we merge this also onto 2.2? It was a clean 
cherry-pick for me (from master to branch-2.2), and I ran the top and bottom 
tests (6000 columns, 1 million rows, 67 32M files, and 60 columns, 100 million 
rows, 67 32M files) from the PR description and got the same results.


---




[GitHub] spark pull request #21899: [SPARK-24912][SQL] Don't obscure source of OOM du...

2018-08-24 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21899#discussion_r212756302
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -118,12 +119,20 @@ case class BroadcastExchangeExec(
   // SparkFatalException, which is a subclass of Exception. 
ThreadUtils.awaitResult
   // will catch this exception and re-throw the wrapped fatal 
throwable.
   case oe: OutOfMemoryError =>
-throw new SparkFatalException(
+val sizeMessage = if (dataSize != -1) {
+  s"${SparkLauncher.DRIVER_MEMORY} by at least the estimated 
size of the " +
+s"relation ($dataSize bytes)"
--- End diff --

@rezasafi The dataSize appears to be inflated by 2-3 times, at least 
relative to the size of the actual data in the table. That may be because these 
relations are backed by map-like objects that have keys and (likely) other 
internal structures.


---




[GitHub] spark pull request #21950: [SPARK-24914][SQL][WIP] Add configuration to avoi...

2018-08-24 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21950#discussion_r212719073
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
 ---
@@ -76,4 +78,16 @@ private[sql] object PruneFileSourcePartitions extends 
Rule[LogicalPlan] {
 op
   }
   }
+
+  private def calcPartSize(catalogTable: Option[CatalogTable], 
sizeInBytes: Long): Long = {
+val conf: SQLConf = SQLConf.get
+val factor = conf.sizeDeserializationFactor
+if (catalogTable.isDefined && factor != 1.0 &&
+  // TODO: The serde check should be in a utility function, since it 
is also checked elsewhere
+  catalogTable.get.storage.serde.exists(s => s.contains("Parquet") || 
s.contains("Orc"))) {
--- End diff --

@mgaido91 Good point. Also, I notice that even when the table's files are 
not compressed (say, a table backed by CSV files), the LongToUnsafeRowMap or 
BytesToBytesMap that backs the relation is roughly 3 times larger than the 
total file size. So even under the best of circumstances (i.e., the table's 
files are not compressed), Spark will get it wrong by several multiples.
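
A rough way to see that kind of object overhead, independent of this PR, is to compare the raw payload size with what Spark's SizeEstimator reports for a plain in-memory map (illustrative only; this is not the hashed relation itself, and the exact numbers will vary):

```
import org.apache.spark.util.SizeEstimator

// ~1M entries of (Long -> short String): the raw payload is a few tens of MB,
// but the in-memory map is typically several times larger because of boxing,
// per-entry objects, and hash table overhead.
val m = (0L until 1000000L).map(i => i -> s"value_$i").toMap
println(s"estimated in-memory size: ${SizeEstimator.estimate(m)} bytes")
```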


---




[GitHub] spark pull request #22079: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-251...

2018-08-23 Thread bersprockets
Github user bersprockets closed the pull request at:

https://github.com/apache/spark/pull/22079


---




[GitHub] spark issue #22079: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

2018-08-23 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22079
  
@gatorsmile Weird, I don't see it on branch-2.2. Is that a sync issue?


---




[GitHub] spark issue #22188: [SPARK-25164][SQL] Avoid rebuilding column and path list...

2018-08-22 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22188
  
OK, I reran the tests for the lower column count cases, and the runs with 
the patch consistently show a tiny (1-3%) improvement compared to the master 
branch. So even the lower column count cases benefit a little.


---




[GitHub] spark issue #22188: [SPARK-25164][SQL] Avoid rebuilding column and path list...

2018-08-22 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22188
  
Thanks @vanzin. In my benchmark tests, the tiny degradation (0.5%) in the 
lower column count cases is pretty consistent, which concerns me a little. I am 
going to re-run those tests in a different environment and see what happens.


---




[GitHub] spark pull request #22188: [SPARK-25164][SQL] Avoid rebuilding column and pa...

2018-08-22 Thread bersprockets
GitHub user bersprockets opened a pull request:

https://github.com/apache/spark/pull/22188

[SPARK-25164][SQL] Avoid rebuilding column and path list for each column in 
parquet reader

## What changes were proposed in this pull request?

VectorizedParquetRecordReader::initializeInternal rebuilds the column list 
and path list once for each column. Therefore, it indirectly iterates 
2\*colCount\*colCount times for each parquet file.

This inefficiency impacts jobs that read parquet-backed tables with many 
columns and many files. Jobs that read tables with few columns or few files are 
not impacted.

This PR changes initializeInternal so that it builds each list only once.
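
The shape of the change, reduced to a sketch with stand-in types (this is not the actual Java in VectorizedParquetRecordReader):

```
case class ColumnDescriptor(path: Seq[String])

// Before (sketch): rebuilding the schema's column and path lists for every requested
// column makes the setup work quadratic in the number of columns.
def resolveQuadratic(schemaColumns: () => Seq[ColumnDescriptor], requested: Seq[String]): Unit =
  requested.foreach { name =>
    val columns = schemaColumns()             // rebuilt on every iteration
    val paths = columns.map(_.path)           // rebuilt on every iteration
    paths.find(_.lastOption.contains(name))   // placeholder for the real per-column lookup
  }

// After (sketch): build each list once and reuse it for every requested column.
def resolveLinear(schemaColumns: () => Seq[ColumnDescriptor], requested: Seq[String]): Unit = {
  val columns = schemaColumns()               // built once
  val paths = columns.map(_.path)             // built once
  requested.foreach { name =>
    paths.find(_.lastOption.contains(name))   // placeholder for the real per-column lookup
  }
}
```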

I ran benchmarks on my laptop with 1 worker thread, running this query:

sql("select * from parquet_backed_table where id1 = 1").collect

There is roughly one matching row for every 425 rows, and the matching 
rows are sprinkled pretty evenly throughout the table (that is, every page for 
column id1 has at least one matching row).

6000 columns, 1 million rows, 67 32M files:

master | branch | improvement
---|-|---
10.87 min | 6.09 min | 44%

6000 columns, 1 million rows, 23 98m files:

master | branch | improvement
---|-|---
7.39 min | 5.80 min | 21%

600 columns 10 million rows, 67 32M files:

master | branch | improvement
---|-|---
1.95 min | 1.96 min | -0.5%

60 columns, 100 million rows, 67 32M files:

master | branch | improvement
---|-|---
0.55 min | 0.55 min | 0%

## How was this patch tested?

- sql unit tests
- pyspark-sql tests



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bersprockets/spark SPARK-25164

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22188.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22188


commit 697de21501acbda3dbcd8ccc13a35ad3723a652e
Author: Bruce Robbins 
Date:   2018-08-22T02:00:28Z

Initial commit




---




[GitHub] spark pull request #21899: [SPARK-24912][SQL] Don't obscure source of OOM du...

2018-08-21 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21899#discussion_r211833522
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -118,12 +119,20 @@ case class BroadcastExchangeExec(
   // SparkFatalException, which is a subclass of Exception. 
ThreadUtils.awaitResult
   // will catch this exception and re-throw the wrapped fatal 
throwable.
   case oe: OutOfMemoryError =>
-throw new SparkFatalException(
+val sizeMessage = if (dataSize != -1) {
+  s"${SparkLauncher.DRIVER_MEMORY} by at least the estimated 
size of the " +
+s"relation ($dataSize bytes)"
--- End diff --

Hmmm.. good question. I will check.


---




[GitHub] spark issue #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exceptions...

2018-08-21 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22154
  
Re: your build failure ('statefulOperators.scala:95: value asJava is not a 
member of scala.collection.immutable.Map[String,Long]').

I am also seeing this in my fork on my laptop (I just updated my fork about 
10-15 minutes ago).


---




[GitHub] spark issue #22079: [SPARK-23207][SPARK-22905][SQL][BACKPORT-2.2] Shuffle+Re...

2018-08-20 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22079
  
@gatorsmile So I should include all the related PRs merged to master as a 
single PR here? Just verifying.


---




[GitHub] spark issue #21899: [SPARK-24912][SQL] Don't obscure source of OOM during br...

2018-08-18 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21899
  
retest this please


---




[GitHub] spark pull request #21899: [SPARK-24912][SQL] Don't obscure source of OOM du...

2018-08-17 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21899#discussion_r211047556
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -118,12 +119,20 @@ case class BroadcastExchangeExec(
   // SparkFatalException, which is a subclass of Exception. 
ThreadUtils.awaitResult
   // will catch this exception and re-throw the wrapped fatal 
throwable.
   case oe: OutOfMemoryError =>
-throw new SparkFatalException(
+val sizeMessage = if (dataSize != -1) {
+  s"${SparkLauncher.DRIVER_MEMORY} by at least the estimated 
size of the " +
--- End diff --

@hvanhovell That's what was being obscured :).

In testing this, I've seen it come from various places. Here are the three cases I have seen 
first hand:


java.lang.OutOfMemoryError: Not enough memory to build and broadcast the 
table to all worker nodes. As a workaround, you can either disable broadcast by 
setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver 
memory by setting spark.driver.memory to a higher value.
  at 
org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.grow(HashedRelation.scala:628)
  at 
org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.append(HashedRelation.scala:570)
  at 
org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:865)

At that line is an allocation:

val newPage = new Array[Long](newNumWords.toInt)

2nd case:

java.lang.OutOfMemoryError: Not enough memory to build and broadcast the 
table to all worker nodes. As a workaround, you can either disable broadcast by 
setting spark.sql.autoBroadcastJoinThreshold to -1 or increase 
spark.driver.memory by at least the estimated size of the relation (96468992 
bytes).
  at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
  at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
  at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:286)
  at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:286)

3rd case:

java.lang.OutOfMemoryError: Not enough memory to build and broadcast the 
table to all worker nodes. As a workaround, you can either disable broadcast by 
setting \
spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver 
memory by setting spark.driver.memory to a higher value.
  at 
org.apache.spark.unsafe.memory.MemoryBlock.allocateFromObject(MemoryBlock.java:118)
  at 
org.apache.spark.sql.catalyst.expressions.UnsafeRow.getUTF8String(UnsafeRow.java:420)
  at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
  at 
org.apache.spark.sql.execution.joins.UnsafeHashedRelation$.apply(HashedRelation.scala:311)

At that line is also an allocation:

mb = new ByteArrayMemoryBlock(array, offset, length);





---




[GitHub] spark issue #21899: [SPARK-24912][SQL] Don't obscure source of OOM during br...

2018-08-17 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21899
  
retest this please


---




[GitHub] spark issue #21899: [SPARK-24912][SQL] Don't obscure source of OOM during br...

2018-08-17 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21899
  
@MaxGekk In the updated message, I left out "hash" from the term "hash 
relation" only because it seems the relation could also be an Array.


---




[GitHub] spark issue #21950: [SPARK-24914][SQL][WIP] Add configuration to avoid OOM d...

2018-08-17 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21950
  
retest this please.


---




[GitHub] spark issue #22079: [SPARK-23207][SPARK-22905][SQL][BACKPORT-2.2] Shuffle+Re...

2018-08-16 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22079
  
@jiangxb1987 gentle ping.


---




[GitHub] spark issue #22079: [SPARK-23207][SPARK-22905][SQL][BACKPORT-2.2] Shuffle+Re...

2018-08-15 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22079
  
Once this is merged, I will also back-port:

- [[SPARK-24564][TEST] Add test suite for 
RecordBinaryComparator](https://github.com/apache/spark/commit/5b0596648854c0c733b7c607661b78af7df18b89)
 
- #22101



---




[GitHub] spark issue #22101: [SPARK-25114][Core] Fix RecordBinaryComparator when subt...

2018-08-14 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22101
  
Should there be a test, or do other sorting-related tests cover this 
indirectly?


---




[GitHub] spark issue #22079: [SPARK-23207][SPARK-22905][SQL][BACKPORT-2.2] Shuffle+Re...

2018-08-13 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22079
  
@jiangxb1987 

Here are some of the differences from the original PR
- I also ported the follow up PR #20426
- I ported #20088 (for SPARK-22905) to get the tests to pass. I also ported 
its followup, #20113.
- 
sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java, 
line 112: UnsafeExternalSorter.create does not take a Supplier for the 5th argument, 
so I called get() on the argument to pass a RecordComparator object directly.
- 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala:
 I access SQLConf differently (since SQLConf.get doesn't appear to work in 2.2).


---




[GitHub] spark pull request #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartit...

2018-08-13 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/22079#discussion_r209736691
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala ---
@@ -144,7 +144,7 @@ object ChiSqSelectorModel extends 
Loader[ChiSqSelectorModel] {
   val dataArray = Array.tabulate(model.selectedFeatures.length) { i =>
 Data(model.selectedFeatures(i))
   }
-  
spark.createDataFrame(dataArray).repartition(1).write.parquet(Loader.dataPath(path))
+  spark.createDataFrame(sc.makeRDD(dataArray, 
1)).write.parquet(Loader.dataPath(path))
--- End diff --

@dongjoon-hyun Thanks, I will include the other commit and also update the 
title.


---




[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...

2018-08-12 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22079
  
Hmmm... I somehow managed to break the SparkR tests by fixing a comment. It 
seems to have auto-retried and broken the second time too.


---




[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...

2018-08-12 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22079
  
retest this please


---




[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...

2018-08-12 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22079
  
@jiangxb1987 

> We shall also include #20088 in this backport PR.

I did that shortly after commenting, which allowed the tests to pass. I 
squashed it into the first commit, so it wasn't obvious I did it.

Should I also include #20426 in this PR, or treat that separately?


---




[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...

2018-08-11 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/22079
  
The test "model load / save" in ChiSqSelectorSuite fails because of this 
line in 

[ChiSqSelector.scala](https://github.com/apache/spark/blob/branch-2.2/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala#L147)



spark.createDataFrame(dataArray).repartition(1).write.parquet(Loader.dataPath(path))


In 2.4, the line is:


spark.createDataFrame(sc.makeRDD(dataArray, 
1)).write.parquet(Loader.dataPath(path))


If you change 2.4 to also have that line, and also remove the follow-up PR 
(#20426) that avoids sorting when there is one partition, this test fails on 
2.4 in the same way.

So I am not sure which way to go: update ChiSqSelector.scala to match 2.4 
(simply a one-line change), or make the test accept the new order.


---




[GitHub] spark pull request #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartit...

2018-08-11 Thread bersprockets
GitHub user bersprockets opened a pull request:

https://github.com/apache/spark/pull/22079

[SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on a DataFrame could 
lead to incorrect answers

## What changes were proposed in this pull request?

Currently shuffle repartition uses RoundRobinPartitioning; the generated 
result is nondeterministic since the sequence of input rows is not determined.

The bug can be triggered when there is a repartition call following a 
shuffle (which would lead to non-deterministic row ordering), as the pattern 
shows below:
upstream stage -> repartition stage -> result stage
(-> indicates a shuffle)
When one of the executor processes goes down, some tasks of the repartition 
stage will be retried and generate an inconsistent ordering, and some tasks of the 
result stage will be retried, generating different data.

The following code returns 931532, instead of 1000000:
```
import scala.sys.process._

import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 
2) {
throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
```

In this PR, we propose the most straightforward way to fix this problem: 
perform a local sort before partitioning. Once we make the input row 
ordering deterministic, the function from rows to partitions is fully 
deterministic too.

The downside of the approach is that, with the extra local sort inserted, the 
performance of repartition() will go down, so we add a new config named 
`spark.sql.execution.sortBeforeRepartition` to control whether this patch is 
applied. The patch is enabled by default to be safe, but users may 
choose to manually turn it off to avoid the performance regression.

This patch also changes the output row ordering of repartition(), which 
leads to a bunch of test case failures because they compare the results 
directly.

Adds a unit test in ExchangeSuite.

With this patch (and `spark.sql.execution.sortBeforeRepartition` set to 
true), the following query returns 1000000:
```
import scala.sys.process._

import org.apache.spark.TaskContext

spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")

val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 
2) {
throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()

res7: Long = 1000000
```

Author: Xingbo Jiang 

## How was this patch tested?

Ran all SBT unit tests for org.apache.spark.sql.*.

Ran pyspark tests for module pyspark-sql.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bersprockets/spark SPARK-23207

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22079.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22079


commit efccc028bce64bf4754ce81ee16533c19b4384b2
Author: Xingbo Jiang 
Date:   2018-01-26T23:01:03Z

[SPARK-23207][SQL] Shuffle+Repartition on a DataFrame could lead to 
incorrect answers

Currently shuffle repartition uses RoundRobinPartitioning, the generated 
result is nondeterministic since the sequence of input rows are not determined.

The bug can be triggered when there is a repartition call following a 
shuffle (which would lead to non-deterministic row ordering), as the pattern 
shows below:
upstream stage -> repartition stage -> result stage
(-> indicate a shuffle)
When one of the executors process goes down, some tasks on the repartition 
stage will be retried and generate inconsistent ordering, and some tasks of the 
result stage will be retried generating different data.

The following code returns 931532, instead of 100:
```
import scala.sys.process._

import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 
2) {
throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
```

In this PR, we propose a most straight-forward way to fix this problem by 
performing a local sort before partitioning, after we make the input row 
ordering deterministic, the funct

[GitHub] spark issue #21950: [SPARK-24914][SQL][WIP] Add configuration to avoid OOM d...

2018-08-02 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21950
  
retest this please.


---




[GitHub] spark issue #21950: [SPARK-24914][SQL][WIP] Add configuration to avoid OOM d...

2018-08-02 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21950
  
retest this please.


---




[GitHub] spark pull request #21950: [SPARK-24912][SQL][WIP] Add configuration to avoi...

2018-08-01 Thread bersprockets
GitHub user bersprockets opened a pull request:

https://github.com/apache/spark/pull/21950

[SPARK-24912][SQL][WIP] Add configuration to avoid OOM during broadcast 
join (and other negative side effects of incorrect table sizing)

## What changes were proposed in this pull request?

Added configuration settings to help avoid OOM errors during broadcast 
joins.

- deser multiplication factor: Tell Spark to multiply totalSize times a 
specified factor for tables with encoded files (i.e., parquet or orc files). 
Spark will do this when calculating a table's sizeInBytes. This is modelled 
after Hive's hive.stats.deserialization.factor configuration setting.
- ignore rawDataSize: Due to HIVE-20079, rawDataSize is broken. This 
setting tells Spark to ignore rawDataSize when calculating the table's 
sizeInBytes.

One can partially simulate the deser multiplication factor without this 
change by decreasing the value in spark.sql.autoBroadcastJoinThreshold. 
However, that will affect all tables, not just the ones that are encoded.
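
For example, that partial workaround boils down to something like this (a sketch assuming an existing SparkSession named `spark`; the divisor is arbitrary):

```
// The default broadcast threshold is 10 MB of *estimated* size. If parquet/orc
// tables decompress to roughly 3x their on-disk size, shrinking the threshold
// makes the planner more conservative about broadcasting -- but it affects
// every table, not just the encoded ones.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024 / 3)
```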

There is some awkwardness in that the check for file type (parquet or orc) 
uses Hive deser names, but the checks for partitioned tables need to be made 
outside of the Hive submodule. Still working that out.

## How was this patch tested?

Added unit tests.

Also, checked that I can avoid broadcast join OOM errors when using the 
deser multiplication factor on both my laptop and a cluster. Also checked that 
I can avoid OOM errors using the ignore rawDataSize flag on my laptop.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bersprockets/spark SPARK-24914

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21950.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21950


commit aa2a957751a906fe538822cace019014e763a8c3
Author: Bruce Robbins 
Date:   2018-07-26T00:36:17Z

WIP version




---




[GitHub] spark issue #21899: [SPARK-24912][SQL] Don't obscure source of OOM during br...

2018-07-27 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21899
  
>Is it possible to include the actual size of the in-memory table so far in 
the msg as well?

Only if the relation can be built. If we run out of memory attempting to 
build the relation, we've essentially died in its constructor, so the relation 
doesn't exist.

If we OOM after that (say when attempting to broadcast the relation), then 
we can get the size.



---




[GitHub] spark issue #21899: [SPARK-24912][SQL] Don't obscure source of OOM during br...

2018-07-27 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21899
  
> Is it possible to include the actual size of the in-memory table so far 
in the msg as well?

Possibly. The state of the relation might be messy when I go to query its 
size.

>Also, does catching the OOM and throwing our own mess with 
HeapDumpOnOutOfMemoryError?

@squito From my tests, it seems the heap dump is taken before the exception 
is caught.


java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid70644.hprof ...
Heap dump file created [842225516 bytes in 2.412 secs]
java.lang.OutOfMemoryError: Not enough memory to build and broadcast the 
table to all worker nodes. As a workaround, you can either disable broadcast by 
setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver 
memory by setting spark.driver.memory to a higher value
  at 
org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.grow(HashedRelation.scala:628)



---




[GitHub] spark pull request #21899: [SPARK-24912][SQL] Don't obscure source of OOM du...

2018-07-27 Thread bersprockets
GitHub user bersprockets opened a pull request:

https://github.com/apache/spark/pull/21899

[SPARK-24912][SQL] Don't obscure source of OOM during broadcast join

## What changes were proposed in this pull request?

This PR shows the stack trace of the original OutOfMemoryError that occurs 
while building or broadcasting a HashedRelation, rather than the stack trace of 
the newly created OutOfMemoryError that's created during error handling.

Currently, when an OOM occurs while broadcasting a table, the stack trace 
shows a line in the error handling:

java.lang.OutOfMemoryError: Not enough memory to build and broadcast the 
table to all worker nodes. As a workaround, you can either disable broadcast by 
setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver 
memory by setting spark.driver.memory to a higher value
  at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:122)
  at 
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76)
  at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withExecutionId$1.apply(SQLExecution.scala:101)

With this PR, it shows the original stack trace.

java.lang.OutOfMemoryError: Not enough memory to build and broadcast the 
table to all worker nodes. As a workaround, you can either disable broadcast by 
setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver 
memory by setting spark.driver.memory to a higher value
  at 
org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.grow(HashedRelation.scala:628)
  at 
org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.append(HashedRelation.scala:570)
  at 
org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:865)


While sometimes the line on which the exception is thrown is just a 
victim, sometimes it is a participant in the problem, as was the case in the 
above exception.
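
The general shape of the fix is to rethrow a more descriptive error that carries the original error's stack trace, roughly like this (a sketch, not the exact diff; inside Spark the error is additionally wrapped, as the quoted BroadcastExchangeExec code shows):

```
// Sketch only: preserve the allocation site's stack trace on the friendlier error,
// rather than the stack trace of the error-handling code itself.
def withBroadcastOomHint[T](body: => T): T = {
  try body catch {
    case oe: OutOfMemoryError =>
      val hint = new OutOfMemoryError(
        "Not enough memory to build and broadcast the table to all worker nodes. " +
          "As a workaround, disable broadcast (spark.sql.autoBroadcastJoinThreshold = -1) " +
          "or increase spark.driver.memory.")
      hint.setStackTrace(oe.getStackTrace) // keep the original frames
      throw hint
  }
}
```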

## How was this patch tested?

Manually tested a case where a broadcast join caused an OOM, and a case where 
it did not.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bersprockets/spark SPARK-24912

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21899.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21899


commit ca19596c9c09e2cc0ef5667d84f1d289b8184b91
Author: Bruce Robbins 
Date:   2018-07-26T01:11:07Z

Initial commit




---




[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-07-10 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
@ueshin Thanks for all your help!


---




[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-07-06 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
retest this please.


---




[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-07-02 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r199678852
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -551,6 +551,36 @@ object TypeCoercion {
   case None => s
 }
 
+  case m @ MapConcat(children) if children.forall(c => 
MapType.acceptsType(c.dataType)) &&
+!haveSameType(children) =>
+val keyTypes = 
children.map(_.dataType.asInstanceOf[MapType].keyType)
--- End diff --

I don't necessarily have a _good_ reason, but here are two reasons I did 
that extra junk:

1) The Concat-like code didn't find a wider type amongst the two map types. 
So, it just fell to case None => m

2) If the call to map_concat has the same child types but multiple 
valueContainsNull values, the Concat-style code added a Cast to each child 
(this is because haveSameType considers expressions with different 
valueContainsNull values to have different types). It does no harm, as far as I 
can tell, but it seemed wrong.

About issue 1): I will debug. I might have done something wrong there. 
Plus, even if it's a real bug in findWiderCommonType, it affects my longer 
code, which may be looking for a wider common type amongst the keys or values, 
which could themselves be maps.

About issue 2): Maybe not an issue. Or I can create an alternate 
haveSameType() function for Maps.
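
As a rough sketch of what such an alternate check could look like (just an 
idea, not code from this PR): treat map children as "the same type" when their 
key and value types match, ignoring valueContainsNull, so no Cast is added 
purely for nullability.

    import org.apache.spark.sql.types.{DataType, MapType}

    // Sketch only: sameness check for map children that ignores valueContainsNull.
    def haveSameMapType(types: Seq[DataType]): Boolean = {
      val keyValuePairs = types.map {
        case MapType(k, v, _) => Some((k, v))
        case _ => None // a non-map child means the inputs cannot be "the same"
      }
      keyValuePairs.forall(_.isDefined) && keyValuePairs.flatten.distinct.length <= 1
    }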



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-06-26 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
Still working on type coercion.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21628: [SPARK-23776][DOC] Update instructions for running PySpa...

2018-06-25 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21628
  
@HyukjinKwon Thanks for your help!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-06-24 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r197671215
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -475,6 +474,231 @@ case class MapEntries(child: Expression) extends 
UnaryExpression with ExpectsInp
   override def prettyName: String = "map_entries"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
+   [[1 -> "a"], [2 -> "b"], [2 -> "c"], [3 -> "d"]]
+  """, since = "2.4.0")
+case class MapConcat(children: Seq[Expression]) extends Expression {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+var funcName = s"function $prettyName"
+if (children.exists(!_.dataType.isInstanceOf[MapType])) {
+  TypeCheckResult.TypeCheckFailure(
+s"input to $funcName should all be of type map, but it's " +
+  children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else {
+  TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), 
funcName)
+}
+  }
+
+  override def dataType: MapType = {
+val dt = children.map(_.dataType.asInstanceOf[MapType]).headOption
+  .getOrElse(MapType(StringType, StringType))
+val valueContainsNull = children.map(_.dataType.asInstanceOf[MapType])
+  .exists(_.valueContainsNull)
+if (dt.valueContainsNull != valueContainsNull) {
+  dt.copy(valueContainsNull = valueContainsNull)
+} else {
+  dt
+}
+  }
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override def eval(input: InternalRow): Any = {
+val maps = children.map(_.eval(input))
+if (maps.contains(null)) {
+  return null
+}
+val keyArrayDatas = maps.map(_.asInstanceOf[MapData].keyArray())
+val valueArrayDatas = maps.map(_.asInstanceOf[MapData].valueArray())
+
+val numElements = keyArrayDatas.foldLeft(0L)((sum, ad) => sum + 
ad.numElements())
+if (numElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful attempt to concat maps 
with $numElements " +
+s"elements due to exceeding the map size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+val finalKeyArray = new Array[AnyRef](numElements.toInt)
+val finalValueArray = new Array[AnyRef](numElements.toInt)
+var position = 0
+for (i <- keyArrayDatas.indices) {
+  val keyArray = keyArrayDatas(i).toObjectArray(dataType.keyType)
+  val valueArray = valueArrayDatas(i).toObjectArray(dataType.valueType)
+  Array.copy(keyArray, 0, finalKeyArray, position, keyArray.length)
+  Array.copy(valueArray, 0, finalValueArray, position, 
valueArray.length)
+  position += keyArray.length
+}
+
+new ArrayBasedMapData(new GenericArrayData(finalKeyArray),
+  new GenericArrayData(finalValueArray))
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val mapCodes = children.map(_.genCode(ctx))
+val keyType = dataType.keyType
+val valueType = dataType.valueType
+val argsName = ctx.freshName("args")
+val keyArgsName = ctx.freshName("keyArgs")
+val valArgsName = ctx.freshName("valArgs")
+
+val mapDataClass = classOf[MapData].getName
+val arrayBasedMapDataClass = classOf[ArrayBasedMapData].getName
+val arrayDataClass = classOf[ArrayData].getName
+
+val init =
+  s"""
+|$mapDataClass[] $argsName = new $mapDataClass[${mapCodes.size}];
+|$arrayDataClass[] $keyArgsName = new 
$arrayDataClass[${mapCodes.size}];
+|$arrayDataClass[] $valArgsName = new 
$arrayDataClass[${mapCodes.size}];
+|boolean ${ev.isNull} = false;
+|$mapDataClass ${ev.value} = null;
+  """.stripMargin
+
+val assignments = mapCodes.zipWithIndex.map { case (m, i) =>
+  s"""
+ |${m.code}
+ |$argsName[$i] = ${m.value};
+ |if (${m.isNull}) {
+ |  ${ev.isNull} = true;
+ |}
+   """.stripMargin
+}
+
+v

[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-06-24 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
@ueshin 
>so I was wondering whether we need the same thing for MapConcat or not.

Got it. I will research that, plus I will look at the entire pull request 
for Concat to see if there is anything else relevant to MapConcat.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-06-24 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r197669221
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -475,6 +474,231 @@ case class MapEntries(child: Expression) extends 
UnaryExpression with ExpectsInp
   override def prettyName: String = "map_entries"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
+   [[1 -> "a"], [2 -> "b"], [2 -> "c"], [3 -> "d"]]
+  """, since = "2.4.0")
+case class MapConcat(children: Seq[Expression]) extends Expression {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+var funcName = s"function $prettyName"
+if (children.exists(!_.dataType.isInstanceOf[MapType])) {
+  TypeCheckResult.TypeCheckFailure(
+s"input to $funcName should all be of type map, but it's " +
+  children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else {
+  TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), 
funcName)
+}
+  }
+
+  override def dataType: MapType = {
+val dt = children.map(_.dataType.asInstanceOf[MapType]).headOption
+  .getOrElse(MapType(StringType, StringType))
+val valueContainsNull = children.map(_.dataType.asInstanceOf[MapType])
+  .exists(_.valueContainsNull)
+if (dt.valueContainsNull != valueContainsNull) {
+  dt.copy(valueContainsNull = valueContainsNull)
+} else {
+  dt
+}
+  }
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override def eval(input: InternalRow): Any = {
+val maps = children.map(_.eval(input))
+if (maps.contains(null)) {
+  return null
+}
+val keyArrayDatas = maps.map(_.asInstanceOf[MapData].keyArray())
+val valueArrayDatas = maps.map(_.asInstanceOf[MapData].valueArray())
+
+val numElements = keyArrayDatas.foldLeft(0L)((sum, ad) => sum + 
ad.numElements())
+if (numElements > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful attempt to concat maps 
with $numElements " +
+s"elements due to exceeding the map size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+val finalKeyArray = new Array[AnyRef](numElements.toInt)
+val finalValueArray = new Array[AnyRef](numElements.toInt)
+var position = 0
+for (i <- keyArrayDatas.indices) {
+  val keyArray = keyArrayDatas(i).toObjectArray(dataType.keyType)
+  val valueArray = valueArrayDatas(i).toObjectArray(dataType.valueType)
+  Array.copy(keyArray, 0, finalKeyArray, position, keyArray.length)
+  Array.copy(valueArray, 0, finalValueArray, position, 
valueArray.length)
+  position += keyArray.length
+}
+
+new ArrayBasedMapData(new GenericArrayData(finalKeyArray),
+  new GenericArrayData(finalValueArray))
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val mapCodes = children.map(_.genCode(ctx))
+val keyType = dataType.keyType
+val valueType = dataType.valueType
+val argsName = ctx.freshName("args")
+val keyArgsName = ctx.freshName("keyArgs")
+val valArgsName = ctx.freshName("valArgs")
+
+val mapDataClass = classOf[MapData].getName
+val arrayBasedMapDataClass = classOf[ArrayBasedMapData].getName
+val arrayDataClass = classOf[ArrayData].getName
+
+val init =
+  s"""
+|$mapDataClass[] $argsName = new $mapDataClass[${mapCodes.size}];
+|$arrayDataClass[] $keyArgsName = new 
$arrayDataClass[${mapCodes.size}];
+|$arrayDataClass[] $valArgsName = new 
$arrayDataClass[${mapCodes.size}];
+|boolean ${ev.isNull} = false;
+|$mapDataClass ${ev.value} = null;
+  """.stripMargin
+
+val assignments = mapCodes.zipWithIndex.map { case (m, i) =>
+  s"""
+ |${m.code}
+ |$argsName[$i] = ${m.value};
+ |if (${m.isNull}) {
+ |  ${ev.isNull} = true;
+ |}
+   """.stripMargin
+}
+
+va

[GitHub] spark pull request #21628: [SPARK-23776][DOC] Update instructions for runnin...

2018-06-24 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21628#discussion_r197667457
  
--- Diff: docs/building-spark.md ---
@@ -215,19 +215,23 @@ If you are building Spark for use in a Python 
environment and you wish to pip in
 
 Alternatively, you can also run make-distribution with the --pip option.
 
-## PySpark Tests with Maven
+## PySpark Tests with Maven or SBT
 
 If you are building PySpark and wish to run the PySpark tests you will 
need to build Spark with Hive support.
 
 ./build/mvn -DskipTests clean package -Phive
 ./python/run-tests
 
+If you are building PySpark with SBT and wish to run the PySpark tests, 
you will need to build Spark with Hive support and also build the test 
components:
+
+./build/sbt -Phive clean package
--- End diff --

I noticed that the pyspark tests were recently changed so that -Phive is no 
longer strictly necessary to run pyspark tests, but I decided not to address 
that in this update.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21628: [SPARK-23776][DOC] Update instructions for runnin...

2018-06-24 Thread bersprockets
GitHub user bersprockets opened a pull request:

https://github.com/apache/spark/pull/21628

[SPARK-23776][DOC] Update instructions for running PySpark after building 
with SBT

## What changes were proposed in this pull request?

This update tells the reader how to build Spark with SBT such that 
pyspark-sql tests will succeed.

If you follow the current instructions for building Spark with SBT, 
pyspark/sql/udf.py fails with:

AnalysisException: u'Can not load class 
test.org.apache.spark.sql.JavaStringLength, please make sure it is on the 
classpath;'


## How was this patch tested?

I ran the doc build command (SKIP_API=1 jekyll build) and eyeballed the 
result.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bersprockets/spark SPARK-23776_doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21628.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21628


commit 9fcd05d7cb52a68bea930625605013397b4989f6
Author: Bruce Robbins 
Date:   2018-06-25T02:07:12Z

Update build doc for running pyspark after building with sbt




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21621: [SPARK-24633][SQL] Fix codegen when split is requ...

2018-06-24 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21621#discussion_r197646450
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -556,6 +556,17 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
 checkAnswer(df8.selectExpr("arrays_zip(v1, v2)"), expectedValue8)
   }
 
+  test("SPARK-24633: arrays_zip splits input processing correctly") {
+Seq("true", "false").foreach { wholestageCodegenEnabled =>
+  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> 
wholestageCodegenEnabled) {
+val df = spark.range(1)
+val exprs = (0 to 5).map(x => array($"id" + lit(x)))
--- End diff --

@mgaido91 Got it, you were testing that it does not split when 
wholestagecodegen is enabled.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21621: [SPARK-24633][SQL] Fix codegen when split is requ...

2018-06-23 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21621#discussion_r197623576
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -556,6 +556,17 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
 checkAnswer(df8.selectExpr("arrays_zip(v1, v2)"), expectedValue8)
   }
 
+  test("SPARK-24633: arrays_zip splits input processing correctly") {
+Seq("true", "false").foreach { wholestageCodegenEnabled =>
+  withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> 
wholestageCodegenEnabled) {
+val df = spark.range(1)
+val exprs = (0 to 5).map(x => array($"id" + lit(x)))
--- End diff --

This wasn't splitting input processing for me. Maybe 
splitExpressionsWithCurrentInputs has a bigger threshold than splitExpressions.

Even at 90, it still had not split the input processing. At 100, it finally 
did. So someplace between 90 and 100, it starts splitting.

I might be looking at the wrong thing. Check at your end.


val exprs = (0 to 100).map(x => array($"id" + lit(x)))
checkAnswer(df.select(arrays_zip(exprs: _*)),
  Row(Seq(Row(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
    21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41,
    42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62,
    63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83,
    84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100))))
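
In case it helps you check at your end, here is roughly how I eyeball whether 
the split actually happened (sketch only; it assumes a spark-shell style 
SparkSession named spark, and debugCodegen just dumps the generated code):

    // Sketch: dump the generated code and look for the extra private methods
    // that show up once the input processing has been split.
    import org.apache.spark.sql.execution.debug._
    import org.apache.spark.sql.functions.{array, arrays_zip, lit}
    import spark.implicits._

    val df = spark.range(1)
    val exprs = (0 to 100).map(x => array($"id" + lit(x)))
    df.select(arrays_zip(exprs: _*)).debugCodegen()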




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-06-22 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
Hi @ueshin. Just a question while I work on the changes for your review 
comments.
>I'm wondering whether we need type coercion like concat for array type is 
doing.

Which type coercion in Concat are you referring to? Are you referring to 
the check for primitive vs object type in code generation?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20909: [SPARK-23776][python][test] Check for needed components/...

2018-06-09 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/20909
  
@HyukjinKwon This PR is mostly obsolete. I will close it and re-open 
something smaller... maybe a one-line documentation change to handle the 
missing UDF case for those who build with sbt.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20909: [SPARK-23776][python][test] Check for needed comp...

2018-06-09 Thread bersprockets
Github user bersprockets closed the pull request at:

https://github.com/apache/spark/pull/20909


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21231: [SPARK-24119][SQL]Add interpreted execution to SortPrefi...

2018-06-08 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21231
  
@hvanhovell @maropu @viirya @kiszk Thanks for all the help!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-06-05 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r193280073
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -308,6 +308,170 @@ case class MapEntries(child: Expression) extends 
UnaryExpression with ExpectsInp
   override def prettyName: String = "map_entries"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
--- End diff --

@ueshin 

>We don't need to care about key duplication like CreateMap for now.

Just verifying: This means I should simply concatenate the maps, possibly 
creating additional duplicates.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21231: [SPARK-24119][SQL]Add interpreted execution to SortPrefi...

2018-05-30 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21231
  
ping @hvanhovell


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21308: SPARK-24253: Add DeleteSupport mix-in for DataSou...

2018-05-25 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21308#discussion_r190963247
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/DeleteSupport.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.catalyst.expressions.Expression;
+
+/**
+ * A mix-in interface for {@link DataSourceV2} delete support. Data 
sources can implement this
+ * interface to provide the ability to delete data from tables that 
matches filter expressions.
+ * 
+ * Data sources must implement this interface to support logical 
operations that combine writing
+ * data with deleting data, like overwriting partitions.
+ */
+public interface DeleteSupport extends DataSourceV2 {
+  /**
+   * Delete data from a data source table that matches filter expressions.
+   * 
+   * Rows are deleted from the data source iff all of the filter 
expressions match. That is, the
+   * expressions must be interpreted as a set of filters that are ANDed 
together.
+   * 
+   * Implementations may reject a delete operation if the delete isn't 
possible without significant
+   * effort. For example, partitioned data sources may reject deletes that 
do not filter by
+   * partition columns because the filter may require rewriting files 
without deleted records.
+   * To reject a delete implementations should throw {@link 
IllegalArgumentException} with a clear
+   * error message that identifies which expression was rejected.
+   * 
+   * Implementations may throw {@link UnsupportedOperationException} if 
the delete operation is not
+   * supported because one of the filter expressions is not supported. 
Implementations should throw
+   * this exception with a clear error message that identifies the 
unsupported expression.
+   *
+   * @param filters filter expressions, used to select rows to delete when 
all expressions match
+   * @throws UnsupportedOperationException If one or more filter 
expressions is not supported
+   * @throws IllegalArgumentException If the delete is rejected due to 
required effort
+   */
+  void deleteWhere(Expression[] filters);
--- End diff --

>Do you think it would be more clear if this were explicitly a driver-side 
operation?

Possibly. Maybe in the big data world this is already obvious. To me, it 
looks like a general purpose delete. Maybe deletePartitions? (I am bad at 
naming things, however).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-05-18 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-05-17 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r189161277
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
 ---
@@ -56,6 +58,28 @@ class CollectionExpressionsSuite extends SparkFunSuite 
with ExpressionEvalHelper
 checkEvaluation(MapValues(m2), null)
   }
 
+  test("Map Concat") {
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21231: [SPARK-24119][SQL]Add interpreted execution to SortPrefi...

2018-05-17 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21231
  
@maropu @hvanhovell @viirya Are all pending issues resolved?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

2018-05-17 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21305#discussion_r188960491
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -344,6 +344,36 @@ case class Join(
   }
 }
 
+/**
+ * Append data to an existing DataSourceV2 table.
+ */
+case class AppendData(
--- End diff --

How does this logical plan node map to the 8 operations outlined in your 
SPIP?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21308: SPARK-24253: Add DeleteSupport mix-in for DataSou...

2018-05-15 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21308#discussion_r188392219
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/DeleteSupport.java ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.catalyst.expressions.Expression;
+
+/**
+ * A mix-in interface for {@link DataSourceV2} delete support. Data 
sources can implement this
+ * interface to provide the ability to delete data from tables that 
matches filter expressions.
+ * 
+ * Data sources must implement this interface to support logical 
operations that combine writing
+ * data with deleting data, like overwriting partitions.
+ */
+public interface DeleteSupport extends DataSourceV2 {
+  /**
+   * Delete data from a data source table that matches filter expressions.
+   * 
+   * Rows are deleted from the data source iff all of the filter 
expressions match. That is, the
+   * expressions must be interpreted as a set of filters that are ANDed 
together.
+   * 
+   * Implementations may reject a delete operation if the delete isn't 
possible without significant
+   * effort. For example, partitioned data sources may reject deletes that 
do not filter by
+   * partition columns because the filter may require rewriting files 
without deleted records.
+   * To reject a delete implementations should throw {@link 
IllegalArgumentException} with a clear
+   * error message that identifies which expression was rejected.
+   * 
+   * Implementations may throw {@link UnsupportedOperationException} if 
the delete operation is not
+   * supported because one of the filter expressions is not supported. 
Implementations should throw
+   * this exception with a clear error message that identifies the 
unsupported expression.
+   *
+   * @param filters filter expressions, used to select rows to delete when 
all expressions match
+   * @throws UnsupportedOperationException If one or more filter 
expressions is not supported
+   * @throws IllegalArgumentException If the delete is rejected due to 
required effort
+   */
+  void deleteWhere(Expression[] filters);
--- End diff --

Does putting the delete method here (as opposed to, say, in DataDeleters or 
some other thing parallel to the DataWriters) imply that this is a driver-side 
operation only? I understand the use case is deleting partitions, which is 
usually only a file system operation, but will that always be the case?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21231: [SPARK-24119][SQL]Add interpreted execution to So...

2018-05-09 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21231#discussion_r187192485
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
 ---
@@ -147,7 +148,40 @@ case class SortPrefix(child: SortOrder) extends 
UnaryExpression {
   (!child.isAscending && child.nullOrdering == NullsLast)
   }
 
-  override def eval(input: InternalRow): Any = throw new 
UnsupportedOperationException
+  private lazy val calcPrefix: Any => Long = child.child.dataType match {
+case BooleanType => (raw) =>
+  if (raw.asInstanceOf[Boolean]) 1 else 0
+case DateType | TimestampType | _: IntegralType => (raw) =>
--- End diff --

I'm a little apprehensive about changing existing and properly functioning 
code (in doGenCode), even though it would make it slightly more readable. If 
you think I should, though, I will.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21144: [SPARK-24043][SQL] Interpreted Predicate should initiali...

2018-05-09 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21144
  
@cloud-fan I don't think this is an issue in 2.3. It would be an issue only 
once [SPARK-23580](https://issues.apache.org/jira/browse/SPARK-23580) 
("Interpreted mode fallback should be implemented") is completed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21144: [SPARK-24043][SQL] Interpreted Predicate should initiali...

2018-05-07 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21144
  
Thanks much!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-05-07 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
@ueshin Hopefully I have addressed all of your review comments.

Also, I have a question about what it means to dedup across maps when Spark 
allows duplicates in maps 
[here.](https://issues.apache.org/jira/browse/SPARK-23936?focusedCommentId=16464245&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16464245)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-05-07 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r186570491
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -116,6 +117,169 @@ case class MapValues(child: Expression)
   override def prettyName: String = "map_values"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
+   [[1 -> "a"], [2 -> "c"], [3 -> "d"]
+  """, since = "2.4.0")
+case class MapConcat(children: Seq[Expression]) extends Expression {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+// check key types and value types separately to allow 
valueContainsNull to vary
+if (children.exists(!_.dataType.isInstanceOf[MapType])) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input of function $prettyName should all be of type 
map, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if (children.map(_.dataType.asInstanceOf[MapType].keyType)
+  .exists(_.isInstanceOf[MapType])) {
+  // map_concat needs to pick a winner when multiple maps contain the 
same key. map_concat
+  // can do that only if it can detect when two keys are the same. 
SPARK-9415 states "map type
+  // should not support equality, hash". As a result, map_concat does 
not support a map type
+  // as a key
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName cannot have a map 
type as a key")
+} else if 
(children.map(_.dataType.asInstanceOf[MapType].keyType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if 
(children.map(_.dataType.asInstanceOf[MapType].valueType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override def dataType: MapType = {
+MapType(
+  keyType = children.headOption
+
.map(_.dataType.asInstanceOf[MapType].keyType).getOrElse(StringType),
+  valueType = children.headOption
+
.map(_.dataType.asInstanceOf[MapType].valueType).getOrElse(StringType),
+  valueContainsNull = children.map { c =>
+c.dataType.asInstanceOf[MapType]
+  }.exists(_.valueContainsNull)
+)
+  }
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override def eval(input: InternalRow): Any = {
+val union = new util.LinkedHashMap[Any, Any]()
+children.map(_.eval(input)).foreach { raw =>
+  if (raw == null) {
+return null
+  }
+  val map = raw.asInstanceOf[MapData]
+  map.foreach(dataType.keyType, dataType.valueType, (k, v) =>
+union.put(k, v)
+  )
+}
+val (keyArray, valueArray) = union.entrySet().toArray().map { e =>
--- End diff --

I would imagine bad things would happen before you got this far (even Map's 
size method returns an Int).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21144: [SPARK-24043][SQL] Interpreted Predicate should initiali...

2018-05-07 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21144
  
@hvanhovell @maropu Is there anything on this PR that I should do?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21231: [SPARK-24119][SQL]Add interpreted execution to So...

2018-05-06 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21231#discussion_r186307490
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SortOrderExpressionsSuite.scala
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.sql.{Date, Timestamp}
+import java.util.TimeZone
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparators._
+
+class SortOrderExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
+
+  test("SortPrefix") {
+// Explicitly choose a time zone, since Date objects can create 
different values depending on
+// local time zone of the machine on which the test is running
+TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
--- End diff --

@kiszk Maybe not a nit. I should fix that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21231: [SPARK-24119][SQL]Add interpreted execution to SortPrefi...

2018-05-06 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21231
  
@maropu @kiszk Hopefully I've addressed all comments. Please take a look.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21231: [SPARK-24119][SQL]Add interpreted execution to SortPrefi...

2018-05-05 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21231
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21231: [SPARK-24119][SQL]Add interpreted execution to SortPrefi...

2018-05-03 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21231
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21231: [SPARK-24119][SQL]Add interpreted execution to So...

2018-05-03 Thread bersprockets
GitHub user bersprockets opened a pull request:

https://github.com/apache/spark/pull/21231

[SPARK-24119][SQL]Add interpreted execution to SortPrefix expression

## What changes were proposed in this pull request?

Implemented eval in SortPrefix expression.

## How was this patch tested?

- ran existing sbt SQL tests
- added unit test
- ran existing Python SQL tests
- manual tests: disabling codegen -- patching code to disable beyond what 
spark.sql.codegen.wholeStage=false can do -- and running sbt SQL tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bersprockets/spark sortprefixeval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21231.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21231


commit 280b941a1adc2ffcd82810a69f3d9e475607b70a
Author: Bruce Robbins 
Date:   2018-04-28T23:26:48Z

Checkpoint changes

commit 4a810ae8776893ba1d12e620930473fdd61d1f49
Author: Bruce Robbins 
Date:   2018-04-29T18:25:26Z

Checkpoint commit

commit d1a7e220040ba1cfe4facb7d6eef9ae1251768aa
Author: Bruce Robbins 
Date:   2018-04-29T19:19:37Z

Checkpoint commit

commit 4dbb0b7ae959a6a4f85122a887bab4e1563255f0
Author: Bruce Robbins 
Date:   2018-04-29T22:33:21Z

Comment on testing oddity

commit 227b6ac71bcfe6a54051f47dd16aec047b0a98d9
Author: Bruce Robbins 
Date:   2018-04-30T21:22:09Z

Add boolean test for Sortprefix




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21169: [SPARK-23715][SQL] the input of to/from_utc_timestamp ca...

2018-05-02 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21169
  
Addresses all of my comments, thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21073: [SPARK-23936][SQL] Implement map_concat

2018-05-01 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/21073#discussion_r185392954
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -116,6 +117,161 @@ case class MapValues(child: Expression)
   override def prettyName: String = "map_values"
 }
 
+/**
+ * Returns the union of all the given maps.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(map, ...) - Returns the union of all the given maps",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd'));
+   [[1 -> "a"], [2 -> "c"], [3 -> "d"]
+  """, since = "2.4.0")
+case class MapConcat(children: Seq[Expression]) extends Expression {
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+// check key types and value types separately to allow 
valueContainsNull to vary
+if (children.exists(!_.dataType.isInstanceOf[MapType])) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input of function $prettyName should all be of type 
map, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if 
(children.map(_.dataType.asInstanceOf[MapType].keyType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else if 
(children.map(_.dataType.asInstanceOf[MapType].valueType).distinct.length > 1) {
+  TypeCheckResult.TypeCheckFailure(
+s"The given input maps of function $prettyName should all be the 
same type, " +
+  "but they are " + 
children.map(_.dataType.simpleString).mkString("[", ", ", "]"))
+} else {
+  TypeCheckResult.TypeCheckSuccess
+}
+  }
+
+  override def dataType: MapType = {
+MapType(
+  keyType = children.headOption
+
.map(_.dataType.asInstanceOf[MapType].keyType).getOrElse(StringType),
+  valueType = children.headOption
+
.map(_.dataType.asInstanceOf[MapType].valueType).getOrElse(StringType),
+  valueContainsNull = children.map { c =>
+c.dataType.asInstanceOf[MapType]
+  }.exists(_.valueContainsNull)
+)
+  }
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override def eval(input: InternalRow): Any = {
+val union = new util.LinkedHashMap[Any, Any]()
+children.map(_.eval(input)).foreach { raw =>
+  if (raw == null) {
+return null
+  }
+  val map = raw.asInstanceOf[MapData]
+  map.foreach(dataType.keyType, dataType.valueType, (k, v) =>
+union.put(k, v)
+  )
--- End diff --

I found an issue. I was preparing to add some more tests when I noticed 
that using maps as keys doesn't work well in interpreted mode (seems to work 
fine in codegen mode, so far).

So, something like this doesn't work in interpreted mode:

    scala> dfmapmap.show(truncate=false)
    +--------------------------------------------------+---------------------------------------------+
    |mapmap1                                           |mapmap2                                      |
    +--------------------------------------------------+---------------------------------------------+
    |[[1 -> 2, 3 -> 4] -> 101, [5 -> 6, 7 -> 8] -> 102]|[[11 -> 12] -> 103, [1 -> 2, 3 -> 4] -> 1001]|
    +--------------------------------------------------+---------------------------------------------+

    scala> dfmapmap.select(map_concat('mapmap1, 'mapmap2).as('mapmap3)).show(truncate=false)
    +-----------------------------------------------------------------------------------------------+
    |mapmap3                                                                                        |
    +-----------------------------------------------------------------------------------------------+
    |[[1 -> 2, 3 -> 4] -> 101, [5 -> 6, 7 -> 8] -> 102, [11 -> 12] -> 103, [1 -> 2, 3 -> 4] -> 1001]|
    +-----------------------------------------------------------------------------------------------+

As you can see, the key `[1 -> 2, 3 -> 4]` shows up twice in the new map.

This is because:

  val a1 = new ArrayBasedMapData
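
Roughly, the problem looks like this (sketch only; ArrayBasedMapData 
deliberately does not override equals/hashCode per SPARK-9415, so two 
structurally identical map keys are stored as distinct entries):

    import java.util.LinkedHashMap
    import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}

    // Two structurally identical maps, e.g. [1 -> 2, 3 -> 4]
    val a1 = new ArrayBasedMapData(new GenericArrayData(Array[Any](1, 3)),
      new GenericArrayData(Array[Any](2, 4)))
    val a2 = new ArrayBasedMapData(new GenericArrayData(Array[Any](1, 3)),
      new GenericArrayData(Array[Any](2, 4)))

    val union = new LinkedHashMap[Any, Any]()
    union.put(a1, 101)
    union.put(a2, 1001)
    println(union.size()) // 2, not 1: the "same" key shows up twice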

[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-05-01 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-05-01 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
A test failed with "./bin/spark-submit ... No such file or directory"

Seems like there's lots of spurious test failures right now. I will hold 
off on re-running for a little while.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21144: [SPARK-24043][SQL] Interpreted Predicate should initiali...

2018-05-01 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21144
  
@hvanhovell @maropu As it turns out, there are at least two places where an 
InterpretedPredicate is created but never initialized: 
SimpleTextSource.buildReader, and ExternalCatalogUtils.prunePartitionsByFilter.

It seems unlikely that nondeterministic expressions would be used for 
partitions, but maybe I am not imaginative enough. Also, these seem like bugs 
in those two classes, not in InterpretedPredicate.

Also, I reran the mostly-interpreted SQL unit tests, this time with 
SortPrefix implemented, and got the error count down from 1270 to 114. None of 
the errors were 'requirement failed' exceptions (so no uninitialized 
nondeterministic expressions).
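
For reference, a sketch of the contract involved (helper name is mine, not 
code from this PR): every Nondeterministic node has to be initialized with a 
partition index before interpreted eval, otherwise it throws the 'requirement 
failed' error mentioned above.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Expression, Nondeterministic}

    // Sketch: initialize all nondeterministic subexpressions before evaluating
    // the predicate in interpreted mode.
    def initializeAndEval(predicate: Expression, row: InternalRow, partitionIndex: Int): Any = {
      predicate.foreach {
        case n: Nondeterministic => n.initialize(partitionIndex)
        case _ => // deterministic expressions need no setup
      }
      predicate.eval(row)
    }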


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-05-01 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21141: [SPARK-23853][PYSPARK][TEST] Run Hive-related PySpark te...

2018-04-30 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21141
  
My experience here is limited. Still, it also looks good to me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-04-30 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat

2018-04-30 Thread bersprockets
Github user bersprockets commented on the issue:

https://github.com/apache/spark/pull/21073
  
@mn-mikke @kiszk Thanks for the review. I addressed the comments. Please 
take a look when you have a chance.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


